Compare commits

...

6 Commits

Author SHA1 Message Date
Luca Cominardi
f154c96b75 feat: expose IndexMerger and SegmentDocIdMapping for external doc-order control
Add `merge_segments_with_doc_id_mapping`, a public variant of
`merge_filtered_segments` that accepts a caller-supplied `SegmentDocIdMapping`
so that consumers can drive the final document order without a persistent sort
field.  Also expose `IndexMerger`, `SegmentDocIdMapping`, and the new
`write_with_doc_id_mapping` method in the public API.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-22 10:44:40 +02:00
Mithun Chicklore Yogendra
92b519d177 fix: preserve NULL ordering during numeric segment merges (#106)
The merger used first_or_default_col(0u64) for numeric sort fields,
making NULLs indistinguishable from zero during the k-way merge and
interleaving them with zeros. Use Column<u64>::first() (Option<u64>)
and match None/Some like the Str/Bytes path, and account for nullable
columns in the disjunct-stacking check.
2026-06-18 12:11:53 +02:00
Mithun Chicklore Yogendra
c09eacb994 fix: use native typed comparison for numeric sort keys (#105)
ColumnarWriter::sort_order() computed the sort key as
`f64::coerce(nv) as f32`, which has only 24 bits of mantissa: values
above 2^24 could collide (e.g. 16_777_216 and 16_777_217), and the 0.0
default made NULL indistinguishable from zero. Compare each
NumericalValue in its native type (u64/i64 cmp, f64 total_cmp), with
NULL ordered separately from Some(0).
2026-06-18 12:11:53 +02:00
Mithun Chicklore Yogendra
b59fac74bb feat: enable sort_by for Str/Bytes fast fields (#101)
Adds support for sorting by Str and Bytes fast fields on both
single-segment writes and cross-segment merges. Dictionary-encoded
fields use per-segment ordinals, so segment dictionaries are merged via
the columnar TermMerger to compute per-segment local-ord -> global-ord
mappings; the remapped u64 ordinals are then compared during kmerge.
2026-06-18 12:11:53 +02:00
Stu Hood
47ff79e2fc feat: Restore sort by field (#92)
Restores index sorting: an index can be physically ordered by a fast
field (IndexSettings::sort_by_field), applied both on single-segment
writes (stream to TempStore, then resort into Store) and on
cross-segment merges (disjunct fast path, otherwise a full k-merge with
a generated doc-id mapping).

Motivation (downstream join / sortedness work):
* https://github.com/paradedb/paradedb/issues/2997
* https://github.com/paradedb/paradedb/issues/3053

Re-expressed against current upstream APIs: no merge CancelSentinel,
MergeOptimizedInvertedIndexReader, or ignore_store plumbing.
2026-06-18 12:11:53 +02:00
Ming Ying
7b90642011 foundation: restore SegmentComponent::TempStore (revert #2815)
Index sorting streams documents to a temporary store before resorting
them into the final Store. Upstream #2815 ("Remove temp file") removed
SegmentComponent::TempStore as part of the index-sorting removal
cleanup; this restores it (the upstream-original mechanism, including
the include_temp_doc_store garbage-collection tracking).

This also subsumes paradedb/tantivy #104 ("Index sort test in CI"),
whose fix was re-adding TempStore to the component iterator and the
list_files GC handling.
2026-06-18 12:11:52 +02:00
40 changed files with 3556 additions and 372 deletions

View File

@@ -28,7 +28,7 @@ fn get_test_columns() -> Columns {
}
let mut buffer: Vec<u8> = Vec::new();
dataframe_writer
.serialize(data.len() as u32, &mut buffer)
.serialize(data.len() as u32, None, &mut buffer)
.unwrap();
let columnar = ColumnarReader::open(buffer).unwrap();

View File

@@ -54,6 +54,6 @@ pub fn generate_columnar_with_name(card: Card, num_docs: u32, column_name: &str)
}
let mut wrt: Vec<u8> = Vec::new();
columnar_writer.serialize(num_docs, &mut wrt).unwrap();
columnar_writer.serialize(num_docs, None, &mut wrt).unwrap();
ColumnarReader::open(wrt).unwrap()
}

View File

@@ -375,7 +375,7 @@ mod tests {
columnar_writer.record_numerical(5, "full", u64::MAX);
let mut wrt: Vec<u8> = Vec::new();
columnar_writer.serialize(7, &mut wrt).unwrap();
columnar_writer.serialize(7, None, &mut wrt).unwrap();
let reader = ColumnarReader::open(wrt).unwrap();
// Open the column as u64

View File

@@ -15,7 +15,9 @@ fn test_optional_index_with_num_docs(num_docs: u32) {
let mut dataframe_writer = ColumnarWriter::default();
dataframe_writer.record_numerical(100, "score", 80i64);
let mut buffer: Vec<u8> = Vec::new();
dataframe_writer.serialize(num_docs, &mut buffer).unwrap();
dataframe_writer
.serialize(num_docs, None, &mut buffer)
.unwrap();
let columnar = ColumnarReader::open(buffer).unwrap();
assert_eq!(columnar.num_columns(), 1);
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("score").unwrap();

View File

@@ -33,6 +33,25 @@ pub fn merge_bytes_or_str_column(
Ok(())
}
/// Computes a per-segment mapping from old term ordinal to merged term ordinal.
///
/// Performs a streaming k-way merge of per-segment term dictionaries (SSTable-backed) to build
/// a unified ordering. For each segment, the output is a `Vec<TermOrdinal>` where index `i`
/// holds the merged global ordinal corresponding to segment-local ordinal `i`.
///
/// This is used by index sorting to compare terms from different segments without materializing
/// term bytes in memory — only ordinals are compared.
#[doc(hidden)]
pub fn compute_merged_term_ord_mapping(
bytes_columns: &[BytesColumn],
) -> io::Result<Vec<Vec<TermOrdinal>>> {
let bytes_columns_opt: Vec<Option<BytesColumn>> =
bytes_columns.iter().cloned().map(Some).collect();
let term_ord_mapping =
merge_dict_and_compute_term_ord_mapping(&bytes_columns_opt, |_| true, |_| Ok(()))?;
Ok(term_ord_mapping.into_per_segment_new_term_ordinals())
}
struct RemappedTermOrdinalsValues<'a> {
bytes_columns: &'a [Option<BytesColumn>],
term_ord_mapping: &'a TermOrdinalMapping,
@@ -118,14 +137,14 @@ fn is_term_present(bitsets: &[Option<BitSet>], term_merger: &TermMerger) -> bool
false
}
fn serialize_merged_dict(
fn merge_dict_and_compute_term_ord_mapping(
bytes_columns: &[Option<BytesColumn>],
merge_row_order: &MergeRowOrder,
output: &mut impl Write,
mut should_keep_term: impl FnMut(&TermMerger) -> bool,
mut emit_term: impl FnMut(&[u8]) -> io::Result<()>,
) -> io::Result<TermOrdinalMapping> {
let mut term_ord_mapping = TermOrdinalMapping::default();
let mut field_term_streams = Vec::new();
let mut field_term_streams = Vec::with_capacity(bytes_columns.len());
for (segment_ord, column_opt) in bytes_columns.iter().enumerate() {
if let Some(column) = column_opt {
term_ord_mapping.add_segment(column.dictionary.num_terms());
@@ -141,21 +160,33 @@ fn serialize_merged_dict(
}
let mut merged_terms = TermMerger::new(field_term_streams);
let mut sstable_builder = sstable::VoidSSTable::writer(output);
match merge_row_order {
MergeRowOrder::Stack(_) => {
let mut current_term_ord = 0;
while merged_terms.advance() {
let term_bytes: &[u8] = merged_terms.key();
sstable_builder.insert(term_bytes, &())?;
for (segment_ord, from_term_ord) in merged_terms.matching_segments() {
term_ord_mapping.register_from_to(segment_ord, from_term_ord, current_term_ord);
}
current_term_ord += 1;
}
sstable_builder.finish()?;
let mut current_term_ord = 0;
while merged_terms.advance() {
if !should_keep_term(&merged_terms) {
continue;
}
emit_term(merged_terms.key())?;
for (segment_ord, from_term_ord) in merged_terms.matching_segments() {
term_ord_mapping.register_from_to(segment_ord, from_term_ord, current_term_ord);
}
current_term_ord += 1;
}
Ok(term_ord_mapping)
}
fn serialize_merged_dict(
bytes_columns: &[Option<BytesColumn>],
merge_row_order: &MergeRowOrder,
output: &mut impl Write,
) -> io::Result<TermOrdinalMapping> {
let mut sstable_builder = sstable::VoidSSTable::writer(output);
let term_ord_mapping = match merge_row_order {
MergeRowOrder::Stack(_) => merge_dict_and_compute_term_ord_mapping(
bytes_columns,
|_| true,
|term_bytes| sstable_builder.insert(term_bytes, &()),
)?,
MergeRowOrder::Shuffled(shuffle_merge_order) => {
assert_eq!(shuffle_merge_order.alive_bitsets.len(), bytes_columns.len());
let mut term_bitsets: Vec<Option<BitSet>> = Vec::with_capacity(bytes_columns.len());
@@ -174,21 +205,14 @@ fn serialize_merged_dict(
}
}
}
let mut current_term_ord = 0;
while merged_terms.advance() {
let term_bytes: &[u8] = merged_terms.key();
if !is_term_present(&term_bitsets[..], &merged_terms) {
continue;
}
sstable_builder.insert(term_bytes, &())?;
for (segment_ord, from_term_ord) in merged_terms.matching_segments() {
term_ord_mapping.register_from_to(segment_ord, from_term_ord, current_term_ord);
}
current_term_ord += 1;
}
sstable_builder.finish()?;
merge_dict_and_compute_term_ord_mapping(
bytes_columns,
|merged_terms| is_term_present(&term_bitsets[..], merged_terms),
|term_bytes| sstable_builder.insert(term_bytes, &()),
)?
}
}
};
sstable_builder.finish()?;
Ok(term_ord_mapping)
}
@@ -211,4 +235,8 @@ impl TermOrdinalMapping {
fn get_segment(&self, segment_ord: u32) -> &[TermOrdinal] {
&self.per_segment_new_term_ordinals[segment_ord as usize]
}
fn into_per_segment_new_term_ordinals(self) -> Vec<Vec<TermOrdinal>> {
self.per_segment_new_term_ordinals
}
}

View File

@@ -7,6 +7,7 @@ use std::io;
use std::net::Ipv6Addr;
use std::sync::Arc;
pub use merge_dict_column::compute_merged_term_ord_mapping;
pub use merge_mapping::{MergeRowOrder, ShuffleMergeOrder, StackMergeOrder};
use super::writer::ColumnarSerializer;

View File

@@ -17,7 +17,7 @@ fn make_columnar<T: Into<NumericalValue> + HasAssociatedColumnType + Copy>(
}
let mut buffer: Vec<u8> = Vec::new();
dataframe_writer
.serialize(vals.len() as RowId, &mut buffer)
.serialize(vals.len() as RowId, None, &mut buffer)
.unwrap();
ColumnarReader::open(buffer).unwrap()
}
@@ -143,7 +143,9 @@ fn make_numerical_columnar_multiple_columns(
.max()
.unwrap_or(0u32);
let mut buffer: Vec<u8> = Vec::new();
dataframe_writer.serialize(num_rows, &mut buffer).unwrap();
dataframe_writer
.serialize(num_rows, None, &mut buffer)
.unwrap();
ColumnarReader::open(buffer).unwrap()
}
@@ -166,7 +168,9 @@ fn make_byte_columnar_multiple_columns(
}
}
let mut buffer: Vec<u8> = Vec::new();
dataframe_writer.serialize(num_rows, &mut buffer).unwrap();
dataframe_writer
.serialize(num_rows, None, &mut buffer)
.unwrap();
ColumnarReader::open(buffer).unwrap()
}
@@ -185,7 +189,9 @@ fn make_text_columnar_multiple_columns(columns: &[(&str, &[&[&str]])]) -> Column
.max()
.unwrap_or(0u32);
let mut buffer: Vec<u8> = Vec::new();
dataframe_writer.serialize(num_rows, &mut buffer).unwrap();
dataframe_writer
.serialize(num_rows, None, &mut buffer)
.unwrap();
ColumnarReader::open(buffer).unwrap()
}
@@ -544,7 +550,7 @@ fn build_columnar(spec: &ColumnarSpec) -> ColumnarReader {
}
let mut buffer = Vec::new();
writer.serialize(max_row_id + 1, &mut buffer).unwrap();
writer.serialize(max_row_id + 1, None, &mut buffer).unwrap();
ColumnarReader::open(buffer).unwrap()
}

View File

@@ -8,6 +8,9 @@ pub use column_type::{ColumnType, HasAssociatedColumnType};
pub use format_version::{CURRENT_VERSION, Version};
#[cfg(test)]
pub(crate) use merge::ColumnTypeCategory;
pub use merge::{MergeRowOrder, ShuffleMergeOrder, StackMergeOrder, merge_columnar};
pub use merge::{
MergeRowOrder, ShuffleMergeOrder, StackMergeOrder, compute_merged_term_ord_mapping,
merge_columnar,
};
pub use reader::ColumnarReader;
pub use writer::ColumnarWriter;

View File

@@ -226,7 +226,7 @@ mod tests {
columnar_writer.record_column_type("col1", ColumnType::Str, false);
columnar_writer.record_column_type("col2", ColumnType::U64, false);
let mut buffer = Vec::new();
columnar_writer.serialize(1, &mut buffer).unwrap();
columnar_writer.serialize(1, None, &mut buffer).unwrap();
let columnar = ColumnarReader::open(buffer).unwrap();
let columns = columnar.list_columns().unwrap();
assert_eq!(columns.len(), 2);
@@ -242,7 +242,7 @@ mod tests {
columnar_writer.record_column_type("count", ColumnType::U64, false);
columnar_writer.record_numerical(1, "count", 1u64);
let mut buffer = Vec::new();
columnar_writer.serialize(2, &mut buffer).unwrap();
columnar_writer.serialize(2, None, &mut buffer).unwrap();
let columnar = ColumnarReader::open(buffer).unwrap();
let columns = columnar.list_columns().unwrap();
assert_eq!(columns.len(), 1);
@@ -256,7 +256,7 @@ mod tests {
columnar_writer.record_column_type("col", ColumnType::U64, false);
columnar_writer.record_numerical(1, "col", 1u64);
let mut buffer = Vec::new();
columnar_writer.serialize(2, &mut buffer).unwrap();
columnar_writer.serialize(2, None, &mut buffer).unwrap();
let columnar = ColumnarReader::open(buffer).unwrap();
{
let columns = columnar.read_columns("col").unwrap();
@@ -285,7 +285,7 @@ mod tests {
columnar_writer.record_str(1, "col1", "hello");
columnar_writer.record_str(0, "col2", "hello");
let mut buffer = Vec::new();
columnar_writer.serialize(2, &mut buffer).unwrap();
columnar_writer.serialize(2, None, &mut buffer).unwrap();
let columnar = ColumnarReader::open(buffer).unwrap();
{

View File

@@ -41,10 +41,31 @@ impl ColumnWriter {
pub(super) fn operation_iterator<'a, V: SymbolValue>(
&self,
arena: &MemoryArena,
old_to_new_ids_opt: Option<&[RowId]>,
buffer: &'a mut Vec<u8>,
) -> impl Iterator<Item = ColumnOperation<V>> + 'a + use<'a, V> {
buffer.clear();
self.values.read_to_end(arena, buffer);
if let Some(old_to_new_ids) = old_to_new_ids_opt {
// TODO avoid the extra deserialization / serialization.
let mut sorted_ops: Vec<(RowId, ColumnOperation<V>)> = Vec::new();
let mut new_doc = 0u32;
let mut cursor = &buffer[..];
for op in std::iter::from_fn(|| ColumnOperation::<V>::deserialize(&mut cursor)) {
if let ColumnOperation::NewDoc(doc) = &op {
new_doc = old_to_new_ids[*doc as usize];
sorted_ops.push((new_doc, ColumnOperation::NewDoc(new_doc)));
} else {
sorted_ops.push((new_doc, op));
}
}
// stable sort is crucial here.
sorted_ops.sort_by_key(|(new_doc_id, _)| *new_doc_id);
buffer.clear();
for (_, op) in sorted_ops {
buffer.extend_from_slice(op.serialize().as_ref());
}
}
let mut cursor: &[u8] = &buffer[..];
std::iter::from_fn(move || ColumnOperation::deserialize(&mut cursor))
}
@@ -211,9 +232,11 @@ impl NumericalColumnWriter {
pub(super) fn operation_iterator<'a>(
self,
arena: &MemoryArena,
old_to_new_ids: Option<&[RowId]>,
buffer: &'a mut Vec<u8>,
) -> impl Iterator<Item = ColumnOperation<NumericalValue>> + 'a + use<'a> {
self.column_writer.operation_iterator(arena, buffer)
self.column_writer
.operation_iterator(arena, old_to_new_ids, buffer)
}
}
@@ -255,9 +278,11 @@ impl StrOrBytesColumnWriter {
pub(super) fn operation_iterator<'a>(
&self,
arena: &MemoryArena,
old_to_new_ids: Option<&[RowId]>,
byte_buffer: &'a mut Vec<u8>,
) -> impl Iterator<Item = ColumnOperation<UnorderedId>> + 'a + use<'a> {
self.column_writer.operation_iterator(arena, byte_buffer)
self.column_writer
.operation_iterator(arena, old_to_new_ids, byte_buffer)
}
}

View File

@@ -44,7 +44,7 @@ struct SpareBuffers {
/// columnar_writer.record_str(1u32 /* doc id */, "product_name", "Apple");
/// columnar_writer.record_numerical(0u32 /* doc id */, "price", 10.5f64); //< uh oh we ended up mixing integer and floats.
/// let mut wrt: Vec<u8> = Vec::new();
/// columnar_writer.serialize(2u32, &mut wrt).unwrap();
/// columnar_writer.serialize(2u32, None, &mut wrt).unwrap();
/// ```
#[derive(Default)]
pub struct ColumnarWriter {
@@ -76,6 +76,75 @@ impl ColumnarWriter {
.sum::<usize>()
}
/// Returns the list of doc ids from 0..num_docs sorted by the `sort_field`
/// column.
///
/// If the column is multivalued, use the first value for scoring.
/// If no value is associated to a specific row, the document is assigned
/// the lowest possible score.
///
/// The sort applied is stable.
pub fn sort_order(&self, sort_field: &str, num_docs: RowId, reversed: bool) -> Vec<u32> {
let Some(numerical_col_writer) = self
.numerical_field_hash_map
.get::<NumericalColumnWriter>(sort_field.as_bytes())
.or_else(|| {
self.datetime_field_hash_map
.get::<NumericalColumnWriter>(sort_field.as_bytes())
})
else {
let str_or_bytes_column_opt = self
.str_field_hash_map
.get::<StrOrBytesColumnWriter>(sort_field.as_bytes())
.or_else(|| {
self.bytes_field_hash_map
.get::<StrOrBytesColumnWriter>(sort_field.as_bytes())
});
let Some(str_or_bytes_column) = str_or_bytes_column_opt else {
return Vec::new();
};
let dictionary_builder = &self.dictionaries[str_or_bytes_column.dictionary_id as usize];
let term_id_mapping = dictionary_builder.build_term_id_mapping(&self.arena);
let mut symbols_buffer = Vec::new();
return collect_sort_order_from_ops(
str_or_bytes_column.operation_iterator(&self.arena, None, &mut symbols_buffer),
num_docs,
reversed,
|uid| Some(term_id_mapping.to_ord(uid).0),
None,
|a, b| a.cmp(b),
);
};
let mut symbols_buffer = Vec::new();
collect_sort_order_from_ops(
numerical_col_writer.operation_iterator(&self.arena, None, &mut symbols_buffer),
num_docs,
reversed,
// MonotonicallyMappableToU64 converts each value to u64 in an
// order-preserving way (u64: identity, i64: XOR sign bit, f64: bit
// manipulation). Converting once per document lets the comparator be
// a simple u64 cmp instead of unwrapping the NumericalValue variant
// on every comparison.
//
// For f64, NaN maps to a deterministic u64 via raw bit manipulation,
// so it sorts to a consistent position. Sorting only requires total
// ordering, not IEEE 754 equality semantics where NaN != NaN.
|nv| {
Some(match nv {
NumericalValue::U64(v) => v.to_u64(),
NumericalValue::I64(v) => v.to_u64(),
NumericalValue::F64(v) => v.to_u64(),
})
},
// None for missing values. Option<u64> sorts None < Some(_),
// placing nulls before non-null values.
None,
|a, b| a.cmp(b),
)
}
/// Records a column type. This is useful to bypass the coercion process,
/// makes sure the empty is present in the resulting columnar, or set
/// the `sort_values_within_row`.
@@ -246,7 +315,12 @@ impl ColumnarWriter {
},
);
}
pub fn serialize(&mut self, num_docs: RowId, wrt: &mut dyn io::Write) -> io::Result<()> {
pub fn serialize(
&mut self,
num_docs: RowId,
old_to_new_row_ids: Option<&[RowId]>,
wrt: &mut dyn io::Write,
) -> io::Result<()> {
let mut serializer = ColumnarSerializer::new(wrt);
let mut columns: Vec<(&[u8], ColumnType, Addr)> = self
@@ -303,7 +377,11 @@ impl ColumnarWriter {
serialize_bool_column(
cardinality,
num_docs,
column_writer.operation_iterator(arena, &mut symbol_byte_buffer),
column_writer.operation_iterator(
arena,
old_to_new_row_ids,
&mut symbol_byte_buffer,
),
buffers,
&mut column_serializer,
)?;
@@ -317,7 +395,11 @@ impl ColumnarWriter {
serialize_ip_addr_column(
cardinality,
num_docs,
column_writer.operation_iterator(arena, &mut symbol_byte_buffer),
column_writer.operation_iterator(
arena,
old_to_new_row_ids,
&mut symbol_byte_buffer,
),
buffers,
&mut column_serializer,
)?;
@@ -342,8 +424,11 @@ impl ColumnarWriter {
num_docs,
str_or_bytes_column_writer.sort_values_within_row,
dictionary_builder,
str_or_bytes_column_writer
.operation_iterator(arena, &mut symbol_byte_buffer),
str_or_bytes_column_writer.operation_iterator(
arena,
old_to_new_row_ids,
&mut symbol_byte_buffer,
),
buffers,
&self.arena,
&mut column_serializer,
@@ -361,7 +446,11 @@ impl ColumnarWriter {
cardinality,
num_docs,
numerical_type,
numerical_column_writer.operation_iterator(arena, &mut symbol_byte_buffer),
numerical_column_writer.operation_iterator(
arena,
old_to_new_row_ids,
&mut symbol_byte_buffer,
),
buffers,
&mut column_serializer,
)?;
@@ -376,7 +465,11 @@ impl ColumnarWriter {
cardinality,
num_docs,
NumericalType::I64,
column_writer.operation_iterator(arena, &mut symbol_byte_buffer),
column_writer.operation_iterator(
arena,
old_to_new_row_ids,
&mut symbol_byte_buffer,
),
buffers,
&mut column_serializer,
)?;
@@ -389,6 +482,56 @@ impl ColumnarWriter {
}
}
/// Shared sorting pattern for both numeric and Str/Bytes sort fields.
///
/// Iterates column operations, fills gaps for missing docs with `default_key`, converts each value
/// to a sort key via `value_to_key`, then sorts by the key using `cmp_keys`. Returns the doc ids
/// in sorted order.
fn collect_sort_order_from_ops<V, K: Clone>(
ops: impl Iterator<Item = ColumnOperation<V>>,
num_docs: RowId,
reversed: bool,
value_to_key: impl Fn(V) -> K,
default_key: K,
cmp_keys: impl Fn(&K, &K) -> std::cmp::Ordering,
) -> Vec<u32> {
let mut doc_sort_keys: Vec<(K, RowId)> = Vec::with_capacity(num_docs as usize);
let mut start_doc_check_fill: RowId = 0;
let mut current_doc_opt: Option<RowId> = None;
for op in ops {
match op {
ColumnOperation::NewDoc(doc) => {
current_doc_opt = Some(doc);
}
ColumnOperation::Value(val) => {
if let Some(current_doc) = current_doc_opt {
// Fill gaps since the last doc with the default key.
doc_sort_keys.extend(
(start_doc_check_fill..current_doc).map(|doc| (default_key.clone(), doc)),
);
start_doc_check_fill = current_doc + 1;
// For multivalued fields, only the first value is used.
current_doc_opt = None;
doc_sort_keys.push((value_to_key(val), current_doc));
}
}
}
}
// Fill remaining docs at the tail.
doc_sort_keys.extend((start_doc_check_fill..num_docs).map(|doc| (default_key.clone(), doc)));
doc_sort_keys.sort_by(|(left_key, _), (right_key, _)| {
let cmp = cmp_keys(left_key, right_key);
if reversed { cmp.reverse() } else { cmp }
});
doc_sort_keys
.into_iter()
.map(|(_sort_key, doc)| doc)
.collect()
}
// Serialize [Dictionary, Column, dictionary num bytes U32::LE]
// Column: [Column Index, Column Values, column index num bytes U32::LE]
#[expect(clippy::too_many_arguments)]
@@ -689,7 +832,7 @@ mod tests {
assert_eq!(column_writer.get_cardinality(3), Cardinality::Full);
let mut buffer = Vec::new();
let symbols: Vec<ColumnOperation<NumericalValue>> = column_writer
.operation_iterator(&arena, &mut buffer)
.operation_iterator(&arena, None, &mut buffer)
.collect();
assert_eq!(symbols.len(), 6);
assert!(matches!(symbols[0], ColumnOperation::NewDoc(0u32)));
@@ -718,7 +861,7 @@ mod tests {
assert_eq!(column_writer.get_cardinality(3), Cardinality::Optional);
let mut buffer = Vec::new();
let symbols: Vec<ColumnOperation<NumericalValue>> = column_writer
.operation_iterator(&arena, &mut buffer)
.operation_iterator(&arena, None, &mut buffer)
.collect();
assert_eq!(symbols.len(), 4);
assert!(matches!(symbols[0], ColumnOperation::NewDoc(1u32)));
@@ -741,7 +884,7 @@ mod tests {
assert_eq!(column_writer.get_cardinality(2), Cardinality::Optional);
let mut buffer = Vec::new();
let symbols: Vec<ColumnOperation<NumericalValue>> = column_writer
.operation_iterator(&arena, &mut buffer)
.operation_iterator(&arena, None, &mut buffer)
.collect();
assert_eq!(symbols.len(), 2);
assert!(matches!(symbols[0], ColumnOperation::NewDoc(0u32)));
@@ -760,7 +903,7 @@ mod tests {
assert_eq!(column_writer.get_cardinality(1), Cardinality::Multivalued);
let mut buffer = Vec::new();
let symbols: Vec<ColumnOperation<NumericalValue>> = column_writer
.operation_iterator(&arena, &mut buffer)
.operation_iterator(&arena, None, &mut buffer)
.collect();
assert_eq!(symbols.len(), 3);
assert!(matches!(symbols[0], ColumnOperation::NewDoc(0u32)));

View File

@@ -27,7 +27,7 @@ fn generate_columnar(num_docs: u32, value_offset: u64) -> Vec<u8> {
}
let mut wrt: Vec<u8> = Vec::new();
columnar_writer.serialize(num_docs, &mut wrt).unwrap();
columnar_writer.serialize(num_docs, None, &mut wrt).unwrap();
wrt
}

View File

@@ -51,6 +51,16 @@ impl DictionaryBuilder {
UnorderedId(unordered_id)
}
fn build_sorted_terms<'a>(&'a self, arena: &'a MemoryArena) -> Vec<(&'a [u8], UnorderedId)> {
let mut terms: Vec<(&[u8], UnorderedId)> = self
.dict
.iter(arena)
.map(|(k, v)| (k, arena.read(v)))
.collect();
terms.sort_unstable_by_key(|(key, _)| *key);
terms
}
/// Serialize the dictionary into an fst, and returns the
/// `UnorderedId -> TermOrdinal` map.
pub fn serialize<'a, W: io::Write + 'a>(
@@ -58,12 +68,7 @@ impl DictionaryBuilder {
arena: &MemoryArena,
wrt: &mut W,
) -> io::Result<TermIdMapping> {
let mut terms: Vec<(&[u8], UnorderedId)> = self
.dict
.iter(arena)
.map(|(k, v)| (k, arena.read(v)))
.collect();
terms.sort_unstable_by_key(|(key, _)| *key);
let terms = self.build_sorted_terms(arena);
// TODO Remove the allocation.
let mut unordered_to_ord: Vec<OrderedId> = vec![OrderedId(0u32); terms.len()];
let mut sstable_builder = sstable::VoidSSTable::writer(wrt);
@@ -76,6 +81,16 @@ impl DictionaryBuilder {
Ok(TermIdMapping { unordered_to_ord })
}
/// Build the `UnorderedId -> OrderedId` mapping in memory without serializing.
pub fn build_term_id_mapping(&self, arena: &MemoryArena) -> TermIdMapping {
let terms = self.build_sorted_terms(arena);
let mut unordered_to_ord: Vec<OrderedId> = vec![OrderedId(0u32); terms.len()];
for (ord, (_key, unordered_id)) in terms.into_iter().enumerate() {
unordered_to_ord[unordered_id.0 as usize] = OrderedId(ord as u32);
}
TermIdMapping { unordered_to_ord }
}
pub(crate) fn mem_usage(&self) -> usize {
self.dict.mem_usage()
}

View File

@@ -43,7 +43,8 @@ pub use column_values::{
};
pub use columnar::{
CURRENT_VERSION, ColumnType, ColumnarReader, ColumnarWriter, HasAssociatedColumnType,
MergeRowOrder, ShuffleMergeOrder, StackMergeOrder, Version, merge_columnar,
MergeRowOrder, ShuffleMergeOrder, StackMergeOrder, Version, compute_merged_term_ord_mapping,
merge_columnar,
};
use sstable::VoidSSTable;
pub use value::{NumericalType, NumericalValue};

View File

@@ -21,7 +21,7 @@ fn test_dataframe_writer_str() {
dataframe_writer.record_str(1u32, "my_string", "hello");
dataframe_writer.record_str(3u32, "my_string", "helloeee");
let mut buffer: Vec<u8> = Vec::new();
dataframe_writer.serialize(5, &mut buffer).unwrap();
dataframe_writer.serialize(5, None, &mut buffer).unwrap();
let columnar = ColumnarReader::open(buffer).unwrap();
assert_eq!(columnar.num_columns(), 1);
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("my_string").unwrap();
@@ -35,7 +35,7 @@ fn test_dataframe_writer_bytes() {
dataframe_writer.record_bytes(1u32, "my_string", b"hello");
dataframe_writer.record_bytes(3u32, "my_string", b"helloeee");
let mut buffer: Vec<u8> = Vec::new();
dataframe_writer.serialize(5, &mut buffer).unwrap();
dataframe_writer.serialize(5, None, &mut buffer).unwrap();
let columnar = ColumnarReader::open(buffer).unwrap();
assert_eq!(columnar.num_columns(), 1);
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("my_string").unwrap();
@@ -49,7 +49,7 @@ fn test_dataframe_writer_bool() {
dataframe_writer.record_bool(1u32, "bool.value", false);
dataframe_writer.record_bool(3u32, "bool.value", true);
let mut buffer: Vec<u8> = Vec::new();
dataframe_writer.serialize(5, &mut buffer).unwrap();
dataframe_writer.serialize(5, None, &mut buffer).unwrap();
let columnar = ColumnarReader::open(buffer).unwrap();
assert_eq!(columnar.num_columns(), 1);
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("bool.value").unwrap();
@@ -74,7 +74,7 @@ fn test_dataframe_writer_u64_multivalued() {
dataframe_writer.record_numerical(6u32, "divisor", 2u64);
dataframe_writer.record_numerical(6u32, "divisor", 3u64);
let mut buffer: Vec<u8> = Vec::new();
dataframe_writer.serialize(7, &mut buffer).unwrap();
dataframe_writer.serialize(7, None, &mut buffer).unwrap();
let columnar = ColumnarReader::open(buffer).unwrap();
assert_eq!(columnar.num_columns(), 1);
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("divisor").unwrap();
@@ -97,7 +97,7 @@ fn test_dataframe_writer_ip_addr() {
dataframe_writer.record_ip_addr(1, "ip_addr", Ipv6Addr::from_u128(1001));
dataframe_writer.record_ip_addr(3, "ip_addr", Ipv6Addr::from_u128(1050));
let mut buffer: Vec<u8> = Vec::new();
dataframe_writer.serialize(5, &mut buffer).unwrap();
dataframe_writer.serialize(5, None, &mut buffer).unwrap();
let columnar = ColumnarReader::open(buffer).unwrap();
assert_eq!(columnar.num_columns(), 1);
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("ip_addr").unwrap();
@@ -128,7 +128,7 @@ fn test_dataframe_writer_numerical() {
dataframe_writer.record_numerical(2u32, "srical.value", NumericalValue::U64(13u64));
dataframe_writer.record_numerical(4u32, "srical.value", NumericalValue::U64(15u64));
let mut buffer: Vec<u8> = Vec::new();
dataframe_writer.serialize(6, &mut buffer).unwrap();
dataframe_writer.serialize(6, None, &mut buffer).unwrap();
let columnar = ColumnarReader::open(buffer).unwrap();
assert_eq!(columnar.num_columns(), 1);
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("srical.value").unwrap();
@@ -153,6 +153,46 @@ fn test_dataframe_writer_numerical() {
assert_eq!(column_i64.first(6), None); //< we can change the spec for that one.
}
#[test]
fn test_dataframe_sort_by_full() {
let mut dataframe_writer = ColumnarWriter::default();
dataframe_writer.record_numerical(0u32, "value", NumericalValue::U64(1));
dataframe_writer.record_numerical(1u32, "value", NumericalValue::U64(2));
let data = dataframe_writer.sort_order("value", 2, false);
assert_eq!(data, vec![0, 1]);
}
#[test]
fn test_dataframe_sort_by_opt() {
let mut dataframe_writer = ColumnarWriter::default();
dataframe_writer.record_numerical(1u32, "value", NumericalValue::U64(3));
dataframe_writer.record_numerical(3u32, "value", NumericalValue::U64(2));
let data = dataframe_writer.sort_order("value", 5, false);
// 0, 2, 4 is 0.0
assert_eq!(data, vec![0, 2, 4, 3, 1]);
let data = dataframe_writer.sort_order("value", 5, true);
assert_eq!(
data,
vec![4, 2, 0, 3, 1].into_iter().rev().collect::<Vec<_>>()
);
}
#[test]
fn test_dataframe_sort_by_multi() {
let mut dataframe_writer = ColumnarWriter::default();
// valid for sort
dataframe_writer.record_numerical(1u32, "value", NumericalValue::U64(2));
// those are ignored for sort
dataframe_writer.record_numerical(1u32, "value", NumericalValue::U64(4));
dataframe_writer.record_numerical(1u32, "value", NumericalValue::U64(4));
// valid for sort
dataframe_writer.record_numerical(3u32, "value", NumericalValue::U64(3));
// ignored, would change sort order
dataframe_writer.record_numerical(3u32, "value", NumericalValue::U64(1));
let data = dataframe_writer.sort_order("value", 4, false);
assert_eq!(data, vec![0, 2, 1, 3]);
}
#[test]
fn test_dictionary_encoded_str() {
let mut buffer = Vec::new();
@@ -161,7 +201,7 @@ fn test_dictionary_encoded_str() {
columnar_writer.record_str(3, "my.column", "c");
columnar_writer.record_str(3, "my.column2", "different_column!");
columnar_writer.record_str(4, "my.column", "b");
columnar_writer.serialize(5, &mut buffer).unwrap();
columnar_writer.serialize(5, None, &mut buffer).unwrap();
let columnar_reader = ColumnarReader::open(buffer).unwrap();
assert_eq!(columnar_reader.num_columns(), 2);
let col_handles = columnar_reader.read_columns("my.column").unwrap();
@@ -195,7 +235,7 @@ fn test_dictionary_encoded_bytes() {
columnar_writer.record_bytes(3, "my.column", b"c");
columnar_writer.record_bytes(3, "my.column2", b"different_column!");
columnar_writer.record_bytes(4, "my.column", b"b");
columnar_writer.serialize(5, &mut buffer).unwrap();
columnar_writer.serialize(5, None, &mut buffer).unwrap();
let columnar_reader = ColumnarReader::open(buffer).unwrap();
assert_eq!(columnar_reader.num_columns(), 2);
let col_handles = columnar_reader.read_columns("my.column").unwrap();
@@ -232,6 +272,93 @@ fn test_dictionary_encoded_bytes() {
assert_eq!(term_buffer, b"b");
}
#[test]
fn test_sort_order_str_asc_desc() {
let mut dataframe_writer = ColumnarWriter::default();
dataframe_writer.record_str(0, "s", "z");
dataframe_writer.record_str(2, "s", "a");
dataframe_writer.record_str(3, "s", "m");
let asc = dataframe_writer.sort_order("s", 4, false);
assert_eq!(asc, vec![1, 2, 3, 0]); // None, a, m, z
let desc = dataframe_writer.sort_order("s", 4, true);
assert_eq!(desc, vec![0, 3, 2, 1]); // z, m, a, None
}
#[test]
fn test_sort_order_str_empty_vs_missing() {
let mut dataframe_writer = ColumnarWriter::default();
dataframe_writer.record_str(0, "s", "");
let asc = dataframe_writer.sort_order("s", 2, false);
assert_eq!(asc, vec![1, 0]); // None first, then empty string
}
#[test]
fn test_sort_order_str_multivalued_stable() {
let mut dataframe_writer = ColumnarWriter::default();
dataframe_writer.record_str(0, "s", "z");
dataframe_writer.record_str(0, "s", "a"); // multivalued; first value wins
dataframe_writer.record_str(1, "s", "b");
dataframe_writer.record_str(2, "s", "b");
let asc = dataframe_writer.sort_order("s", 3, false);
assert_eq!(asc, vec![1, 2, 0]); // b, b (stable), z
}
#[test]
fn test_sort_order_bytes_asc() {
let mut dataframe_writer = ColumnarWriter::default();
dataframe_writer.record_bytes(1, "b", &[0x01]);
dataframe_writer.record_bytes(3, "b", &[0x00]);
let asc = dataframe_writer.sort_order("b", 4, false);
assert_eq!(asc, vec![0, 2, 3, 1]); // None, None, 0x00, 0x01
}
#[test]
fn test_sort_order_numeric_u64_above_2_24() {
let mut dataframe_writer = ColumnarWriter::default();
dataframe_writer.record_numerical(0, "n", 16_777_217u64);
dataframe_writer.record_numerical(1, "n", 16_777_216u64);
let asc = dataframe_writer.sort_order("n", 2, false);
assert_eq!(asc, vec![1, 0]); // 16,777,216 then 16,777,217
}
#[test]
fn test_sort_order_numeric_u64_above_2_53() {
let mut dataframe_writer = ColumnarWriter::default();
dataframe_writer.record_numerical(0, "n", 9_007_199_254_740_993u64);
dataframe_writer.record_numerical(1, "n", 9_007_199_254_740_992u64);
let asc = dataframe_writer.sort_order("n", 2, false);
assert_eq!(asc, vec![1, 0]); // smaller value first
}
#[test]
fn test_sort_order_numeric_null_vs_zero() {
let mut dataframe_writer = ColumnarWriter::default();
dataframe_writer.record_numerical(0, "n", 0u64);
let asc = dataframe_writer.sort_order("n", 2, false);
assert_eq!(asc, vec![1, 0]); // None first, then 0
}
#[test]
fn test_sort_order_datetime_close_timestamps() {
let mut dataframe_writer = ColumnarWriter::default();
// Two timestamps 1 nanosecond apart. As f32, both round to the same value.
let dt1 = DateTime::from_timestamp_nanos(1_700_000_000_000_000_001);
let dt2 = DateTime::from_timestamp_nanos(1_700_000_000_000_000_000);
dataframe_writer.record_datetime(0, "ts", dt1);
dataframe_writer.record_datetime(1, "ts", dt2);
let asc = dataframe_writer.sort_order("ts", 2, false);
assert_eq!(asc, vec![1, 0]); // smaller timestamp first
}
fn num_strategy() -> impl Strategy<Value = NumericalValue> {
prop_oneof![
3 => Just(NumericalValue::U64(0u64)),
@@ -329,12 +456,26 @@ fn columnar_docs_strategy() -> impl Strategy<Value = Vec<Vec<(&'static str, Colu
.prop_flat_map(|num_docs| proptest::collection::vec(doc_strategy(), num_docs))
}
fn columnar_docs_and_mapping_strategy()
-> impl Strategy<Value = (Vec<Vec<(&'static str, ColumnValue)>>, Vec<RowId>)> {
columnar_docs_strategy().prop_flat_map(|docs| {
permutation_strategy(docs.len()).prop_map(move |permutation| (docs.clone(), permutation))
})
}
fn permutation_strategy(n: usize) -> impl Strategy<Value = Vec<RowId>> {
Just((0u32..n as RowId).collect()).prop_shuffle()
}
fn permutation_and_subset_strategy(n: usize) -> impl Strategy<Value = Vec<usize>> {
let vals: Vec<usize> = (0..n).collect();
subsequence(vals, 0..=n).prop_shuffle()
}
fn build_columnar_with_mapping(docs: &[Vec<(&'static str, ColumnValue)>]) -> ColumnarReader {
fn build_columnar_with_mapping(
docs: &[Vec<(&'static str, ColumnValue)>],
old_to_new_row_ids_opt: Option<&[RowId]>,
) -> ColumnarReader {
let num_docs = docs.len() as u32;
let mut buffer = Vec::new();
let mut columnar_writer = ColumnarWriter::default();
@@ -362,13 +503,15 @@ fn build_columnar_with_mapping(docs: &[Vec<(&'static str, ColumnValue)>]) -> Col
}
}
}
columnar_writer.serialize(num_docs, &mut buffer).unwrap();
columnar_writer
.serialize(num_docs, old_to_new_row_ids_opt, &mut buffer)
.unwrap();
ColumnarReader::open(buffer).unwrap()
}
fn build_columnar(docs: &[Vec<(&'static str, ColumnValue)>]) -> ColumnarReader {
build_columnar_with_mapping(docs)
build_columnar_with_mapping(docs, None)
}
fn assert_columnar_eq_strict(left: &ColumnarReader, right: &ColumnarReader) {
@@ -628,6 +771,54 @@ proptest! {
}
}
// Same as `test_single_columnar_builder_proptest` but with a shuffling mapping.
proptest! {
#![proptest_config(ProptestConfig::with_cases(500))]
#[test]
fn test_single_columnar_builder_with_shuffle_proptest((docs, mapping) in columnar_docs_and_mapping_strategy()) {
let columnar = build_columnar_with_mapping(&docs[..], Some(&mapping));
assert_eq!(columnar.num_docs() as usize, docs.len());
let mut expected_columns: HashMap<(&str, ColumnTypeCategory), HashMap<u32, Vec<&ColumnValue>> > = Default::default();
for (doc_id, doc_vals) in docs.iter().enumerate() {
for (col_name, col_val) in doc_vals {
expected_columns
.entry((col_name, col_val.column_type_category()))
.or_default()
.entry(mapping[doc_id])
.or_default()
.push(col_val);
}
}
let column_list = columnar.list_columns().unwrap();
assert_eq!(expected_columns.len(), column_list.len());
for (column_name, column) in column_list {
let dynamic_column = column.open().unwrap();
let col_category: ColumnTypeCategory = dynamic_column.column_type().into();
let expected_col_values: &HashMap<u32, Vec<&ColumnValue>> = expected_columns.get(&(column_name.as_str(), col_category)).unwrap();
for _doc_id in 0..columnar.num_docs() {
match &dynamic_column {
DynamicColumn::Bool(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::I64(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::U64(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::F64(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::IpAddr(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::DateTime(col) =>
assert_column_values(col, expected_col_values),
DynamicColumn::Bytes(col) =>
assert_bytes_column_values(col, expected_col_values, false),
DynamicColumn::Str(col) =>
assert_bytes_column_values(col, expected_col_values, true),
}
}
}
}
}
// This tests create 2 or 3 random small columnar and attempts to merge them.
// It compares the resulting merged dataframe with what would have been obtained by building the
// dataframe from the concatenated rows to begin with.

View File

@@ -7,11 +7,6 @@
- [Other](#other)
- [Usage](#usage)
# Index Sorting has been removed!
More infos here:
https://github.com/quickwit-oss/tantivy/issues/2352
# Index Sorting
Tantivy allows you to sort the index according to a property.

View File

@@ -676,7 +676,7 @@ mod tests {
let num_segments = reader.searcher().segment_readers().len();
assert!(num_segments <= 4);
let num_components_except_deletes_and_tempstore =
crate::index::SegmentComponent::iterator().len() - 1;
crate::index::SegmentComponent::iterator().len() - 2;
let max_num_mmapped = num_components_except_deletes_and_tempstore * num_segments;
assert_eventually(|| {
let num_mmapped = mmap_directory.get_cache_info().mmapped.len();

View File

@@ -127,7 +127,7 @@ mod tests {
fast_field_writers
.add_document(&doc!(*FIELD=>2u64))
.unwrap();
fast_field_writers.serialize(&mut write).unwrap();
fast_field_writers.serialize(&mut write, None).unwrap();
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
@@ -178,7 +178,7 @@ mod tests {
fast_field_writers
.add_document(&doc!(*FIELD=>215u64))
.unwrap();
fast_field_writers.serialize(&mut write).unwrap();
fast_field_writers.serialize(&mut write, None).unwrap();
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
@@ -211,7 +211,7 @@ mod tests {
.add_document(&doc!(*FIELD=>100_000u64))
.unwrap();
}
fast_field_writers.serialize(&mut write).unwrap();
fast_field_writers.serialize(&mut write, None).unwrap();
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
@@ -243,7 +243,7 @@ mod tests {
.add_document(&doc!(*FIELD=>5_000_000_000_000_000_000u64 + doc_id))
.unwrap();
}
fast_field_writers.serialize(&mut write).unwrap();
fast_field_writers.serialize(&mut write, None).unwrap();
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
@@ -276,7 +276,7 @@ mod tests {
doc.add_i64(i64_field, i);
fast_field_writers.add_document(&doc).unwrap();
}
fast_field_writers.serialize(&mut write).unwrap();
fast_field_writers.serialize(&mut write, None).unwrap();
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
@@ -315,7 +315,7 @@ mod tests {
let mut fast_field_writers = FastFieldsWriter::from_schema(&schema).unwrap();
let doc = TantivyDocument::default();
fast_field_writers.add_document(&doc).unwrap();
fast_field_writers.serialize(&mut write).unwrap();
fast_field_writers.serialize(&mut write, None).unwrap();
write.terminate().unwrap();
}
@@ -348,7 +348,7 @@ mod tests {
let mut fast_field_writers = FastFieldsWriter::from_schema(&schema).unwrap();
let doc = TantivyDocument::default();
fast_field_writers.add_document(&doc).unwrap();
fast_field_writers.serialize(&mut write).unwrap();
fast_field_writers.serialize(&mut write, None).unwrap();
write.terminate().unwrap();
}
@@ -385,7 +385,7 @@ mod tests {
for &x in &permutation {
fast_field_writers.add_document(&doc!(*FIELD=>x)).unwrap();
}
fast_field_writers.serialize(&mut write).unwrap();
fast_field_writers.serialize(&mut write, None).unwrap();
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
@@ -770,7 +770,7 @@ mod tests {
fast_field_writers
.add_document(&doc!(field=>false))
.unwrap();
fast_field_writers.serialize(&mut write).unwrap();
fast_field_writers.serialize(&mut write, None).unwrap();
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
@@ -802,7 +802,7 @@ mod tests {
.add_document(&doc!(field=>false))
.unwrap();
}
fast_field_writers.serialize(&mut write).unwrap();
fast_field_writers.serialize(&mut write, None).unwrap();
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
@@ -827,7 +827,7 @@ mod tests {
let mut fast_field_writers = FastFieldsWriter::from_schema(&schema).unwrap();
let doc = TantivyDocument::default();
fast_field_writers.add_document(&doc).unwrap();
fast_field_writers.serialize(&mut write).unwrap();
fast_field_writers.serialize(&mut write, None).unwrap();
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
@@ -855,7 +855,7 @@ mod tests {
for doc in docs {
fast_field_writers.add_document(doc).unwrap();
}
fast_field_writers.serialize(&mut write).unwrap();
fast_field_writers.serialize(&mut write, None).unwrap();
write.terminate().unwrap();
}
Ok(directory)

View File

@@ -4,6 +4,7 @@ use columnar::{ColumnarWriter, NumericalValue};
use common::{DateTimePrecision, JsonPathWriter};
use tokenizer_api::Token;
use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::schema::document::{Document, ReferenceValue, ReferenceValueLeaf, Value};
use crate::schema::{value_type_to_column_type, Field, FieldType, Schema, Type};
use crate::tokenizer::{TextAnalyzer, TokenizerManager};
@@ -105,6 +106,16 @@ impl FastFieldsWriter {
self.columnar_writer.mem_usage()
}
pub(crate) fn sort_order(
&self,
sort_field: &str,
num_docs: DocId,
reversed: bool,
) -> Vec<DocId> {
self.columnar_writer
.sort_order(sort_field, num_docs, reversed)
}
/// Indexes all of the fastfields of a new document.
pub fn add_document<D: Document>(&mut self, doc: &D) -> crate::Result<()> {
let doc_id = self.num_docs;
@@ -222,9 +233,16 @@ impl FastFieldsWriter {
/// Serializes all of the `FastFieldWriter`s by pushing them in
/// order to the fast field serializer.
pub fn serialize(mut self, wrt: &mut dyn io::Write) -> io::Result<()> {
pub fn serialize(
mut self,
wrt: &mut dyn io::Write,
doc_id_map_opt: Option<&DocIdMapping>,
) -> io::Result<()> {
let num_docs = self.num_docs;
self.columnar_writer.serialize(num_docs, wrt)?;
let old_to_new_row_ids =
doc_id_map_opt.map(|doc_id_mapping| doc_id_mapping.old_to_new_ids());
self.columnar_writer
.serialize(num_docs, old_to_new_row_ids, wrt)?;
Ok(())
}
}
@@ -374,7 +392,7 @@ mod tests {
}
let mut buffer = Vec::new();
columnar_writer
.serialize(json_docs.len() as DocId, &mut buffer)
.serialize(json_docs.len() as DocId, None, &mut buffer)
.unwrap();
ColumnarReader::open(buffer).unwrap()
}

View File

@@ -77,7 +77,7 @@ mod tests {
let mut fieldnorm_writers = FieldNormsWriter::for_schema(&SCHEMA);
fieldnorm_writers.record(2u32, *TXT_FIELD, 5);
fieldnorm_writers.record(3u32, *TXT_FIELD, 3);
fieldnorm_writers.serialize(serializer)?;
fieldnorm_writers.serialize(serializer, None)?;
}
let file = directory.open_read(path)?;
{

View File

@@ -2,6 +2,7 @@ use std::cmp::Ordering;
use std::{io, iter};
use super::{fieldnorm_to_id, FieldNormsSerializer};
use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::schema::{Field, Schema};
use crate::DocId;
@@ -91,7 +92,11 @@ impl FieldNormsWriter {
}
/// Serialize the seen fieldnorm values to the serializer for all fields.
pub fn serialize(&self, mut fieldnorms_serializer: FieldNormsSerializer) -> io::Result<()> {
pub fn serialize(
&self,
mut fieldnorms_serializer: FieldNormsSerializer,
doc_id_map: Option<&DocIdMapping>,
) -> io::Result<()> {
for (field, fieldnorms_buffer) in self.fieldnorms_buffers.iter().enumerate().filter_map(
|(field_id, fieldnorms_buffer_opt)| {
fieldnorms_buffer_opt.as_ref().map(|fieldnorms_buffer| {
@@ -99,7 +104,12 @@ impl FieldNormsWriter {
})
},
) {
fieldnorms_serializer.serialize_field(field, fieldnorms_buffer)?;
if let Some(doc_id_map) = doc_id_map {
let remapped_fieldnorm_buffer = doc_id_map.remap(fieldnorms_buffer);
fieldnorms_serializer.serialize_field(field, &remapped_fieldnorm_buffer)?;
} else {
fieldnorms_serializer.serialize_field(field, fieldnorms_buffer)?;
}
}
fieldnorms_serializer.close()?;
Ok(())

View File

@@ -4,7 +4,8 @@ use rand::{rng, Rng};
use crate::indexer::index_writer::MEMORY_BUDGET_NUM_BYTES_MIN;
use crate::schema::*;
use crate::{doc, schema, Index, IndexWriter, Searcher};
#[allow(deprecated)]
use crate::{doc, schema, Index, IndexSettings, IndexSortByField, IndexWriter, Order, Searcher};
fn check_index_content(searcher: &Searcher, vals: &[u64]) -> crate::Result<()> {
assert!(searcher.segment_readers().len() < 20);
@@ -62,6 +63,71 @@ fn get_num_iterations() -> usize {
.map(|str| str.parse().unwrap())
.unwrap_or(2000)
}
#[test]
#[ignore]
fn test_functional_indexing_sorted() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let id_field = schema_builder.add_u64_field("id", INDEXED | FAST);
let multiples_field = schema_builder.add_u64_field("multiples", INDEXED);
let text_field_options = TextOptions::default()
.set_indexing_options(
TextFieldIndexing::default()
.set_index_option(schema::IndexRecordOption::WithFreqsAndPositions),
)
.set_stored();
let text_field = schema_builder.add_text_field("text_field", text_field_options);
let schema = schema_builder.build();
let mut index_builder = Index::builder().schema(schema);
index_builder = index_builder.settings(IndexSettings {
sort_by_field: Some(IndexSortByField {
field: "id".to_string(),
order: Order::Desc,
}),
..Default::default()
});
let index = index_builder.create_from_tempdir().unwrap();
let reader = index.reader()?;
let mut rng = rng();
let mut index_writer: IndexWriter =
index.writer_with_num_threads(3, 3 * MEMORY_BUDGET_NUM_BYTES_MIN)?;
let mut committed_docs: HashSet<u64> = HashSet::new();
let mut uncommitted_docs: HashSet<u64> = HashSet::new();
for _ in 0..get_num_iterations() {
let random_val = rng.random_range(0..20);
if random_val == 0 {
index_writer.commit()?;
committed_docs.extend(&uncommitted_docs);
uncommitted_docs.clear();
reader.reload()?;
let searcher = reader.searcher();
// check that everything is correct.
check_index_content(
&searcher,
&committed_docs.iter().cloned().collect::<Vec<u64>>(),
)?;
} else if committed_docs.remove(&random_val) || uncommitted_docs.remove(&random_val) {
let doc_id_term = Term::from_field_u64(id_field, random_val);
index_writer.delete_term(doc_id_term);
} else {
uncommitted_docs.insert(random_val);
let mut doc = TantivyDocument::new();
doc.add_u64(id_field, random_val);
for i in 1u64..10u64 {
doc.add_u64(multiples_field, random_val * i);
}
doc.add_text(text_field, get_text());
index_writer.add_document(doc)?;
}
}
Ok(())
}
const LOREM: &str = "Doc Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod \
tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, \

View File

@@ -22,7 +22,7 @@ use crate::indexer::segment_updater::save_metas;
use crate::indexer::{IndexWriter, SingleSegmentIndexWriter};
use crate::reader::{IndexReader, IndexReaderBuilder};
use crate::schema::document::Document;
use crate::schema::{Field, FieldType, Schema};
use crate::schema::{Field, FieldType, Schema, Type};
use crate::tokenizer::{TextAnalyzer, TokenizerManager};
use crate::SegmentReader;
@@ -232,7 +232,38 @@ impl IndexBuilder {
}
fn validate(&self) -> crate::Result<()> {
if let Some(_schema) = self.schema.as_ref() {
if let Some(schema) = self.schema.as_ref() {
if let Some(sort_by_field) = self.index_settings.sort_by_field.as_ref() {
let schema_field = schema.get_field(&sort_by_field.field).map_err(|_| {
TantivyError::InvalidArgument(format!(
"Field to sort index {} not found in schema",
sort_by_field.field
))
})?;
let entry = schema.get_field_entry(schema_field);
if !entry.is_fast() {
return Err(TantivyError::InvalidArgument(format!(
"Field {} is no fast field. Field needs to be a single value fast field \
to be used to sort an index",
sort_by_field.field
)));
}
let supported_field_types = [
Type::I64,
Type::U64,
Type::F64,
Type::Date,
Type::Str,
Type::Bytes,
];
let field_type = entry.field_type().value_type();
if !supported_field_types.contains(&field_type) {
return Err(TantivyError::InvalidArgument(format!(
"Unsupported field type in sort_by_field: {field_type:?}. Supported field \
types: {supported_field_types:?} ",
)));
}
}
Ok(())
} else {
Err(TantivyError::InvalidArgument(

View File

@@ -1,6 +1,8 @@
use std::collections::HashSet;
use std::fmt;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
@@ -35,6 +37,7 @@ impl SegmentMetaInventory {
let inner = InnerSegmentMeta {
segment_id,
max_doc,
include_temp_doc_store: Arc::new(AtomicBool::new(true)),
deletes: None,
};
SegmentMeta::from(self.inventory.track(inner))
@@ -82,6 +85,15 @@ impl SegmentMeta {
self.tracked.segment_id
}
/// Removes the Component::TempStore from the alive list and
/// therefore marks the temp docstore file to be deleted by
/// the garbage collection.
pub fn untrack_temp_docstore(&self) {
self.tracked
.include_temp_doc_store
.store(false, std::sync::atomic::Ordering::Relaxed);
}
/// Returns the number of deleted documents.
pub fn num_deleted_docs(&self) -> u32 {
self.tracked
@@ -99,9 +111,20 @@ impl SegmentMeta {
/// is by removing all files that have been created by tantivy
/// and are not used by any segment anymore.
pub fn list_files(&self) -> HashSet<PathBuf> {
SegmentComponent::iterator()
.map(|component| self.relative_path(*component))
.collect::<HashSet<PathBuf>>()
if self
.tracked
.include_temp_doc_store
.load(std::sync::atomic::Ordering::Relaxed)
{
SegmentComponent::iterator()
.map(|component| self.relative_path(*component))
.collect::<HashSet<PathBuf>>()
} else {
SegmentComponent::iterator()
.filter(|comp| *comp != &SegmentComponent::TempStore)
.map(|component| self.relative_path(*component))
.collect::<HashSet<PathBuf>>()
}
}
/// Returns the relative path of a component of our segment.
@@ -115,6 +138,7 @@ impl SegmentMeta {
SegmentComponent::Positions => ".pos".to_string(),
SegmentComponent::Terms => ".term".to_string(),
SegmentComponent::Store => ".store".to_string(),
SegmentComponent::TempStore => ".store.temp".to_string(),
SegmentComponent::FastFields => ".fast".to_string(),
SegmentComponent::FieldNorms => ".fieldnorm".to_string(),
SegmentComponent::Delete => format!(".{}.del", self.delete_opstamp().unwrap_or(0)),
@@ -159,6 +183,7 @@ impl SegmentMeta {
segment_id: inner_meta.segment_id,
max_doc,
deletes: None,
include_temp_doc_store: Arc::new(AtomicBool::new(true)),
});
SegmentMeta { tracked }
}
@@ -177,6 +202,7 @@ impl SegmentMeta {
let tracked = self.tracked.map(move |inner_meta| InnerSegmentMeta {
segment_id: inner_meta.segment_id,
max_doc: inner_meta.max_doc,
include_temp_doc_store: Arc::new(AtomicBool::new(true)),
deletes: Some(delete_meta),
});
SegmentMeta { tracked }
@@ -188,6 +214,14 @@ struct InnerSegmentMeta {
segment_id: SegmentId,
max_doc: u32,
pub deletes: Option<DeleteMeta>,
/// If you want to avoid the SegmentComponent::TempStore file to be covered by
/// garbage collection and deleted, set this to true. This is used during merge.
#[serde(skip)]
#[serde(default = "default_temp_store")]
pub(crate) include_temp_doc_store: Arc<AtomicBool>,
}
fn default_temp_store() -> Arc<AtomicBool> {
Arc::new(AtomicBool::new(false))
}
impl InnerSegmentMeta {
@@ -212,6 +246,10 @@ fn is_true(val: &bool) -> bool {
/// index, like presort documents.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct IndexSettings {
/// Sorts the documents by information
/// provided in `IndexSortByField`
#[serde(skip_serializing_if = "Option::is_none")]
pub sort_by_field: Option<IndexSortByField>,
/// The `Compressor` used to compress the doc store.
#[serde(default)]
pub docstore_compression: Compressor,
@@ -234,6 +272,7 @@ fn default_docstore_blocksize() -> usize {
impl Default for IndexSettings {
fn default() -> Self {
Self {
sort_by_field: None,
docstore_compression: Compressor::default(),
docstore_blocksize: default_docstore_blocksize(),
docstore_compress_dedicated_thread: true,
@@ -241,6 +280,18 @@ impl Default for IndexSettings {
}
}
/// Settings to presort the documents in an index
///
/// Presorting documents can greatly improve performance
/// in some scenarios, by applying top n
/// optimizations.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct IndexSortByField {
/// The field to sort the documents by
pub field: String,
/// The order to sort the documents by
pub order: Order,
}
/// The order to sort by
#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum Order {
@@ -360,7 +411,7 @@ mod tests {
use crate::store::Compressor;
#[cfg(feature = "zstd-compression")]
use crate::store::ZstdCompressor;
use crate::IndexSettings;
use crate::{IndexSettings, IndexSortByField, Order};
#[test]
fn test_serialize_metas() {
@@ -372,6 +423,10 @@ mod tests {
let index_metas = IndexMeta {
index_settings: IndexSettings {
docstore_compression: Compressor::None,
sort_by_field: Some(IndexSortByField {
field: "text".to_string(),
order: Order::Asc,
}),
..Default::default()
},
segments: Vec::new(),
@@ -382,7 +437,7 @@ mod tests {
let json = serde_json::ser::to_string(&index_metas).expect("serialization failed");
assert_eq!(
json,
r#"{"index_settings":{"docstore_compression":"none","docstore_blocksize":16384},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#
r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"none","docstore_blocksize":16384},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#
);
let deser_meta: UntrackedIndexMeta = serde_json::from_str(&json).unwrap();
@@ -401,6 +456,10 @@ mod tests {
};
let index_metas = IndexMeta {
index_settings: IndexSettings {
sort_by_field: Some(IndexSortByField {
field: "text".to_string(),
order: Order::Asc,
}),
docstore_compression: crate::store::Compressor::Zstd(ZstdCompressor {
compression_level: Some(4),
}),
@@ -415,7 +474,7 @@ mod tests {
let json = serde_json::ser::to_string(&index_metas).expect("serialization failed");
assert_eq!(
json,
r#"{"index_settings":{"docstore_compression":"zstd(compression_level=4)","docstore_blocksize":1000000},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#
r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"zstd(compression_level=4)","docstore_blocksize":1000000},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#
);
let deser_meta: UntrackedIndexMeta = serde_json::from_str(&json).unwrap();
@@ -427,35 +486,35 @@ mod tests {
#[test]
#[cfg(all(feature = "lz4-compression", feature = "zstd-compression"))]
fn test_serialize_metas_invalid_comp() {
let json = r#"{"index_settings":{"docstore_compression":"zsstd","docstore_blocksize":1000000},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#;
let json = r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"zsstd","docstore_blocksize":1000000},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#;
let err = serde_json::from_str::<UntrackedIndexMeta>(json).unwrap_err();
assert_eq!(
err.to_string(),
"unknown variant `zsstd`, expected one of `none`, `lz4`, `zstd`, \
`zstd(compression_level=5)` at line 1 column 49"
`zstd(compression_level=5)` at line 1 column 96"
.to_string()
);
let json = r#"{"index_settings":{"docstore_compression":"zstd(bla=10)","docstore_blocksize":1000000},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#;
let json = r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"zstd(bla=10)","docstore_blocksize":1000000},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#;
let err = serde_json::from_str::<UntrackedIndexMeta>(json).unwrap_err();
assert_eq!(
err.to_string(),
"unknown zstd option \"bla\" at line 1 column 56".to_string()
"unknown zstd option \"bla\" at line 1 column 103".to_string()
);
}
#[test]
#[cfg(not(feature = "zstd-compression"))]
fn test_serialize_metas_unsupported_comp() {
let json = r#"{"index_settings":{"docstore_compression":"zstd","docstore_blocksize":1000000},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#;
let json = r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"zstd","docstore_blocksize":1000000},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#;
let err = serde_json::from_str::<UntrackedIndexMeta>(json).unwrap_err();
assert_eq!(
err.to_string(),
"unsupported variant `zstd`, please enable Tantivy's `zstd-compression` feature at \
line 1 column 48"
line 1 column 95"
.to_string()
);
}
@@ -469,6 +528,7 @@ mod tests {
assert_eq!(
index_settings,
IndexSettings {
sort_by_field: None,
docstore_compression: Compressor::default(),
docstore_compress_dedicated_thread: true,
docstore_blocksize: 16_384

View File

@@ -12,7 +12,7 @@ mod segment_reader;
pub use self::index::{Index, IndexBuilder};
pub(crate) use self::index_meta::SegmentMetaInventory;
pub use self::index_meta::{IndexMeta, IndexSettings, Order, SegmentMeta};
pub use self::index_meta::{IndexMeta, IndexSettings, IndexSortByField, Order, SegmentMeta};
pub use self::inverted_index_reader::InvertedIndexReader;
pub use self::segment::Segment;
pub use self::segment_component::SegmentComponent;

View File

@@ -23,6 +23,8 @@ pub enum SegmentComponent {
/// Accessing a document from the store is relatively slow, as it
/// requires to decompress the entire block it belongs to.
Store,
/// Temporary storage of the documents, before streamed to `Store`.
TempStore,
/// Bitset describing which document of the segment is alive.
/// (It was representing deleted docs but changed to represent alive docs from v0.17)
Delete,
@@ -31,13 +33,14 @@ pub enum SegmentComponent {
impl SegmentComponent {
/// Iterates through the components.
pub fn iterator() -> slice::Iter<'static, SegmentComponent> {
static SEGMENT_COMPONENTS: [SegmentComponent; 7] = [
static SEGMENT_COMPONENTS: [SegmentComponent; 8] = [
SegmentComponent::Postings,
SegmentComponent::Positions,
SegmentComponent::FastFields,
SegmentComponent::FieldNorms,
SegmentComponent::Terms,
SegmentComponent::Store,
SegmentComponent::TempStore,
SegmentComponent::Delete,
];
SEGMENT_COMPONENTS.iter()

View File

@@ -3,17 +3,28 @@
use common::ReadOnlyBitSet;
use crate::DocAddress;
use super::SegmentWriter;
use crate::schema::{Field, Schema};
use crate::{DocAddress, DocId, IndexSortByField, TantivyError};
/// Describes how the document ID mapping was produced during a merge.
#[derive(Copy, Clone, Eq, PartialEq)]
pub enum MappingType {
/// Segments are concatenated in order with no deletes; doc IDs are contiguous ranges.
Stacked,
/// Segments are concatenated in order but some documents have been deleted and are skipped.
StackedWithDeletes,
/// Documents have been reordered (e.g. sorted by a field or externally shuffled).
Shuffled,
}
/// Struct to provide mapping from new doc_id to old doc_id and segment.
///
/// Callers outside tantivy (e.g. pomsky's merge executor) can construct a
/// `Shuffled` mapping directly from a precomputed permutation and pass it
/// into [`IndexMerger::write_with_doc_id_mapping`].
#[derive(Clone)]
pub(crate) struct SegmentDocIdMapping {
pub struct SegmentDocIdMapping {
pub(crate) new_doc_id_to_old_doc_addr: Vec<DocAddress>,
pub(crate) alive_bitsets: Vec<Option<ReadOnlyBitSet>>,
mapping_type: MappingType,
@@ -32,6 +43,25 @@ impl SegmentDocIdMapping {
}
}
/// Build a `Shuffled` mapping from an explicit permutation of [`DocAddress`]es.
///
/// `new_doc_id_to_old_doc_addr[new_id]` gives the source segment and doc id for
/// the document that should appear at position `new_id` in the merged segment.
/// `alive_bitsets` must contain one entry per source segment (in the same order
/// as passed to [`IndexMerger::open_with_custom_alive_set`]), each `None` if that
/// segment has no deletes.
pub fn new_shuffled(
new_doc_id_to_old_doc_addr: Vec<DocAddress>,
alive_bitsets: Vec<Option<ReadOnlyBitSet>>,
) -> Self {
Self {
new_doc_id_to_old_doc_addr,
mapping_type: MappingType::Shuffled,
alive_bitsets,
}
}
/// Returns the [`MappingType`] that describes how this mapping was constructed.
pub fn mapping_type(&self) -> MappingType {
self.mapping_type
}
@@ -43,4 +73,559 @@ impl SegmentDocIdMapping {
pub(crate) fn iter_old_doc_addrs(&self) -> impl Iterator<Item = DocAddress> + '_ {
self.new_doc_id_to_old_doc_addr.iter().copied()
}
/// This flags means the segments are simply stacked in the order of their ordinal.
/// e.g. [(0, 1), .. (n, 1), (0, 2)..., (m, 2)]
///
/// The different segment may present some deletes, in which case it is expressed by skipping a
/// `DocId`. [(0, 1), (0, 3)] <--- here doc_id=0 and doc_id=1 have been deleted
///
/// Being trivial is equivalent to having the `new_doc_id_to_old_doc_addr` array sorted.
///
/// This allows for some optimization.
pub(crate) fn is_trivial(&self) -> bool {
match self.mapping_type {
MappingType::Stacked | MappingType::StackedWithDeletes => true,
MappingType::Shuffled => false,
}
}
}
/// Bidirectional mapping between old and new doc IDs within a single segment.
pub struct DocIdMapping {
new_doc_id_to_old: Vec<DocId>,
old_doc_id_to_new: Vec<DocId>,
}
impl DocIdMapping {
/// Constructs a [`DocIdMapping`] from a vector mapping each new doc ID to its old doc ID.
pub fn from_new_id_to_old_id(new_doc_id_to_old: Vec<DocId>) -> Self {
let max_doc = new_doc_id_to_old.len();
let old_max_doc = new_doc_id_to_old
.iter()
.cloned()
.max()
.map(|n| n + 1)
.unwrap_or(0);
let mut old_doc_id_to_new = vec![0; old_max_doc as usize];
for i in 0..max_doc {
old_doc_id_to_new[new_doc_id_to_old[i] as usize] = i as DocId;
}
DocIdMapping {
new_doc_id_to_old,
old_doc_id_to_new,
}
}
/// returns the new doc_id for the old doc_id
pub fn get_new_doc_id(&self, doc_id: DocId) -> DocId {
self.old_doc_id_to_new[doc_id as usize]
}
/// returns the old doc_id for the new doc_id
pub fn get_old_doc_id(&self, doc_id: DocId) -> DocId {
self.new_doc_id_to_old[doc_id as usize]
}
/// iterate over old doc_ids in order of the new doc_ids
pub fn iter_old_doc_ids(&self) -> impl Iterator<Item = DocId> + Clone + '_ {
self.new_doc_id_to_old.iter().cloned()
}
/// Returns a slice mapping each old doc ID to its corresponding new doc ID.
pub fn old_to_new_ids(&self) -> &[DocId] {
&self.old_doc_id_to_new[..]
}
/// Remaps a given array to the new doc ids.
pub fn remap<T: Copy>(&self, els: &[T]) -> Vec<T> {
self.new_doc_id_to_old
.iter()
.map(|old_doc| els[*old_doc as usize])
.collect()
}
/// Returns the number of new (post-sort) doc IDs in this mapping.
pub fn num_new_doc_ids(&self) -> usize {
self.new_doc_id_to_old.len()
}
/// Returns the number of old (pre-sort) doc IDs covered by this mapping.
pub fn num_old_doc_ids(&self) -> usize {
self.old_doc_id_to_new.len()
}
}
pub(crate) fn expect_field_id_for_sort_field(
schema: &Schema,
sort_by_field: &IndexSortByField,
) -> crate::Result<Field> {
schema.get_field(&sort_by_field.field).map_err(|_| {
TantivyError::InvalidArgument(format!(
"field to sort index by not found: {:?}",
sort_by_field.field
))
})
}
// Generates a document mapping in the form of [index new doc_id] -> old doc_id
// TODO detect if field is already sorted and discard mapping
pub(crate) fn get_doc_id_mapping_from_field(
sort_by_field: IndexSortByField,
segment_writer: &SegmentWriter,
) -> crate::Result<DocIdMapping> {
let schema = segment_writer.segment_serializer.segment().schema();
expect_field_id_for_sort_field(&schema, &sort_by_field)?; // for now expect
let new_doc_id_to_old = segment_writer.fast_field_writers.sort_order(
sort_by_field.field.as_str(),
segment_writer.max_doc(),
sort_by_field.order.is_desc(),
);
// create new doc_id to old doc_id index (used in fast_field_writers)
Ok(DocIdMapping::from_new_id_to_old_id(new_doc_id_to_old))
}
#[cfg(test)]
mod tests_indexsorting {
use common::DateTime;
use crate::collector::TopDocs;
use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::indexer::NoMergePolicy;
use crate::query::QueryParser;
use crate::schema::*;
use crate::{DocAddress, Index, IndexBuilder, IndexSettings, IndexSortByField, Order};
fn create_test_index(
index_settings: Option<IndexSettings>,
text_field_options: TextOptions,
) -> crate::Result<Index> {
let mut schema_builder = Schema::builder();
let my_text_field = schema_builder.add_text_field("text_field", text_field_options);
let my_string_field = schema_builder.add_text_field("string_field", STRING | STORED);
let my_number =
schema_builder.add_u64_field("my_number", NumericOptions::default().set_fast());
let multi_numbers =
schema_builder.add_u64_field("multi_numbers", NumericOptions::default().set_fast());
let schema = schema_builder.build();
let mut index_builder = Index::builder().schema(schema);
if let Some(settings) = index_settings {
index_builder = index_builder.settings(settings);
}
let index = index_builder.create_in_ram()?;
let mut index_writer = index.writer_for_tests()?;
index_writer.add_document(doc!(my_number=>40_u64))?;
index_writer.add_document(
doc!(my_number=>20_u64, multi_numbers => 5_u64, multi_numbers => 6_u64),
)?;
index_writer.add_document(doc!(my_number=>100_u64))?;
index_writer.add_document(
doc!(my_number=>10_u64, my_string_field=> "blublub", my_text_field => "some text"),
)?;
index_writer.add_document(doc!(my_number=>30_u64, multi_numbers => 3_u64 ))?;
index_writer.commit()?;
Ok(index)
}
fn get_text_options() -> TextOptions {
TextOptions::default().set_indexing_options(
TextFieldIndexing::default().set_index_option(IndexRecordOption::Basic),
)
}
#[test]
fn test_sort_index_test_text_field() -> crate::Result<()> {
// there are different serializers for different settings in postings/recorder.rs
// test remapping for all of them
let options = vec![
get_text_options(),
get_text_options().set_indexing_options(
TextFieldIndexing::default().set_index_option(IndexRecordOption::WithFreqs),
),
get_text_options().set_indexing_options(
TextFieldIndexing::default()
.set_index_option(IndexRecordOption::WithFreqsAndPositions),
),
];
for option in options {
// let options = get_text_options();
// no index_sort
let index = create_test_index(None, option.clone())?;
let my_text_field = index.schema().get_field("text_field").unwrap();
let searcher = index.reader()?.searcher();
let query = QueryParser::for_index(&index, vec![my_text_field]).parse_query("text")?;
let top_docs: Vec<(f32, DocAddress)> =
searcher.search(&query, &TopDocs::with_limit(3).order_by_score())?;
assert_eq!(
top_docs.iter().map(|el| el.1.doc_id).collect::<Vec<_>>(),
vec![3]
);
// sort by field asc
let index = create_test_index(
Some(IndexSettings {
sort_by_field: Some(IndexSortByField {
field: "my_number".to_string(),
order: Order::Asc,
}),
..Default::default()
}),
option.clone(),
)?;
let my_text_field = index.schema().get_field("text_field").unwrap();
let reader = index.reader()?;
let searcher = reader.searcher();
let query = QueryParser::for_index(&index, vec![my_text_field]).parse_query("text")?;
let top_docs: Vec<(f32, DocAddress)> =
searcher.search(&query, &TopDocs::with_limit(3).order_by_score())?;
assert_eq!(
top_docs.iter().map(|el| el.1.doc_id).collect::<Vec<_>>(),
vec![0]
);
// test new field norm mapping
{
let my_text_field = index.schema().get_field("text_field").unwrap();
let fieldnorm_reader = searcher
.segment_reader(0)
.get_fieldnorms_reader(my_text_field)?;
assert_eq!(fieldnorm_reader.fieldnorm(0), 2); // some text
assert_eq!(fieldnorm_reader.fieldnorm(1), 0);
}
// sort by field desc
let index = create_test_index(
Some(IndexSettings {
sort_by_field: Some(IndexSortByField {
field: "my_number".to_string(),
order: Order::Desc,
}),
..Default::default()
}),
option.clone(),
)?;
let my_string_field = index.schema().get_field("text_field").unwrap();
let searcher = index.reader()?.searcher();
let query =
QueryParser::for_index(&index, vec![my_string_field]).parse_query("text")?;
let top_docs: Vec<(f32, DocAddress)> =
searcher.search(&query, &TopDocs::with_limit(3).order_by_score())?;
assert_eq!(
top_docs.iter().map(|el| el.1.doc_id).collect::<Vec<_>>(),
vec![4]
);
// test new field norm mapping
{
let my_text_field = index.schema().get_field("text_field").unwrap();
let fieldnorm_reader = searcher
.segment_reader(0)
.get_fieldnorms_reader(my_text_field)?;
assert_eq!(fieldnorm_reader.fieldnorm(0), 0);
assert_eq!(fieldnorm_reader.fieldnorm(1), 0);
assert_eq!(fieldnorm_reader.fieldnorm(2), 0);
assert_eq!(fieldnorm_reader.fieldnorm(3), 0);
assert_eq!(fieldnorm_reader.fieldnorm(4), 2); // some text
}
}
Ok(())
}
#[test]
fn test_sort_index_get_documents() -> crate::Result<()> {
// default baseline
let index = create_test_index(None, get_text_options())?;
let my_string_field = index.schema().get_field("string_field").unwrap();
let searcher = index.reader()?.searcher();
{
assert!(searcher
.doc::<TantivyDocument>(DocAddress::new(0, 0))?
.get_first(my_string_field)
.is_none());
assert_eq!(
searcher
.doc::<TantivyDocument>(DocAddress::new(0, 3))?
.get_first(my_string_field)
.unwrap()
.as_str(),
Some("blublub")
);
}
// sort by field asc
let index = create_test_index(
Some(IndexSettings {
sort_by_field: Some(IndexSortByField {
field: "my_number".to_string(),
order: Order::Asc,
}),
..Default::default()
}),
get_text_options(),
)?;
let my_string_field = index.schema().get_field("string_field").unwrap();
let searcher = index.reader()?.searcher();
{
assert_eq!(
searcher
.doc::<TantivyDocument>(DocAddress::new(0, 0))?
.get_first(my_string_field)
.unwrap()
.as_str(),
Some("blublub")
);
let doc = searcher.doc::<TantivyDocument>(DocAddress::new(0, 4))?;
assert!(doc.get_first(my_string_field).is_none());
}
// sort by field desc
let index = create_test_index(
Some(IndexSettings {
sort_by_field: Some(IndexSortByField {
field: "my_number".to_string(),
order: Order::Desc,
}),
..Default::default()
}),
get_text_options(),
)?;
let my_string_field = index.schema().get_field("string_field").unwrap();
let searcher = index.reader()?.searcher();
{
let doc = searcher.doc::<TantivyDocument>(DocAddress::new(0, 4))?;
assert_eq!(
doc.get_first(my_string_field).unwrap().as_str(),
Some("blublub")
);
}
Ok(())
}
#[test]
fn test_sort_index_test_string_field() -> crate::Result<()> {
let index = create_test_index(None, get_text_options())?;
let my_string_field = index.schema().get_field("string_field").unwrap();
let searcher = index.reader()?.searcher();
let query = QueryParser::for_index(&index, vec![my_string_field]).parse_query("blublub")?;
let top_docs: Vec<(f32, DocAddress)> =
searcher.search(&query, &TopDocs::with_limit(3).order_by_score())?;
assert_eq!(
top_docs.iter().map(|el| el.1.doc_id).collect::<Vec<_>>(),
vec![3]
);
let index = create_test_index(
Some(IndexSettings {
sort_by_field: Some(IndexSortByField {
field: "my_number".to_string(),
order: Order::Asc,
}),
..Default::default()
}),
get_text_options(),
)?;
let my_string_field = index.schema().get_field("string_field").unwrap();
let reader = index.reader()?;
let searcher = reader.searcher();
let query = QueryParser::for_index(&index, vec![my_string_field]).parse_query("blublub")?;
let top_docs: Vec<(f32, DocAddress)> =
searcher.search(&query, &TopDocs::with_limit(3).order_by_score())?;
assert_eq!(
top_docs.iter().map(|el| el.1.doc_id).collect::<Vec<_>>(),
vec![0]
);
// test new field norm mapping
{
let my_text_field = index.schema().get_field("text_field").unwrap();
let fieldnorm_reader = searcher
.segment_reader(0)
.get_fieldnorms_reader(my_text_field)?;
assert_eq!(fieldnorm_reader.fieldnorm(0), 2); // some text
assert_eq!(fieldnorm_reader.fieldnorm(1), 0);
}
// sort by field desc
let index = create_test_index(
Some(IndexSettings {
sort_by_field: Some(IndexSortByField {
field: "my_number".to_string(),
order: Order::Desc,
}),
..Default::default()
}),
get_text_options(),
)?;
let my_string_field = index.schema().get_field("string_field").unwrap();
let searcher = index.reader()?.searcher();
let query = QueryParser::for_index(&index, vec![my_string_field]).parse_query("blublub")?;
let top_docs: Vec<(f32, DocAddress)> =
searcher.search(&query, &TopDocs::with_limit(3).order_by_score())?;
assert_eq!(
top_docs.iter().map(|el| el.1.doc_id).collect::<Vec<_>>(),
vec![4]
);
// test new field norm mapping
{
let my_text_field = index.schema().get_field("text_field").unwrap();
let fieldnorm_reader = searcher
.segment_reader(0)
.get_fieldnorms_reader(my_text_field)?;
assert_eq!(fieldnorm_reader.fieldnorm(0), 0);
assert_eq!(fieldnorm_reader.fieldnorm(1), 0);
assert_eq!(fieldnorm_reader.fieldnorm(2), 0);
assert_eq!(fieldnorm_reader.fieldnorm(3), 0);
assert_eq!(fieldnorm_reader.fieldnorm(4), 2); // some text
}
Ok(())
}
#[test]
fn test_sort_index_fast_field() -> crate::Result<()> {
let index = create_test_index(
Some(IndexSettings {
sort_by_field: Some(IndexSortByField {
field: "my_number".to_string(),
order: Order::Asc,
}),
..Default::default()
}),
get_text_options(),
)?;
assert_eq!(
index.settings().sort_by_field.as_ref().unwrap().field,
"my_number".to_string()
);
let searcher = index.reader()?.searcher();
assert_eq!(searcher.segment_readers().len(), 1);
let segment_reader = searcher.segment_reader(0);
let fast_fields = segment_reader.fast_fields();
let fast_field = fast_fields
.u64("my_number")
.unwrap()
.first_or_default_col(999);
assert_eq!(fast_field.get_val(0), 10u64);
assert_eq!(fast_field.get_val(1), 20u64);
assert_eq!(fast_field.get_val(2), 30u64);
let multifield = fast_fields.u64("multi_numbers").unwrap();
let vals: Vec<u64> = multifield.values_for_doc(0u32).collect();
assert_eq!(vals, &[] as &[u64]);
let vals: Vec<_> = multifield.values_for_doc(1u32).collect();
assert_eq!(vals, &[5, 6]);
let vals: Vec<_> = multifield.values_for_doc(2u32).collect();
assert_eq!(vals, &[3]);
Ok(())
}
#[test]
fn test_with_sort_by_date_field() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let date_field = schema_builder.add_date_field("date", INDEXED | STORED | FAST);
let schema = schema_builder.build();
let settings = IndexSettings {
sort_by_field: Some(IndexSortByField {
field: "date".to_string(),
order: Order::Desc,
}),
..Default::default()
};
let index = Index::builder()
.schema(schema)
.settings(settings)
.create_in_ram()?;
let mut index_writer = index.writer_for_tests()?;
index_writer.set_merge_policy(Box::new(NoMergePolicy));
index_writer.add_document(doc!(
date_field => DateTime::from_timestamp_secs(1000),
))?;
index_writer.add_document(doc!(
date_field => DateTime::from_timestamp_secs(999),
))?;
index_writer.add_document(doc!(
date_field => DateTime::from_timestamp_secs(1001),
))?;
index_writer.commit()?;
let searcher = index.reader()?.searcher();
assert_eq!(searcher.segment_readers().len(), 1);
let segment_reader = searcher.segment_reader(0);
let fast_fields = segment_reader.fast_fields();
let fast_field = fast_fields
.date("date")
.unwrap()
.first_or_default_col(DateTime::from_timestamp_secs(0));
assert_eq!(fast_field.get_val(0), DateTime::from_timestamp_secs(1001));
assert_eq!(fast_field.get_val(1), DateTime::from_timestamp_secs(1000));
assert_eq!(fast_field.get_val(2), DateTime::from_timestamp_secs(999));
Ok(())
}
#[test]
fn test_doc_mapping() {
let doc_mapping = DocIdMapping::from_new_id_to_old_id(vec![3, 2, 5]);
assert_eq!(doc_mapping.get_old_doc_id(0), 3);
assert_eq!(doc_mapping.get_old_doc_id(1), 2);
assert_eq!(doc_mapping.get_old_doc_id(2), 5);
assert_eq!(doc_mapping.get_new_doc_id(0), 0);
assert_eq!(doc_mapping.get_new_doc_id(1), 0);
assert_eq!(doc_mapping.get_new_doc_id(2), 1);
assert_eq!(doc_mapping.get_new_doc_id(3), 0);
assert_eq!(doc_mapping.get_new_doc_id(4), 0);
assert_eq!(doc_mapping.get_new_doc_id(5), 2);
}
#[test]
fn test_doc_mapping_remap() {
let doc_mapping = DocIdMapping::from_new_id_to_old_id(vec![2, 8, 3]);
assert_eq!(
&doc_mapping.remap(&[0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000]),
&[2000, 8000, 3000]
);
}
#[test]
fn test_text_sort() -> crate::Result<()> {
let mut schema_builder = SchemaBuilder::new();
let id_field = schema_builder.add_text_field("id", STRING | FAST | STORED);
schema_builder.add_text_field("name", TEXT | STORED);
let index = IndexBuilder::new()
.schema(schema_builder.build())
.settings(IndexSettings {
sort_by_field: Some(IndexSortByField {
field: "id".to_string(),
order: Order::Asc,
}),
..Default::default()
})
.create_in_ram()?;
let mut index_writer = index.writer_for_tests()?;
index_writer.add_document(doc!(id_field => "z"))?;
index_writer.add_document(doc!(id_field => "a"))?;
index_writer.add_document(doc!(id_field => "m"))?;
index_writer.commit()?;
let searcher = index.reader()?.searcher();
let segment_reader = searcher.segment_reader(0);
let str_col = segment_reader.fast_fields().str("id")?.unwrap();
let mut values = Vec::new();
for doc in 0..segment_reader.max_doc() {
if let Some(ord) = str_col.ords().first(doc) {
let mut s = String::new();
str_col.ord_to_str(ord, &mut s)?;
values.push(s);
}
}
assert_eq!(values, vec!["a", "m", "z"]);
Ok(())
}
}

View File

@@ -218,7 +218,7 @@ fn index_documents<D: Document>(
let alive_bitset_opt = apply_deletes(&segment_with_max_doc, &mut delete_cursor, &doc_opstamps)?;
let meta = segment_with_max_doc.meta().clone();
meta.untrack_temp_docstore();
// update segment_updater inventory to remove tempstore
let segment_entry = SegmentEntry::new(meta, delete_cursor, alive_bitset_opt);
segment_updater.schedule_add_segment(segment_entry).wait()?;
@@ -819,7 +819,7 @@ mod tests {
use std::collections::{HashMap, HashSet};
use std::net::Ipv6Addr;
use columnar::{Column, MonotonicallyMappableToU128};
use columnar::{Cardinality, Column, MonotonicallyMappableToU128};
use itertools::Itertools;
use proptest::prop_oneof;
@@ -829,7 +829,7 @@ mod tests {
use crate::error::*;
use crate::indexer::index_writer::MEMORY_BUDGET_NUM_BYTES_MIN;
use crate::indexer::{IndexWriterOptions, NoMergePolicy};
use crate::query::{QueryParser, TermQuery};
use crate::query::{BooleanQuery, Occur, Query, QueryParser, TermQuery};
use crate::schema::{
self, Facet, FacetOptions, IndexRecordOption, IpAddrOptions, JsonObjectOptions,
NumericOptions, Schema, TextFieldIndexing, TextOptions, Value, FAST, INDEXED, STORED,
@@ -837,8 +837,8 @@ mod tests {
};
use crate::store::DOCSTORE_CACHE_CAPACITY;
use crate::{
DateTime, DocAddress, Index, IndexSettings, IndexWriter, ReloadPolicy, TantivyDocument,
Term,
DateTime, DocAddress, Index, IndexSettings, IndexSortByField, IndexWriter, Order,
ReloadPolicy, TantivyDocument, Term,
};
const LOREM: &str = "Doc Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do \
@@ -1479,6 +1479,116 @@ mod tests {
assert!(text_fast_field.term_ords(1).eq([1].into_iter()));
}
#[test]
fn test_delete_with_sort_by_field() -> crate::Result<()> {
let mut schema_builder = schema::Schema::builder();
let id_field = schema_builder.add_u64_field("id", INDEXED | schema::STORED | FAST);
let schema = schema_builder.build();
let settings = IndexSettings {
sort_by_field: Some(IndexSortByField {
field: "id".to_string(),
order: Order::Desc,
}),
..Default::default()
};
let index = Index::builder()
.schema(schema)
.settings(settings)
.create_in_ram()?;
let index_reader = index.reader()?;
let mut index_writer = index.writer_for_tests()?;
// create and delete docs in same commit
for id in 0u64..5u64 {
index_writer.add_document(doc!(id_field => id))?;
}
for id in 2u64..4u64 {
index_writer.delete_term(Term::from_field_u64(id_field, id));
}
for id in 5u64..10u64 {
index_writer.add_document(doc!(id_field => id))?;
}
index_writer.commit()?;
index_reader.reload()?;
let searcher = index_reader.searcher();
assert_eq!(searcher.segment_readers().len(), 1);
let segment_reader = searcher.segment_reader(0);
assert_eq!(segment_reader.num_docs(), 8);
assert_eq!(segment_reader.max_doc(), 10);
let fast_field_reader = segment_reader.fast_fields().u64("id")?;
let in_order_alive_ids: Vec<u64> = segment_reader
.doc_ids_alive()
.flat_map(|doc| fast_field_reader.values_for_doc(doc))
.collect();
assert_eq!(&in_order_alive_ids[..], &[9, 8, 7, 6, 5, 4, 1, 0]);
Ok(())
}
#[test]
fn test_delete_query_with_sort_by_field() -> crate::Result<()> {
let mut schema_builder = schema::Schema::builder();
let id_field = schema_builder.add_u64_field("id", INDEXED | schema::STORED | FAST);
let schema = schema_builder.build();
let settings = IndexSettings {
sort_by_field: Some(IndexSortByField {
field: "id".to_string(),
order: Order::Desc,
}),
..Default::default()
};
let index = Index::builder()
.schema(schema)
.settings(settings)
.create_in_ram()?;
let index_reader = index.reader()?;
let mut index_writer = index.writer_for_tests()?;
// create and delete docs in same commit
for id in 0u64..5u64 {
index_writer.add_document(doc!(id_field => id))?;
}
for id in 1u64..4u64 {
let term = Term::from_field_u64(id_field, id);
let not_term = Term::from_field_u64(id_field, 2);
let term = Box::new(TermQuery::new(term, Default::default()));
let not_term = Box::new(TermQuery::new(not_term, Default::default()));
let query: BooleanQuery = vec![
(Occur::Must, term as Box<dyn Query>),
(Occur::MustNot, not_term as Box<dyn Query>),
]
.into();
index_writer.delete_query(Box::new(query))?;
}
for id in 5u64..10u64 {
index_writer.add_document(doc!(id_field => id))?;
}
index_writer.commit()?;
index_reader.reload()?;
let searcher = index_reader.searcher();
assert_eq!(searcher.segment_readers().len(), 1);
let segment_reader = searcher.segment_reader(0);
assert_eq!(segment_reader.num_docs(), 8);
assert_eq!(segment_reader.max_doc(), 10);
let fast_field_reader = segment_reader.fast_fields().u64("id")?;
let in_order_alive_ids: Vec<u64> = segment_reader
.doc_ids_alive()
.flat_map(|doc| fast_field_reader.values_for_doc(doc))
.collect();
assert_eq!(&in_order_alive_ids[..], &[9, 8, 7, 6, 5, 4, 2, 0]);
Ok(())
}
#[derive(Debug, Clone)]
enum IndexingOp {
AddMultipleDoc {
@@ -1625,7 +1735,11 @@ mod tests {
id_list
}
fn test_operation_strategy(ops: &[IndexingOp], force_end_merge: bool) -> crate::Result<Index> {
fn test_operation_strategy(
ops: &[IndexingOp],
sort_index: bool,
force_end_merge: bool,
) -> crate::Result<Index> {
let mut schema_builder = schema::Schema::builder();
let json_field = schema_builder.add_json_field("json", FAST | TEXT | STORED);
let ip_field = schema_builder.add_ip_addr_field("ip", FAST | INDEXED | STORED);
@@ -1661,7 +1775,15 @@ mod tests {
);
let facet_field = schema_builder.add_facet_field("facet", FacetOptions::default());
let schema = schema_builder.build();
let settings = {
let settings = if sort_index {
IndexSettings {
sort_by_field: Some(IndexSortByField {
field: "id_opt".to_string(),
order: Order::Asc,
}),
..Default::default()
}
} else {
IndexSettings {
..Default::default()
}
@@ -2225,13 +2347,33 @@ mod tests {
}
}
// Test if index property is in sort order
if sort_index {
// load all id_opt in each segment and check they are in order
for reader in searcher.segment_readers() {
let (ff_reader, _) = reader.fast_fields().u64_lenient("id_opt").unwrap().unwrap();
let mut ids_in_segment: Vec<u64> = Vec::new();
for doc in 0..reader.num_docs() {
ids_in_segment.extend(ff_reader.values_for_doc(doc));
}
assert!(is_sorted(&ids_in_segment));
fn is_sorted<T>(data: &[T]) -> bool
where T: Ord {
data.windows(2).all(|w| w[0] <= w[1])
}
}
}
Ok(index)
}
#[test]
fn test_fast_field_range() {
let ops: Vec<_> = (0..1000).map(IndexingOp::add).collect();
assert!(test_operation_strategy(&ops, true).is_ok());
assert!(test_operation_strategy(&ops, false, true).is_ok());
}
#[test]
@@ -2245,6 +2387,7 @@ mod tests {
IndexingOp::Commit,
IndexingOp::Merge
],
true,
false
)
.is_ok());
@@ -2261,6 +2404,7 @@ mod tests {
IndexingOp::add(1),
IndexingOp::Commit,
],
false,
true
)
.is_ok());
@@ -2268,24 +2412,97 @@ mod tests {
#[test]
fn test_minimal_sort_force_end_merge() {
assert!(
test_operation_strategy(&[IndexingOp::add(23), IndexingOp::add(13),], false).is_ok()
);
assert!(test_operation_strategy(
&[IndexingOp::add(23), IndexingOp::add(13),],
false,
false
)
.is_ok());
}
#[test]
fn test_minimal_no_force_end_merge() {
fn test_minimal_sort() {
let mut schema_builder = Schema::builder();
let val = schema_builder.add_u64_field("val", FAST);
let id = schema_builder.add_u64_field("id", FAST);
let schema = schema_builder.build();
let settings = IndexSettings {
sort_by_field: Some(IndexSortByField {
field: "id".to_string(),
order: Order::Asc,
}),
..Default::default()
};
let index = Index::builder()
.schema(schema)
.settings(settings)
.create_in_ram()
.unwrap();
let mut writer = index.writer_for_tests().unwrap();
writer
.add_document(doc!(id=> 3u64, val=>4u64, val=>4u64))
.unwrap();
writer
.add_document(doc!(id=> 2u64, val=>2u64, val=>2u64))
.unwrap();
writer
.add_document(doc!(id=> 1u64, val=>1u64, val=>1u64))
.unwrap();
writer.commit().unwrap();
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let segment_reader = searcher.segment_reader(0);
let id_col: Column = segment_reader
.fast_fields()
.column_opt("id")
.unwrap()
.unwrap();
let val_col: Column = segment_reader
.fast_fields()
.column_opt("val")
.unwrap()
.unwrap();
assert_eq!(id_col.get_cardinality(), Cardinality::Full);
assert_eq!(val_col.get_cardinality(), Cardinality::Multivalued);
assert_eq!(id_col.first(0u32), Some(1u64));
assert_eq!(id_col.first(1u32), Some(2u64));
assert!(val_col.values_for_doc(0u32).eq([1u64, 1u64].into_iter()));
assert!(val_col.values_for_doc(1u32).eq([2u64, 2u64].into_iter()));
}
#[test]
fn test_minimal_sort_force_end_merge_with_delete() {
assert!(test_operation_strategy(
&[
IndexingOp::add(23),
IndexingOp::add(13),
IndexingOp::DeleteDoc { id: 13 }
],
true,
true
)
.is_ok());
}
#[test]
fn test_minimal_no_sort_no_force_end_merge() {
assert!(test_operation_strategy(
&[
IndexingOp::add(23),
IndexingOp::add(13),
IndexingOp::DeleteDoc { id: 13 }
],
false,
false
)
.is_ok());
}
#[test]
fn test_minimal_sort_merge() {
assert!(test_operation_strategy(&[IndexingOp::add(3),], true, true).is_ok());
}
use proptest::prelude::*;
proptest! {
@@ -2293,23 +2510,77 @@ mod tests {
#![proptest_config(ProptestConfig::with_cases(20))]
#[test]
fn test_delete_proptest_adding(ops in proptest::collection::vec(adding_operation_strategy(), 1..100)) {
assert!(test_operation_strategy(&ops[..], false).is_ok());
assert!(test_operation_strategy(&ops[..], true, false).is_ok());
}
#[test]
fn test_delete_proptest_with_merge_adding(ops in proptest::collection::vec(adding_operation_strategy(), 1..100)) {
assert!(test_operation_strategy(&ops[..], true).is_ok());
assert!(test_operation_strategy(&ops[..], false, false).is_ok());
}
#[test]
fn test_delete_proptest(ops in proptest::collection::vec(balanced_operation_strategy(), 1..10)) {
assert!(test_operation_strategy(&ops[..], false).is_ok());
assert!(test_operation_strategy(&ops[..], true, true).is_ok());
}
#[test]
fn test_delete_proptest_with_merge(ops in proptest::collection::vec(balanced_operation_strategy(), 1..100)) {
assert!(test_operation_strategy(&ops[..], true).is_ok());
assert!(test_operation_strategy(&ops[..], false, true).is_ok());
}
#[test]
#[ignore = "doesn't work with deferred segment loading"]
fn test_delete_without_sort_proptest(ops in proptest::collection::vec(balanced_operation_strategy(), 1..10)) {
assert!(test_operation_strategy(&ops[..], false, false).is_ok());
}
#[test]
#[ignore = "doesn't work with deferred segment loading"]
fn test_delete_with_sort_proptest_with_merge(ops in proptest::collection::vec(balanced_operation_strategy(), 1..10)) {
assert!(test_operation_strategy(&ops[..], true, true).is_ok());
}
}
#[test]
fn test_delete_with_sort_by_field_last_opstamp_is_not_max() -> crate::Result<()> {
let mut schema_builder = schema::Schema::builder();
let sort_by_field = schema_builder.add_u64_field("sort_by", FAST);
let id_field = schema_builder.add_u64_field("id", INDEXED);
let schema = schema_builder.build();
let settings = IndexSettings {
sort_by_field: Some(IndexSortByField {
field: "sort_by".to_string(),
order: Order::Asc,
}),
..Default::default()
};
let index = Index::builder()
.schema(schema)
.settings(settings)
.create_in_ram()?;
let mut index_writer = index.writer_for_tests()?;
// We add a doc...
index_writer.add_document(doc!(sort_by_field => 2u64, id_field => 0u64))?;
// And remove it.
index_writer.delete_term(Term::from_field_u64(id_field, 0u64));
// We add another doc.
index_writer.add_document(doc!(sort_by_field=>1u64, id_field => 0u64))?;
// The expected result is a segment with
// maxdoc = 2
// numdoc = 1.
index_writer.commit()?;
let searcher = index.reader()?.searcher();
assert_eq!(searcher.segment_readers().len(), 1);
let segment_reader = searcher.segment_reader(0);
assert_eq!(segment_reader.max_doc(), 2);
assert_eq!(segment_reader.num_docs(), 1);
Ok(())
}
#[test]
@@ -2326,7 +2597,7 @@ mod tests {
IndexingOp::add(4),
Commit,
];
test_operation_strategy(&ops[..], true).unwrap();
test_operation_strategy(&ops[..], false, true).unwrap();
}
#[test]
@@ -2339,7 +2610,7 @@ mod tests {
Commit,
Merge,
];
test_operation_strategy(&ops[..], true).unwrap();
test_operation_strategy(&ops[..], false, true).unwrap();
}
#[test]
@@ -2351,7 +2622,7 @@ mod tests {
IndexingOp::add(13),
Commit,
];
test_operation_strategy(&ops[..], true).unwrap();
test_operation_strategy(&ops[..], false, true).unwrap();
}
#[test]
@@ -2362,7 +2633,7 @@ mod tests {
IndexingOp::add(9),
IndexingOp::add(10),
];
test_operation_strategy(&ops[..], false).unwrap();
test_operation_strategy(&ops[..], false, false).unwrap();
}
#[test]
@@ -2389,6 +2660,7 @@ mod tests {
IndexingOp::Commit,
IndexingOp::Commit
],
false,
false
)
.is_ok());
@@ -2409,6 +2681,7 @@ mod tests {
IndexingOp::Merge,
],
true,
false,
)
.unwrap();
}

View File

@@ -1,148 +0,0 @@
#[cfg(test)]
mod tests {
use crate::collector::TopDocs;
use crate::fastfield::AliveBitSet;
use crate::index::Index;
use crate::postings::Postings;
use crate::query::QueryParser;
use crate::schema::{
self, BytesOptions, Facet, FacetOptions, IndexRecordOption, NumericOptions,
TextFieldIndexing, TextOptions,
};
use crate::{DocAddress, DocSet, IndexSettings, IndexWriter, Term};
fn create_test_index(index_settings: Option<IndexSettings>) -> crate::Result<Index> {
let mut schema_builder = schema::Schema::builder();
let int_options = NumericOptions::default()
.set_fast()
.set_stored()
.set_indexed();
let int_field = schema_builder.add_u64_field("intval", int_options);
let bytes_options = BytesOptions::default().set_fast().set_indexed();
let bytes_field = schema_builder.add_bytes_field("bytes", bytes_options);
let facet_field = schema_builder.add_facet_field("facet", FacetOptions::default());
let multi_numbers =
schema_builder.add_u64_field("multi_numbers", NumericOptions::default().set_fast());
let text_field_options = TextOptions::default()
.set_indexing_options(
TextFieldIndexing::default()
.set_index_option(schema::IndexRecordOption::WithFreqsAndPositions),
)
.set_stored();
let text_field = schema_builder.add_text_field("text_field", text_field_options);
let schema = schema_builder.build();
let mut index_builder = Index::builder().schema(schema);
if let Some(settings) = index_settings {
index_builder = index_builder.settings(settings);
}
let index = index_builder.create_in_ram()?;
{
let mut index_writer = index.writer_for_tests()?;
// segment 1 - range 1-3
index_writer.add_document(doc!(int_field=>1_u64))?;
index_writer.add_document(
doc!(int_field=>3_u64, multi_numbers => 3_u64, multi_numbers => 4_u64, bytes_field => vec![1, 2, 3], text_field => "some text", facet_field=> Facet::from("/book/crime")),
)?;
index_writer.add_document(
doc!(int_field=>1_u64, text_field=> "deleteme", text_field => "ok text more text"),
)?;
index_writer.add_document(
doc!(int_field=>2_u64, multi_numbers => 2_u64, multi_numbers => 3_u64, text_field => "ok text more text"),
)?;
index_writer.commit()?;
index_writer.add_document(doc!(int_field=>20_u64, multi_numbers => 20_u64))?;
let in_val = 1u64;
index_writer.add_document(doc!(int_field=>in_val, text_field=> "deleteme" , text_field => "ok text more text", facet_field=> Facet::from("/book/crime")))?;
index_writer.commit()?;
let int_vals = [10u64, 5];
index_writer.add_document( // position of this doc after delete in desc sorting = [2], in disjunct case [1]
doc!(int_field=>int_vals[0], multi_numbers => 10_u64, multi_numbers => 11_u64, text_field=> "blubber", facet_field=> Facet::from("/book/fantasy")),
)?;
index_writer.add_document(doc!(int_field=>int_vals[1], text_field=> "deleteme"))?;
index_writer.add_document(
doc!(int_field=>1_000u64, multi_numbers => 1001_u64, multi_numbers => 1002_u64, bytes_field => vec![5, 5],text_field => "the biggest num")
)?;
index_writer.delete_term(Term::from_field_text(text_field, "deleteme"));
index_writer.commit()?;
}
// Merging the segments
{
let segment_ids = index.searchable_segment_ids()?;
let mut index_writer: IndexWriter = index.writer_for_tests()?;
index_writer.merge(&segment_ids).wait()?;
index_writer.wait_merging_threads()?;
}
Ok(index)
}
#[test]
fn test_merge_index() {
let index = create_test_index(Some(IndexSettings {
..Default::default()
}))
.unwrap();
let reader = index.reader().unwrap();
let searcher = reader.searcher();
assert_eq!(searcher.segment_readers().len(), 1);
let segment_reader = searcher.segment_readers().last().unwrap();
let searcher = index.reader().unwrap().searcher();
{
let my_text_field = index.schema().get_field("text_field").unwrap();
let do_search = |term: &str| {
let query = QueryParser::for_index(&index, vec![my_text_field])
.parse_query(term)
.unwrap();
let top_docs: Vec<(f32, DocAddress)> = searcher
.search(&query, &TopDocs::with_limit(3).order_by_score())
.unwrap();
top_docs.iter().map(|el| el.1.doc_id).collect::<Vec<_>>()
};
assert_eq!(do_search("some"), vec![1]);
assert_eq!(do_search("blubber"), vec![3]);
assert_eq!(do_search("biggest"), vec![4]);
}
// postings file
{
let my_text_field = index.schema().get_field("text_field").unwrap();
let term_a = Term::from_field_text(my_text_field, "text");
let inverted_index = segment_reader.inverted_index(my_text_field).unwrap();
let mut postings = inverted_index
.read_postings(&term_a, IndexRecordOption::WithFreqsAndPositions)
.unwrap()
.unwrap();
assert_eq!(postings.doc_freq(), 2);
let fallback_bitset = AliveBitSet::for_test_from_deleted_docs(&[0], 100);
assert_eq!(
postings.doc_freq_given_deletes(
segment_reader.alive_bitset().unwrap_or(&fallback_bitset)
),
2
);
assert_eq!(postings.term_freq(), 1);
let mut output = vec![];
postings.positions(&mut output);
assert_eq!(output, vec![1]);
postings.advance();
assert_eq!(postings.term_freq(), 2);
postings.positions(&mut output);
assert_eq!(output, vec![1, 3]);
}
}
}

View File

@@ -1,7 +1,8 @@
use std::sync::Arc;
use columnar::{
ColumnType, ColumnarReader, MergeRowOrder, RowAddr, ShuffleMergeOrder, StackMergeOrder,
compute_merged_term_ord_mapping, BytesColumn, Column, ColumnType, ColumnarReader,
MergeRowOrder, RowAddr, ShuffleMergeOrder, StackMergeOrder,
};
use common::ReadOnlyBitSet;
use itertools::Itertools;
@@ -10,16 +11,52 @@ use measure_time::debug_time;
use crate::directory::WritePtr;
use crate::docset::{DocSet, TERMINATED};
use crate::error::DataCorruption;
use crate::fastfield::AliveBitSet;
use crate::fastfield::{AliveBitSet, FastFieldNotAvailableError};
use crate::fieldnorm::{FieldNormReader, FieldNormReaders, FieldNormsSerializer, FieldNormsWriter};
use crate::index::{Segment, SegmentComponent, SegmentReader};
use crate::indexer::doc_id_mapping::{MappingType, SegmentDocIdMapping};
use crate::indexer::SegmentSerializer;
use crate::postings::{InvertedIndexSerializer, Postings, SegmentPostings};
use crate::schema::{value_type_to_column_type, Field, FieldType, Schema};
use crate::schema::{value_type_to_column_type, Field, FieldType, Schema, Type};
use crate::store::StoreWriter;
use crate::termdict::{TermMerger, TermOrdinal};
use crate::{DocAddress, DocId, InvertedIndexReader};
use crate::{
DocAddress, DocId, IndexSettings, IndexSortByField, InvertedIndexReader, Order, SegmentOrdinal,
};
/// Per-segment accessor for Str/Bytes sort fields during index merging.
///
/// Each segment stores its own term dictionary with segment-local ordinals. To compare terms
/// across segments we compute a merged global dictionary and map each segment's local ordinals
/// to the corresponding merged ordinal via `merged_term_ord_mapping`. This avoids materializing
/// the actual term bytes during the merge sort — ordinal comparison is sufficient because the
/// merged dictionary preserves lexicographic order.
struct StrBytesSortFieldAccessor {
ords: Column<u64>,
merged_term_ord_mapping: Vec<TermOrdinal>,
}
impl StrBytesSortFieldAccessor {
fn remapped_term_ord(&self, doc_id: DocId) -> Option<TermOrdinal> {
self.ords.first(doc_id).map(|old_ord| {
let old_ord = old_ord as usize;
debug_assert!(old_ord < self.merged_term_ord_mapping.len());
self.merged_term_ord_mapping[old_ord]
})
}
}
/// Owned per-segment sort-field accessors, kept alive for the duration of the merge.
///
/// - `Numeric`: direct column value access — all numeric/datetime types share a single u64 column
/// interface, so segments can be compared directly by value.
/// - `StrBytes`: ordinal-based access — each segment's local term ordinals are remapped to merged
/// global ordinals so that cross-segment lexicographic comparison works without loading term
/// bytes.
enum ReaderSortFieldAccessors {
Numeric(Vec<(SegmentOrdinal, Column<u64>)>),
StrBytes(Vec<(SegmentOrdinal, StrBytesSortFieldAccessor)>),
}
/// Segment's max doc must be `< MAX_DOC_LIMIT`.
///
@@ -76,7 +113,9 @@ fn estimate_total_num_tokens(readers: &[SegmentReader], field: Field) -> crate::
Ok(total_num_tokens)
}
/// Merges multiple index segments into a single new segment.
pub struct IndexMerger {
index_settings: IndexSettings,
schema: Schema,
pub(crate) readers: Vec<SegmentReader>,
max_doc: u32,
@@ -112,7 +151,7 @@ fn convert_to_merge_order(
) -> MergeRowOrder {
match doc_id_mapping.mapping_type() {
MappingType::Stacked => MergeRowOrder::Stack(StackMergeOrder::stack(columnars)),
MappingType::StackedWithDeletes => {
MappingType::StackedWithDeletes | MappingType::Shuffled => {
// RUST/LLVM is amazing. The following conversion is actually a no-op:
// no allocation, no copy.
let new_row_id_to_old_row_id: Vec<RowAddr> = doc_id_mapping
@@ -145,25 +184,62 @@ fn extract_fast_field_required_columns(schema: &Schema) -> Vec<(String, ColumnTy
}
impl IndexMerger {
pub fn open(schema: Schema, segments: &[Segment]) -> crate::Result<IndexMerger> {
let alive_bitset = segments.iter().map(|_| None).collect_vec();
Self::open_with_custom_alive_set(schema, segments, alive_bitset)
fn total_num_new_docs(&self) -> usize {
self.readers
.iter()
.map(|reader| reader.num_docs() as usize)
.sum()
}
// Create merge with a custom delete set.
// For every Segment, a delete bitset can be provided, which
// will be merged with the existing bit set. Make sure the index
// corresponds to the segment index.
//
// If `None` is provided for custom alive set, the regular alive set will be used.
// If a alive_bitset is provided, the union between the provided and regular
// alive set will be used.
//
// This can be used to merge but also apply an additional filter.
// One use case is demux, which is basically taking a list of
// segments and partitions them e.g. by a value in a field.
fn collect_alive_bitsets(&self) -> Vec<Option<ReadOnlyBitSet>> {
self.readers
.iter()
.map(|reader| {
reader
.alive_bitset()
.map(|alive_bitset| alive_bitset.bitset().clone())
})
.collect()
}
/// Column cardinality metadata (`Optional`) covers all docs including deleted ones.
/// A segment can report `Optional` but have zero live NULLs if every NULL doc was
/// deleted. We scan alive docs to distinguish this case, because deleted NULLs
/// are excluded from the merge and shouldn't block the disjunct-stack path.
fn segment_has_live_nulls(&self, segment_ord: SegmentOrdinal, col: &Column<u64>) -> bool {
if col.get_cardinality() != columnar::Cardinality::Optional {
return false;
}
let reader = &self.readers[segment_ord as usize];
if !reader.has_deletes() {
return true;
}
reader
.doc_ids_alive()
.any(|doc_id| col.first(doc_id).is_none())
}
/// Opens an [`IndexMerger`] over the given segments using their existing delete sets.
pub fn open(
schema: Schema,
index_settings: IndexSettings,
segments: &[Segment],
) -> crate::Result<IndexMerger> {
let alive_bitset = segments.iter().map(|_| None).collect_vec();
Self::open_with_custom_alive_set(schema, index_settings, segments, alive_bitset)
}
/// Opens an [`IndexMerger`] with a custom alive (delete) set per segment.
///
/// For every segment, an optional [`AliveBitSet`] can be provided which is intersected
/// with the segment's existing alive set. Pass `None` for a segment to use its existing
/// delete set unchanged.
///
/// This allows merging while also applying an additional filter, for example to demux
/// documents by a field value into separate output segments.
pub fn open_with_custom_alive_set(
schema: Schema,
index_settings: IndexSettings,
segments: &[Segment],
alive_bitset_opt: Vec<Option<AliveBitSet>>,
) -> crate::Result<IndexMerger> {
@@ -177,6 +253,12 @@ impl IndexMerger {
}
let max_doc = readers.iter().map(|reader| reader.num_docs()).sum();
if let Some(sort_by_field) = index_settings.sort_by_field.as_ref() {
let schema_field = schema.get_field(&sort_by_field.field)?;
let field_entry = schema.get_field_entry(schema_field);
let field_type = field_entry.field_type().value_type();
readers = Self::sort_readers_by_min_sort_field(readers, sort_by_field, field_type)?;
}
// sort segments by their natural sort setting
if max_doc >= MAX_DOC_LIMIT {
let err_msg = format!(
@@ -186,12 +268,50 @@ impl IndexMerger {
return Err(crate::TantivyError::InvalidArgument(err_msg));
}
Ok(IndexMerger {
index_settings,
schema,
readers,
max_doc,
})
}
fn sort_by_field_type(&self, sort_by_field: &IndexSortByField) -> crate::Result<Type> {
let schema_field = self.schema.get_field(&sort_by_field.field)?;
let field_entry = self.schema.get_field_entry(schema_field);
Ok(field_entry.field_type().value_type())
}
fn sort_readers_by_min_sort_field(
readers: Vec<SegmentReader>,
sort_by_field: &IndexSortByField,
field_type: Type,
) -> crate::Result<Vec<SegmentReader>> {
if matches!(field_type, Type::Str | Type::Bytes) {
// Ordinals are per-segment and not directly comparable, so the "disjunct min/max"
// shortcut that works for numeric fields does not apply here.
return Ok(readers);
}
// presort the readers by their min_values, so that when they are disjunct, we can use
// the regular merge logic (implicitly sorted)
let mut readers_with_min_sort_values = readers
.into_iter()
.map(|reader| {
let accessor = Self::get_numeric_accessor(&reader, sort_by_field)?;
Ok((reader, accessor.min_value()))
})
.collect::<crate::Result<Vec<_>>>()?;
if sort_by_field.order.is_asc() {
readers_with_min_sort_values.sort_by_key(|(_, min_val)| *min_val);
} else {
readers_with_min_sort_values.sort_by_key(|(_, min_val)| std::cmp::Reverse(*min_val));
}
Ok(readers_with_min_sort_values
.into_iter()
.map(|(reader, _)| reader)
.collect())
}
fn write_fieldnorms(
&self,
mut fieldnorms_serializer: FieldNormsSerializer,
@@ -239,14 +359,261 @@ impl IndexMerger {
Ok(())
}
/// Checks if segments can use the fast disjunct-stack path (byte concatenation)
/// instead of a full k-way merge.
///
/// Stacking preserves per-segment order but doesn't reposition docs across segments.
/// NULLs must sort first (ASC) or last (DESC) globally, but stacking can't move a
/// NULL from segment 2 before values in segment 1. So any live NULL forces a full
/// k-way merge to place NULLs correctly.
fn is_disjunct_and_sorted_on_sort_property(
&self,
sort_by_field: &IndexSortByField,
) -> crate::Result<bool> {
let field_type = self.sort_by_field_type(sort_by_field)?;
// Disjunct shortcut is invalid for Str/Bytes because ords are per-segment.
if matches!(field_type, Type::Str | Type::Bytes) {
return Ok(false);
}
let reader_ordinal_and_field_accessors = self.get_numeric_accessors(sort_by_field)?;
let asc = sort_by_field.order.is_asc();
let values_disjunct = reader_ordinal_and_field_accessors
.iter()
.map(|(_, col)| col)
.tuple_windows()
.all(|(col1, col2)| {
if asc {
col1.max_value() <= col2.min_value()
} else {
col1.min_value() >= col2.max_value()
}
});
if !values_disjunct {
return Ok(false);
}
let has_live_nulls = reader_ordinal_and_field_accessors
.iter()
.any(|(segment_ord, col)| self.segment_has_live_nulls(*segment_ord, col));
Ok(!has_live_nulls)
}
fn get_str_bytes_column(
reader: &SegmentReader,
sort_by_field: &IndexSortByField,
field_type: Type,
) -> crate::Result<BytesColumn> {
let not_available = || -> crate::TantivyError {
FastFieldNotAvailableError {
field_name: sort_by_field.field.to_string(),
}
.into()
};
match field_type {
Type::Str => reader
.fast_fields()
.str(&sort_by_field.field)?
.map(Into::into)
.ok_or_else(not_available),
Type::Bytes => reader
.fast_fields()
.bytes(&sort_by_field.field)?
.ok_or_else(not_available),
_ => unreachable!("get_str_bytes_column called with non-Str/Bytes type"),
}
}
/// Builds per-segment [`StrBytesSortFieldAccessor`]s for Str/Bytes sort fields.
///
/// 1. Extracts each segment's `BytesColumn` (term dictionary + ordinal column).
/// 2. Computes a merged dictionary across all segments via [`compute_merged_term_ord_mapping`],
/// producing a per-segment mapping from local term ordinal → merged global ordinal.
/// 3. Wraps each segment's ordinal column and mapping into a `StrBytesSortFieldAccessor`.
fn get_str_bytes_accessors(
&self,
sort_by_field: &IndexSortByField,
field_type: Type,
) -> crate::Result<Vec<(SegmentOrdinal, StrBytesSortFieldAccessor)>> {
let bytes_columns = self
.readers
.iter()
.map(|reader| Self::get_str_bytes_column(reader, sort_by_field, field_type))
.collect::<crate::Result<Vec<_>>>()?;
let merged_term_ord_mappings = compute_merged_term_ord_mapping(&bytes_columns)?;
debug_assert_eq!(bytes_columns.len(), merged_term_ord_mappings.len());
let accessors = bytes_columns
.into_iter()
.zip(merged_term_ord_mappings)
.enumerate()
.map(
|(reader_ordinal, (bytes_column, merged_term_ord_mapping))| {
(
reader_ordinal as SegmentOrdinal,
StrBytesSortFieldAccessor {
ords: bytes_column.ords().clone(),
merged_term_ord_mapping,
},
)
},
)
.collect::<Vec<_>>();
Ok(accessors)
}
/// Returns the full `Column<u64>` so callers can use `Column::first()` which
/// returns `Option<u64>` — `None` for NULLs, `Some` for real values. This
/// distinction is required for correct NULL ordering during merge sort and
/// for detecting live NULLs in the disjunct-stack check.
fn get_numeric_accessor(
reader: &SegmentReader,
sort_by_field: &IndexSortByField,
) -> crate::Result<Column<u64>> {
reader.schema().get_field(&sort_by_field.field)?;
let (value_accessor, _column_type) = reader
.fast_fields()
.u64_lenient(&sort_by_field.field)?
.ok_or_else(|| FastFieldNotAvailableError {
field_name: sort_by_field.field.to_string(),
})?;
Ok(value_accessor)
}
fn get_numeric_accessors(
&self,
sort_by_field: &IndexSortByField,
) -> crate::Result<Vec<(SegmentOrdinal, Column<u64>)>> {
self.readers
.iter()
.enumerate()
.map(|(reader_ordinal, reader)| {
let reader_ordinal = reader_ordinal as SegmentOrdinal;
let accessor = Self::get_numeric_accessor(reader, sort_by_field)?;
Ok((reader_ordinal, accessor))
})
.collect::<crate::Result<Vec<_>>>()
}
/// Builds owned per-segment sort accessors so they stay alive during merge.
///
/// Dispatches on the sort field's value type: numeric types use direct column value access,
/// while Str/Bytes types go through the ordinal-remapping path (see
/// [`StrBytesSortFieldAccessor`]).
fn get_reader_with_sort_field_accessor(
&self,
sort_by_field: &IndexSortByField,
) -> crate::Result<ReaderSortFieldAccessors> {
let field_type = self.sort_by_field_type(sort_by_field)?;
if matches!(field_type, Type::Str | Type::Bytes) {
let accessors = self.get_str_bytes_accessors(sort_by_field, field_type)?;
return Ok(ReaderSortFieldAccessors::StrBytes(accessors));
}
let accessors = self.get_numeric_accessors(sort_by_field)?;
Ok(ReaderSortFieldAccessors::Numeric(accessors))
}
fn extend_sorted_doc_ids<T, F>(
&self,
reader_ordinal_and_field_accessors: &[(SegmentOrdinal, T)],
mut is_less: F,
sorted_doc_ids: &mut Vec<DocAddress>,
) where
F: FnMut(&(DocId, &SegmentOrdinal, &T), &(DocId, &SegmentOrdinal, &T)) -> bool,
{
let doc_id_reader_pair =
reader_ordinal_and_field_accessors
.iter()
.map(|(reader_ord, ff_reader)| {
let reader = &self.readers[*reader_ord as usize];
reader
.doc_ids_alive()
.map(move |doc_id| (doc_id, reader_ord, ff_reader))
});
sorted_doc_ids.extend(
doc_id_reader_pair
.into_iter()
.kmerge_by(|a, b| is_less(a, b))
.map(|(doc_id, &segment_ord, _)| DocAddress {
doc_id,
segment_ord,
}),
);
}
/// Generates the doc_id mapping where position in the vec=new
/// doc_id.
/// ReaderWithOrdinal will include the ordinal position of the
/// reader in self.readers.
pub(crate) fn generate_doc_id_mapping_with_sort_by_field(
&self,
sort_by_field: &IndexSortByField,
) -> crate::Result<SegmentDocIdMapping> {
let sort_field_accessors = self.get_reader_with_sort_field_accessor(sort_by_field)?;
// Loading the field accessor on demand causes a 15x regression
let total_num_new_docs = self.total_num_new_docs();
let mut sorted_doc_ids: Vec<DocAddress> = Vec::with_capacity(total_num_new_docs);
// K-way merge of alive doc ids across segments, ordered by the sort field.
//
// Numeric: compare raw u64 column values directly.
// Str/Bytes: compare merged global ordinals obtained via `remapped_term_ord`.
// Documents without a value map to `None` — first in ascending, last in descending.
let asc = sort_by_field.order == Order::Asc;
match sort_field_accessors {
ReaderSortFieldAccessors::Numeric(reader_ordinal_and_field_accessors) => {
self.extend_sorted_doc_ids(
&reader_ordinal_and_field_accessors,
|a, b| {
// Column::first() returns Option<u64>: None for NULLs, Some for values.
// Option's Ord puts None < Some, giving NULL-first in ASC, NULL-last in
// DESC.
let val1 = a.2.first(a.0);
let val2 = b.2.first(b.0);
if asc {
val1 < val2
} else {
val1 > val2
}
},
&mut sorted_doc_ids,
);
}
ReaderSortFieldAccessors::StrBytes(reader_ordinal_and_field_accessors) => {
self.extend_sorted_doc_ids(
&reader_ordinal_and_field_accessors,
|a, b| {
let val1 = a.2.remapped_term_ord(a.0);
let val2 = b.2.remapped_term_ord(b.0);
if asc {
val1 < val2
} else {
val1 > val2
}
},
&mut sorted_doc_ids,
);
}
}
let alive_bitsets = self.collect_alive_bitsets();
Ok(SegmentDocIdMapping::new(
sorted_doc_ids,
MappingType::Shuffled,
alive_bitsets,
))
}
/// Creates a mapping if the segments are stacked. this is helpful to merge codelines between
/// index sorting and the others
pub(crate) fn get_doc_id_from_concatenated_data(&self) -> crate::Result<SegmentDocIdMapping> {
let total_num_new_docs = self
.readers
.iter()
.map(|reader| reader.num_docs() as usize)
.sum();
let total_num_new_docs = self.total_num_new_docs();
let mut mapping: Vec<DocAddress> = Vec::with_capacity(total_num_new_docs);
@@ -262,20 +629,13 @@ impl IndexMerger {
}),
);
let has_deletes: bool = self.readers.iter().any(SegmentReader::has_deletes);
let has_deletes = self.readers.iter().any(SegmentReader::has_deletes);
let mapping_type = if has_deletes {
MappingType::StackedWithDeletes
} else {
MappingType::Stacked
};
let alive_bitsets: Vec<Option<ReadOnlyBitSet>> = self
.readers
.iter()
.map(|reader| {
let alive_bitset = reader.alive_bitset()?;
Some(alive_bitset.bitset().clone())
})
.collect();
let alive_bitsets = self.collect_alive_bitsets();
Ok(SegmentDocIdMapping::new(
mapping,
mapping_type,
@@ -356,6 +716,7 @@ impl IndexMerger {
);
let mut segment_postings_containing_the_term: Vec<(usize, SegmentPostings)> = vec![];
let mut doc_id_and_positions = vec![];
while merged_terms.advance() {
segment_postings_containing_the_term.clear();
@@ -451,13 +812,37 @@ impl IndexMerger {
0u32
};
let delta_positions = delta_computer.compute_delta(&positions_buffer);
field_serializer.write_doc(remapped_doc_id, term_freq, delta_positions);
// if doc_id_mapping exists, the doc_ids are reordered, they are
// not just stacked. The field serializer expects monotonically increasing
// doc_ids, so we collect and sort them first, before writing.
//
// I think this is not strictly necessary, it would be possible to
// avoid the loading into a vec via some form of kmerge, but then the merge
// logic would deviate much more from the stacking case (unsorted index)
if !doc_id_mapping.is_trivial() {
doc_id_and_positions.push((
remapped_doc_id,
term_freq,
positions_buffer.to_vec(),
));
} else {
let delta_positions = delta_computer.compute_delta(&positions_buffer);
field_serializer.write_doc(remapped_doc_id, term_freq, delta_positions);
}
}
doc = segment_postings.advance();
}
}
if !doc_id_mapping.is_trivial() {
doc_id_and_positions.sort_unstable_by_key(|&(doc_id, _, _)| doc_id);
for (doc_id, term_freq, positions) in &doc_id_and_positions {
let delta_positions = delta_computer.compute_delta(positions);
field_serializer.write_doc(*doc_id, *term_freq, delta_positions);
}
doc_id_and_positions.clear();
}
// closing the term.
field_serializer.close_term()?;
}
@@ -486,13 +871,47 @@ impl IndexMerger {
Ok(())
}
fn write_storable_fields(&self, store_writer: &mut StoreWriter) -> crate::Result<()> {
fn write_storable_fields(
&self,
store_writer: &mut StoreWriter,
doc_id_mapping: &SegmentDocIdMapping,
) -> crate::Result<()> {
debug_time!("write-storable-fields");
debug!("write-storable-field");
for reader in &self.readers {
let store_reader = reader.get_store_reader(1)?;
if reader.has_deletes()
if !doc_id_mapping.is_trivial() {
debug!("non-trivial-doc-id-mapping");
let store_readers: Vec<_> = self
.readers
.iter()
.map(|reader| reader.get_store_reader(50))
.collect::<Result<_, _>>()?;
let mut document_iterators: Vec<_> = store_readers
.iter()
.enumerate()
.map(|(i, store)| store.iter_raw(self.readers[i].alive_bitset()))
.collect();
for old_doc_addr in doc_id_mapping.iter_old_doc_addrs() {
let doc_bytes_it = &mut document_iterators[old_doc_addr.segment_ord as usize];
if let Some(doc_bytes_res) = doc_bytes_it.next() {
let doc_bytes = doc_bytes_res?;
store_writer.store_bytes(&doc_bytes)?;
} else {
return Err(DataCorruption::comment_only(format!(
"unexpected missing document in docstore on merge, doc address \
{old_doc_addr:?}",
))
.into());
}
}
} else {
debug!("trivial-doc-id-mapping");
for reader in &self.readers {
let store_reader = reader.get_store_reader(1)?;
if reader.has_deletes()
// If there is not enough data in the store, we avoid stacking in order to
// avoid creating many small blocks in the doc store. Once we have 5 full blocks,
// we start stacking. In the worst case 2/7 of the blocks would be very small.
@@ -508,13 +927,14 @@ impl IndexMerger {
// take 7 in order to not walk over all checkpoints.
|| store_reader.block_checkpoints().take(7).count() < 6
|| store_reader.decompressor() != store_writer.compressor().into()
{
for doc_bytes_res in store_reader.iter_raw(reader.alive_bitset()) {
let doc_bytes = doc_bytes_res?;
store_writer.store_bytes(&doc_bytes)?;
{
for doc_bytes_res in store_reader.iter_raw(reader.alive_bitset()) {
let doc_bytes = doc_bytes_res?;
store_writer.store_bytes(&doc_bytes)?;
}
} else {
store_writer.stack(store_reader)?;
}
} else {
store_writer.stack(store_reader)?;
}
}
Ok(())
@@ -525,8 +945,42 @@ impl IndexMerger {
///
/// # Returns
/// The number of documents in the resulting segment.
pub fn write(&self, mut serializer: SegmentSerializer) -> crate::Result<u32> {
let doc_id_mapping = self.get_doc_id_from_concatenated_data()?;
pub fn write(&self, serializer: SegmentSerializer) -> crate::Result<u32> {
let doc_id_mapping = if let Some(sort_by_field) = self.index_settings.sort_by_field.as_ref()
{
if self.is_disjunct_and_sorted_on_sort_property(sort_by_field)? {
self.get_doc_id_from_concatenated_data()?
} else {
self.generate_doc_id_mapping_with_sort_by_field(sort_by_field)?
}
} else {
self.get_doc_id_from_concatenated_data()?
};
self.write_with_mapping(serializer, doc_id_mapping)
}
/// Like [`write`], but uses the caller-supplied `doc_id_mapping` instead of
/// deriving one from an index sort field.
///
/// The mapping must cover *all* live documents across every segment passed to
/// [`IndexMerger::open_with_custom_alive_set`]. The simplest way to build one
/// is [`SegmentDocIdMapping::new_shuffled`].
///
/// # Returns
/// The number of documents in the resulting segment.
pub fn write_with_doc_id_mapping(
&self,
serializer: SegmentSerializer,
doc_id_mapping: SegmentDocIdMapping,
) -> crate::Result<u32> {
self.write_with_mapping(serializer, doc_id_mapping)
}
fn write_with_mapping(
&self,
mut serializer: SegmentSerializer,
doc_id_mapping: SegmentDocIdMapping,
) -> crate::Result<u32> {
debug!("write-fieldnorms");
if let Some(fieldnorms_serializer) = serializer.extract_fieldnorms_serializer() {
self.write_fieldnorms(fieldnorms_serializer, &doc_id_mapping)?;
@@ -543,7 +997,7 @@ impl IndexMerger {
)?;
debug!("write-storagefields");
self.write_storable_fields(serializer.get_store_writer())?;
self.write_storable_fields(serializer.get_store_writer(), &doc_id_mapping)?;
debug!("write-fastfields");
self.write_fast_fields(serializer.get_fast_field_write(), doc_id_mapping)?;
@@ -555,7 +1009,6 @@ impl IndexMerger {
#[cfg(test)]
mod tests {
use columnar::Column;
use proptest::prop_oneof;
use proptest::strategy::Strategy;
@@ -575,7 +1028,7 @@ mod tests {
use crate::time::OffsetDateTime;
use crate::{
assert_nearly_equals, schema, DateTime, DocAddress, DocId, DocSet, IndexSettings,
IndexWriter, Searcher,
IndexSortByField, IndexWriter, Order, Searcher,
};
#[test]
@@ -1048,6 +1501,60 @@ mod tests {
test_merge_facets(None, true)
}
#[test]
fn test_merge_facets_sort_asc() {
// In the merge case this will go through the doc_id mapping code
test_merge_facets(
Some(IndexSettings {
sort_by_field: Some(IndexSortByField {
field: "intval".to_string(),
order: Order::Desc,
}),
..Default::default()
}),
true,
);
// In the merge case this will not go through the doc_id mapping code, because the data
// sorted and disjunct
test_merge_facets(
Some(IndexSettings {
sort_by_field: Some(IndexSortByField {
field: "intval".to_string(),
order: Order::Desc,
}),
..Default::default()
}),
false,
);
}
#[test]
fn test_merge_facets_sort_desc() {
// In the merge case this will go through the doc_id mapping code
test_merge_facets(
Some(IndexSettings {
sort_by_field: Some(IndexSortByField {
field: "intval".to_string(),
order: Order::Desc,
}),
..Default::default()
}),
true,
);
// In the merge case this will not go through the doc_id mapping code, because the data
// sorted and disjunct
test_merge_facets(
Some(IndexSettings {
sort_by_field: Some(IndexSortByField {
field: "intval".to_string(),
order: Order::Desc,
}),
..Default::default()
}),
false,
);
}
// force_segment_value_overlap forces the int value for sorting to have overlapping min and max
// ranges between segments so that merge algorithm can't apply certain optimizations
fn test_merge_facets(index_settings: Option<IndexSettings>, force_segment_value_overlap: bool) {

File diff suppressed because it is too large Load Diff

View File

@@ -8,17 +8,18 @@
pub(crate) mod delete_queue;
pub(crate) mod path_to_unordered_id;
pub(crate) mod doc_id_mapping;
pub mod doc_id_mapping;
mod doc_opstamp_mapping;
mod flat_map_with_buffer;
pub(crate) mod index_writer;
pub(crate) mod index_writer_status;
pub(crate) mod indexing_term;
mod log_merge_policy;
mod merge_index_test;
mod merge_operation;
pub(crate) mod merge_policy;
pub(crate) mod merger;
/// Segment merger: combines multiple segments into one.
pub mod merger;
mod merger_sorted_index_test;
pub(crate) mod operation;
pub(crate) mod prepared_commit;
mod segment_entry;
@@ -33,15 +34,19 @@ mod stamper;
use crossbeam_channel as channel;
use smallvec::SmallVec;
pub use self::doc_id_mapping::SegmentDocIdMapping;
pub use self::index_writer::{advance_deletes, IndexWriter, IndexWriterOptions};
pub use self::log_merge_policy::LogMergePolicy;
pub use self::merge_operation::MergeOperation;
pub use self::merge_policy::{MergeCandidate, MergePolicy, NoMergePolicy};
pub use self::merger::IndexMerger;
pub use self::operation::{AddOperation, DeleteOperation, UserOperation};
pub use self::prepared_commit::PreparedCommit;
pub use self::segment_entry::SegmentEntry;
pub(crate) use self::segment_serializer::SegmentSerializer;
pub use self::segment_updater::{merge_filtered_segments, merge_indices};
pub use self::segment_updater::{
merge_filtered_segments, merge_indices, merge_segments_with_doc_id_mapping,
};
pub use self::segment_writer::SegmentWriter;
pub use self::single_segment_index_writer::SingleSegmentIndexWriter;

View File

@@ -18,9 +18,27 @@ pub struct SegmentSerializer {
impl SegmentSerializer {
/// Creates a new `SegmentSerializer`.
pub fn for_segment(mut segment: Segment) -> crate::Result<SegmentSerializer> {
pub fn for_segment(
mut segment: Segment,
is_in_merge: bool,
) -> crate::Result<SegmentSerializer> {
// If the segment is going to be sorted, we stream the docs first to a temporary file.
// In the merge case this is not necessary because we can kmerge the already sorted
// segments
let remapping_required = segment.index().settings().sort_by_field.is_some() && !is_in_merge;
let settings = segment.index().settings().clone();
let store_writer = {
let store_writer = if remapping_required {
let store_write = segment.open_write(SegmentComponent::TempStore)?;
StoreWriter::new(
store_write,
crate::store::Compressor::None,
// We want fast random access on the docs, so we choose a small block size.
// If this is zero, the skip index will contain too many checkpoints and
// therefore will be relatively slow.
16000,
settings.docstore_compress_dedicated_thread,
)?
} else {
let store_write = segment.open_write(SegmentComponent::Store)?;
StoreWriter::new(
store_write,
@@ -54,6 +72,10 @@ impl SegmentSerializer {
&self.segment
}
pub fn segment_mut(&mut self) -> &mut Segment {
&mut self.segment
}
/// Accessor to the `PostingsSerializer`.
pub fn get_postings_serializer(&mut self) -> &mut InvertedIndexSerializer {
&mut self.postings_serializer

View File

@@ -15,6 +15,7 @@ use crate::directory::{Directory, DirectoryClone, GarbageCollectionResult};
use crate::fastfield::AliveBitSet;
use crate::index::{Index, IndexMeta, IndexSettings, Segment, SegmentId, SegmentMeta};
use crate::indexer::delete_queue::DeleteCursor;
use crate::indexer::doc_id_mapping::SegmentDocIdMapping;
use crate::indexer::index_writer::advance_deletes;
use crate::indexer::merge_operation::MergeOperationInventory;
use crate::indexer::merger::IndexMerger;
@@ -114,10 +115,11 @@ fn merge(
.collect();
// An IndexMerger is like a "view" of our merged segments.
let merger: IndexMerger = IndexMerger::open(index.schema(), &segments[..])?;
let merger: IndexMerger =
IndexMerger::open(index.schema(), index.settings().clone(), &segments[..])?;
// ... we just serialize this index merger in our new segment to merge the segments.
let segment_serializer = SegmentSerializer::for_segment(merged_segment.clone())?;
let segment_serializer = SegmentSerializer::for_segment(merged_segment.clone(), true)?;
let num_docs = merger.write(segment_serializer)?;
@@ -218,9 +220,13 @@ pub fn merge_filtered_segments<T: Into<Box<dyn Directory>>>(
)?;
let merged_segment = merged_index.new_segment();
let merged_segment_id = merged_segment.id();
let merger: IndexMerger =
IndexMerger::open_with_custom_alive_set(merged_index.schema(), segments, filter_doc_ids)?;
let segment_serializer = SegmentSerializer::for_segment(merged_segment)?;
let merger: IndexMerger = IndexMerger::open_with_custom_alive_set(
merged_index.schema(),
merged_index.settings().clone(),
segments,
filter_doc_ids,
)?;
let segment_serializer = SegmentSerializer::for_segment(merged_segment, true)?;
let num_docs = merger.write(segment_serializer)?;
let segment_meta = merged_index.new_segment_meta(merged_segment_id, num_docs);
@@ -250,6 +256,82 @@ pub fn merge_filtered_segments<T: Into<Box<dyn Directory>>>(
Ok(merged_index)
}
/// Like [`merge_filtered_segments`], but uses a caller-supplied [`SegmentDocIdMapping`]
/// to control the final document order. The mapping should be built from the same
/// segments (in the same order) passed here.
///
/// Use this to apply an external reordering during a merge without relying on a persistent fast field.
///
/// # Warning
/// Same caveats as [`merge_filtered_segments`]: no live `IndexWriter` allowed.
#[doc(hidden)]
pub fn merge_segments_with_doc_id_mapping<T: Into<Box<dyn Directory>>>(
segments: &[Segment],
target_settings: IndexSettings,
filter_doc_ids: Vec<Option<AliveBitSet>>,
doc_id_mapping: SegmentDocIdMapping,
output_directory: T,
) -> crate::Result<Index> {
if segments.is_empty() {
return Err(crate::TantivyError::InvalidArgument(
"No segments given to merge".to_string(),
));
}
let target_schema = segments[0].schema();
if segments
.iter()
.skip(1)
.any(|seg| seg.schema() != target_schema)
{
return Err(crate::TantivyError::InvalidArgument(
"Attempt to merge different schema indices".to_string(),
));
}
let mut merged_index = Index::create(
output_directory,
target_schema.clone(),
target_settings.clone(),
)?;
let merged_segment = merged_index.new_segment();
let merged_segment_id = merged_segment.id();
let merger: IndexMerger = IndexMerger::open_with_custom_alive_set(
merged_index.schema(),
merged_index.settings().clone(),
segments,
filter_doc_ids,
)?;
let segment_serializer = SegmentSerializer::for_segment(merged_segment, true)?;
let num_docs = merger.write_with_doc_id_mapping(segment_serializer, doc_id_mapping)?;
let segment_meta = merged_index.new_segment_meta(merged_segment_id, num_docs);
let stats = format!(
"Segments Merge (external reordering): [{}]",
segments
.iter()
.fold(String::new(), |sum, current| format!(
"{sum}{} ",
current.meta().id().uuid_string()
))
.trim_end()
);
let index_meta = IndexMeta {
index_settings: target_settings,
segments: vec![segment_meta],
schema: target_schema,
opstamp: 0u64,
payload: Some(stats),
};
save_metas(&index_meta, merged_index.directory_mut())?;
Ok(merged_index)
}
pub(crate) struct InnerSegmentUpdater {
// we keep a copy of the current active IndexMeta to
// avoid loading the file every time we need it in the
@@ -1115,6 +1197,7 @@ mod tests {
)?;
let merger: IndexMerger = IndexMerger::open_with_custom_alive_set(
merged_index.schema(),
merged_index.settings().clone(),
&segments[..],
filter_segments,
)?;
@@ -1130,6 +1213,7 @@ mod tests {
Index::create(RamDirectory::default(), target_schema, target_settings)?;
let merger: IndexMerger = IndexMerger::open_with_custom_alive_set(
merged_index.schema(),
merged_index.settings().clone(),
&segments[..],
filter_segments,
)?;

View File

@@ -3,6 +3,7 @@ use common::JsonPathWriter;
use itertools::Itertools;
use tokenizer_api::BoxTokenStream;
use super::doc_id_mapping::{get_doc_id_mapping_from_field, DocIdMapping};
use super::operation::AddOperation;
use crate::fastfield::FastFieldsWriter;
use crate::fieldnorm::{FieldNormReaders, FieldNormsWriter};
@@ -16,6 +17,7 @@ use crate::postings::{
};
use crate::schema::document::{Document, Value};
use crate::schema::{FieldEntry, FieldType, Schema, DATE_TIME_PRECISION_INDEXED};
use crate::store::{StoreReader, StoreWriter};
use crate::tokenizer::{FacetTokenizer, PreTokenizedStream, TextAnalyzer, Tokenizer};
use crate::{DocId, Opstamp, TantivyError};
@@ -40,6 +42,20 @@ fn compute_initial_table_size(per_thread_memory_budget: usize) -> crate::Result<
})
}
fn remap_doc_opstamps(
opstamps: Vec<Opstamp>,
doc_id_mapping_opt: Option<&DocIdMapping>,
) -> Vec<Opstamp> {
if let Some(doc_id_mapping_opt) = doc_id_mapping_opt {
doc_id_mapping_opt
.iter_old_doc_ids()
.map(|doc| opstamps[doc as usize])
.collect()
} else {
opstamps
}
}
/// A `SegmentWriter` is in charge of creating segment index from a
/// set of documents.
///
@@ -75,7 +91,7 @@ impl SegmentWriter {
let tokenizer_manager = segment.index().tokenizers().clone();
let tokenizer_manager_fast_field = segment.index().fast_field_tokenizer().clone();
let table_size = compute_initial_table_size(memory_budget_in_bytes)?;
let segment_serializer = SegmentSerializer::for_segment(segment)?;
let segment_serializer = SegmentSerializer::for_segment(segment, false)?;
let per_field_postings_writers = PerFieldPostingsWriter::for_schema(&schema);
let per_field_text_analyzers = schema
.fields()
@@ -124,6 +140,15 @@ impl SegmentWriter {
/// be used afterwards.
pub fn finalize(mut self) -> crate::Result<Vec<u64>> {
self.fieldnorms_writer.fill_up_to_max_doc(self.max_doc);
let mapping: Option<DocIdMapping> = self
.segment_serializer
.segment()
.index()
.settings()
.sort_by_field
.clone()
.map(|sort_by_field| get_doc_id_mapping_from_field(sort_by_field, &self))
.transpose()?;
remap_and_write(
self.schema,
&self.per_field_postings_writers,
@@ -131,8 +156,10 @@ impl SegmentWriter {
self.fast_field_writers,
&self.fieldnorms_writer,
self.segment_serializer,
mapping.as_ref(),
)?;
Ok(self.doc_opstamps)
let doc_opstamps = remap_doc_opstamps(self.doc_opstamps, mapping.as_ref());
Ok(doc_opstamps)
}
/// Returns an estimation of the current memory usage of the segment writer.
@@ -393,10 +420,11 @@ fn remap_and_write(
fast_field_writers: FastFieldsWriter,
fieldnorms_writer: &FieldNormsWriter,
mut serializer: SegmentSerializer,
doc_id_map: Option<&DocIdMapping>,
) -> crate::Result<()> {
debug!("remap-and-write");
if let Some(fieldnorms_serializer) = serializer.extract_fieldnorms_serializer() {
fieldnorms_writer.serialize(fieldnorms_serializer)?;
fieldnorms_writer.serialize(fieldnorms_serializer, doc_id_map)?;
}
let fieldnorm_data = serializer
.segment()
@@ -407,10 +435,39 @@ fn remap_and_write(
schema,
per_field_postings_writers,
fieldnorm_readers,
doc_id_map,
serializer.get_postings_serializer(),
)?;
debug!("fastfield-serialize");
fast_field_writers.serialize(serializer.get_fast_field_write())?;
fast_field_writers.serialize(serializer.get_fast_field_write(), doc_id_map)?;
// finalize temp docstore and create version, which reflects the doc_id_map
if let Some(doc_id_map) = doc_id_map {
debug!("resort-docstore");
let store_write = serializer
.segment_mut()
.open_write(SegmentComponent::Store)?;
let settings = serializer.segment().index().settings();
let store_writer = StoreWriter::new(
store_write,
settings.docstore_compression,
settings.docstore_blocksize,
settings.docstore_compress_dedicated_thread,
)?;
let old_store_writer = std::mem::replace(&mut serializer.store_writer, store_writer);
old_store_writer.close()?;
let store_read = StoreReader::open(
serializer
.segment()
.open_read(SegmentComponent::TempStore)?,
1, /* The docstore is configured to have one doc per block, and each doc is
* accessed only once: we don't need caching. */
)?;
for old_doc_id in doc_id_map.iter_old_doc_ids() {
let doc_bytes = store_read.get_document_bytes(old_doc_id)?;
serializer.get_store_writer().store_bytes(&doc_bytes)?;
}
}
debug!("serializer-close");
serializer.close()?;

View File

@@ -226,10 +226,13 @@ pub use self::docset::{DocSet, COLLECT_BLOCK_BUFFER_LEN, TERMINATED};
pub use crate::core::{json_utils, Executor, Searcher, SearcherGeneration};
pub use crate::directory::Directory;
pub use crate::index::{
Index, IndexBuilder, IndexMeta, IndexSettings, InvertedIndexReader, Order, Segment,
SegmentMeta, SegmentReader,
Index, IndexBuilder, IndexMeta, IndexSettings, IndexSortByField, InvertedIndexReader, Order,
Segment, SegmentMeta, SegmentReader,
};
pub use crate::indexer::{
IndexMerger, IndexWriter, SegmentDocIdMapping, SingleSegmentIndexWriter,
merge_segments_with_doc_id_mapping,
};
pub use crate::indexer::{IndexWriter, SingleSegmentIndexWriter};
pub use crate::schema::{Document, TantivyDocument, Term};
/// Index format version.

View File

@@ -3,6 +3,7 @@ use std::io;
use common::json_path_writer::JSON_END_OF_PATH;
use stacker::Addr;
use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::indexer::indexing_term::IndexingTerm;
use crate::indexer::path_to_unordered_id::OrderedPathId;
use crate::postings::postings_writer::SpecializedPostingsWriter;
@@ -62,6 +63,7 @@ impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
&self,
ordered_term_addrs: &[(Field, OrderedPathId, &[u8], Addr)],
ordered_id_to_path: &[&str],
doc_id_map: Option<&DocIdMapping>,
ctx: &IndexingContext,
serializer: &mut FieldSerializer,
) -> io::Result<()> {
@@ -84,6 +86,7 @@ impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
SpecializedPostingsWriter::<Rec>::serialize_one_term(
term_buffer.as_bytes(),
*addr,
doc_id_map,
&mut buffer_lender,
ctx,
serializer,
@@ -92,6 +95,7 @@ impl<Rec: Recorder> PostingsWriter for JsonPostingsWriter<Rec> {
SpecializedPostingsWriter::<DocIdRecorder>::serialize_one_term(
term_buffer.as_bytes(),
*addr,
doc_id_map,
&mut buffer_lender,
ctx,
serializer,

View File

@@ -5,6 +5,7 @@ use std::ops::Range;
use stacker::Addr;
use crate::fieldnorm::FieldNormReaders;
use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::indexer::indexing_term::IndexingTerm;
use crate::indexer::path_to_unordered_id::OrderedPathId;
use crate::postings::recorder::{BufferLender, Recorder};
@@ -50,6 +51,7 @@ pub(crate) fn serialize_postings(
schema: Schema,
per_field_postings_writers: &PerFieldPostingsWriter,
fieldnorm_readers: FieldNormReaders,
doc_id_map: Option<&DocIdMapping>,
serializer: &mut InvertedIndexSerializer,
) -> crate::Result<()> {
// Replace unordered ids by ordered ids to be able to sort
@@ -85,6 +87,7 @@ pub(crate) fn serialize_postings(
postings_writer.serialize(
&term_offsets[byte_offsets],
&ordered_id_to_path,
doc_id_map,
&ctx,
&mut field_serializer,
)?;
@@ -120,6 +123,7 @@ pub(crate) trait PostingsWriter: Send + Sync {
&self,
term_addrs: &[(Field, OrderedPathId, &[u8], Addr)],
ordered_id_to_path: &[&str],
doc_id_map: Option<&DocIdMapping>,
ctx: &IndexingContext,
serializer: &mut FieldSerializer,
) -> io::Result<()>;
@@ -184,6 +188,7 @@ impl<Rec: Recorder> SpecializedPostingsWriter<Rec> {
pub(crate) fn serialize_one_term(
term: &[u8],
addr: Addr,
doc_id_map: Option<&DocIdMapping>,
buffer_lender: &mut BufferLender,
ctx: &IndexingContext,
serializer: &mut FieldSerializer,
@@ -191,7 +196,7 @@ impl<Rec: Recorder> SpecializedPostingsWriter<Rec> {
let recorder: Rec = ctx.term_index.read(addr);
let term_doc_freq = recorder.term_doc_freq().unwrap_or(0u32);
serializer.new_term(term, term_doc_freq, recorder.has_term_freq())?;
recorder.serialize(&ctx.arena, serializer, buffer_lender);
recorder.serialize(&ctx.arena, doc_id_map, serializer, buffer_lender);
serializer.close_term()?;
Ok(())
}
@@ -231,12 +236,13 @@ impl<Rec: Recorder> PostingsWriter for SpecializedPostingsWriter<Rec> {
&self,
term_addrs: &[(Field, OrderedPathId, &[u8], Addr)],
_ordered_id_to_path: &[&str],
doc_id_map: Option<&DocIdMapping>,
ctx: &IndexingContext,
serializer: &mut FieldSerializer,
) -> io::Result<()> {
let mut buffer_lender = BufferLender::default();
for (_field, _path_id, term, addr) in term_addrs {
Self::serialize_one_term(term, *addr, &mut buffer_lender, ctx, serializer)?;
Self::serialize_one_term(term, *addr, doc_id_map, &mut buffer_lender, ctx, serializer)?;
}
Ok(())
}

View File

@@ -1,6 +1,7 @@
use common::read_u32_vint;
use stacker::{ExpUnrolledLinkedList, MemoryArena};
use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::postings::FieldSerializer;
use crate::DocId;
@@ -70,6 +71,7 @@ pub(crate) trait Recorder: Copy + Default + Send + Sync + 'static {
fn serialize(
&self,
arena: &MemoryArena,
doc_id_map: Option<&DocIdMapping>,
serializer: &mut FieldSerializer<'_>,
buffer_lender: &mut BufferLender,
);
@@ -113,15 +115,26 @@ impl Recorder for DocIdRecorder {
fn serialize(
&self,
arena: &MemoryArena,
doc_id_map: Option<&DocIdMapping>,
serializer: &mut FieldSerializer<'_>,
buffer_lender: &mut BufferLender,
) {
let buffer = buffer_lender.lend_u8();
let (buffer, doc_ids) = buffer_lender.lend_all();
// TODO avoid reading twice.
self.stack.read_to_end(arena, buffer);
let iter = get_sum_reader(VInt32Reader::new(&buffer[..]));
for doc_id in iter {
serializer.write_doc(doc_id, 0u32, &[][..]);
if let Some(doc_id_map) = doc_id_map {
let iter = get_sum_reader(VInt32Reader::new(&buffer[..]));
doc_ids.extend(iter.map(|old_doc_id| doc_id_map.get_new_doc_id(old_doc_id)));
doc_ids.sort_unstable();
for doc in doc_ids {
serializer.write_doc(*doc, 0u32, &[][..]);
}
} else {
let iter = get_sum_reader(VInt32Reader::new(&buffer[..]));
for doc_id in iter {
serializer.write_doc(doc_id, 0u32, &[][..]);
}
}
}
@@ -181,18 +194,35 @@ impl Recorder for TermFrequencyRecorder {
fn serialize(
&self,
arena: &MemoryArena,
doc_id_map: Option<&DocIdMapping>,
serializer: &mut FieldSerializer<'_>,
buffer_lender: &mut BufferLender,
) {
let buffer = buffer_lender.lend_u8();
self.stack.read_to_end(arena, buffer);
let mut u32_it = VInt32Reader::new(&buffer[..]);
let mut prev_doc = 0;
while let Some(delta_doc_id) = u32_it.next() {
let doc_id = prev_doc + delta_doc_id;
prev_doc = doc_id;
let term_freq = u32_it.next().unwrap_or(self.current_tf);
serializer.write_doc(doc_id, term_freq, &[][..]);
if let Some(doc_id_map) = doc_id_map {
let mut doc_id_and_tf = vec![];
let mut prev_doc = 0;
while let Some(delta_doc_id) = u32_it.next() {
let doc_id = prev_doc + delta_doc_id;
prev_doc = doc_id;
let term_freq = u32_it.next().unwrap_or(self.current_tf);
doc_id_and_tf.push((doc_id_map.get_new_doc_id(doc_id), term_freq));
}
doc_id_and_tf.sort_unstable_by_key(|&(doc_id, _)| doc_id);
for (doc_id, tf) in doc_id_and_tf {
serializer.write_doc(doc_id, tf, &[][..]);
}
} else {
let mut prev_doc = 0;
while let Some(delta_doc_id) = u32_it.next() {
let doc_id = prev_doc + delta_doc_id;
prev_doc = doc_id;
let term_freq = u32_it.next().unwrap_or(self.current_tf);
serializer.write_doc(doc_id, term_freq, &[][..]);
}
}
}
@@ -238,12 +268,14 @@ impl Recorder for TfAndPositionRecorder {
fn serialize(
&self,
arena: &MemoryArena,
doc_id_map: Option<&DocIdMapping>,
serializer: &mut FieldSerializer<'_>,
buffer_lender: &mut BufferLender,
) {
let (buffer_u8, buffer_positions) = buffer_lender.lend_all();
self.stack.read_to_end(arena, buffer_u8);
let mut u32_it = VInt32Reader::new(&buffer_u8[..]);
let mut doc_id_and_positions = vec![];
let mut prev_doc = 0;
while let Some(delta_doc_id) = u32_it.next() {
let doc_id = prev_doc + delta_doc_id;
@@ -262,7 +294,19 @@ impl Recorder for TfAndPositionRecorder {
}
}
}
serializer.write_doc(doc_id, buffer_positions.len() as u32, buffer_positions);
if let Some(doc_id_map) = doc_id_map {
// this simple variant to remap may consume to much memory
doc_id_and_positions
.push((doc_id_map.get_new_doc_id(doc_id), buffer_positions.to_vec()));
} else {
serializer.write_doc(doc_id, buffer_positions.len() as u32, buffer_positions);
}
}
if doc_id_map.is_some() {
doc_id_and_positions.sort_unstable_by_key(|&(doc_id, _)| doc_id);
for (doc_id, positions) in doc_id_and_positions {
serializer.write_doc(doc_id, positions.len() as u32, &positions);
}
}
}

View File

@@ -124,6 +124,7 @@ impl SegmentSpaceUsage {
FieldNorms => PerField(self.fieldnorms().clone()),
Terms => PerField(self.termdict().clone()),
SegmentComponent::Store => ComponentSpaceUsage::Store(self.store().clone()),
SegmentComponent::TempStore => ComponentSpaceUsage::Store(self.store().clone()),
Delete => Basic(self.deletes()),
}
}