mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-06-30 14:20:42 +00:00
Compare commits
2 Commits
mallets/so
...
trinity.po
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e7ad06f42f | ||
|
|
d7f69903fa |
@@ -81,6 +81,11 @@ fn bench_agg(mut group: InputGroup<Index>) {
|
||||
register!(group, composite_histogram);
|
||||
register!(group, composite_histogram_calendar);
|
||||
|
||||
// multi_terms aggregation benchmarks
|
||||
register!(group, multi_terms_status_with_zipf_1000);
|
||||
register!(group, multi_terms_zipf_1000_with_status);
|
||||
register!(group, multi_terms_status_with_zipf_1000_sub_agg);
|
||||
|
||||
register!(group, cardinality_agg);
|
||||
register!(group, cardinality_agg_high_card);
|
||||
register!(group, cardinality_agg_low_card);
|
||||
@@ -568,6 +573,58 @@ fn composite_histogram_calendar(index: &Index) {
|
||||
execute_agg(index, agg_req);
|
||||
}
|
||||
|
||||
/// multi_terms equivalent of terms_status_with_terms_zipf_1000_sub_agg:
|
||||
/// flat GroupBy(status, zipf_1000) vs nested terms(status) -> terms(zipf_1000)
|
||||
fn multi_terms_status_with_zipf_1000(index: &Index) {
|
||||
let agg_req = json!({
|
||||
"mt": {
|
||||
"multi_terms": {
|
||||
"terms": [
|
||||
{"field": "text_few_terms_status"},
|
||||
{"field": "text_1000_terms_zipf"}
|
||||
],
|
||||
"size": 10
|
||||
}
|
||||
}
|
||||
});
|
||||
execute_agg(index, agg_req);
|
||||
}
|
||||
|
||||
/// multi_terms equivalent of terms_zipf_1000_with_terms_status_sub_agg:
|
||||
/// flat GroupBy(zipf_1000, status) vs nested terms(zipf_1000) -> terms(status)
|
||||
fn multi_terms_zipf_1000_with_status(index: &Index) {
|
||||
let agg_req = json!({
|
||||
"mt": {
|
||||
"multi_terms": {
|
||||
"terms": [
|
||||
{"field": "text_1000_terms_zipf"},
|
||||
{"field": "text_few_terms_status"}
|
||||
],
|
||||
"size": 100
|
||||
}
|
||||
}
|
||||
});
|
||||
execute_agg(index, agg_req);
|
||||
}
|
||||
|
||||
/// multi_terms on the same field pair as the nested benchmarks, with an avg sub-aggregation
|
||||
fn multi_terms_status_with_zipf_1000_sub_agg(index: &Index) {
|
||||
let agg_req = json!({
|
||||
"mt": {
|
||||
"multi_terms": {
|
||||
"terms": [
|
||||
{"field": "text_few_terms_status"},
|
||||
{"field": "text_1000_terms_zipf"}
|
||||
]
|
||||
},
|
||||
"aggs": {
|
||||
"average_f64": { "avg": { "field": "score_f64" } }
|
||||
}
|
||||
}
|
||||
});
|
||||
execute_agg(index, agg_req);
|
||||
}
|
||||
|
||||
fn execute_agg(index: &Index, agg_req: serde_json::Value) {
|
||||
let agg_req: Aggregations = serde_json::from_value(agg_req).unwrap();
|
||||
let collector = get_collector(agg_req);
|
||||
|
||||
@@ -13,8 +13,9 @@ use crate::aggregation::bucket::{
|
||||
build_segment_filter_collector, build_segment_histogram_collector,
|
||||
build_segment_range_collector, CompositeAggReqData, CompositeAggregation,
|
||||
CompositeSourceAccessors, FilterAggReqData, HistogramAggReqData, HistogramBounds,
|
||||
IncludeExcludeParam, MissingTermAggReqData, RangeAggReqData, TermMissingAgg, TermsAggReqData,
|
||||
TermsAggregation, TermsAggregationInternal,
|
||||
IncludeExcludeParam, MissingTermAggReqData, MultiTermsAggReqData, MultiTermsAggregation,
|
||||
MultiTermsFieldAccessors, RangeAggReqData, SegmentMultiTermsCollector, TermMissingAgg,
|
||||
TermsAggReqData, TermsAggregation, TermsAggregationInternal,
|
||||
};
|
||||
use crate::aggregation::metric::{
|
||||
build_segment_stats_collector, AverageAggregation, CardinalityAggReqData,
|
||||
@@ -76,6 +77,10 @@ impl AggregationsSegmentCtx {
|
||||
self.per_request.composite_req_data.push(data);
|
||||
self.per_request.composite_req_data.len() - 1
|
||||
}
|
||||
pub(crate) fn push_multi_terms_req_data(&mut self, data: MultiTermsAggReqData) -> usize {
|
||||
self.per_request.multi_terms_req_data.push(data);
|
||||
self.per_request.multi_terms_req_data.len() - 1
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn get_term_req_data(&self, idx: usize) -> &TermsAggReqData {
|
||||
@@ -125,6 +130,8 @@ pub struct PerRequestAggSegCtx {
|
||||
pub missing_term_req_data: Vec<MissingTermAggReqData>,
|
||||
/// CompositeAggReqData contains the request data for a composite aggregation.
|
||||
pub composite_req_data: Vec<CompositeAggReqData>,
|
||||
/// MultiTermsAggReqData contains the request data for a multi_terms aggregation.
|
||||
pub multi_terms_req_data: Vec<MultiTermsAggReqData>,
|
||||
|
||||
/// Request tree used to build collectors.
|
||||
pub agg_tree: Vec<AggRefNode>,
|
||||
@@ -177,6 +184,11 @@ impl PerRequestAggSegCtx {
|
||||
.iter()
|
||||
.map(|t| t.get_memory_consumption())
|
||||
.sum::<usize>()
|
||||
+ self
|
||||
.multi_terms_req_data
|
||||
.iter()
|
||||
.map(|t| t.get_memory_consumption())
|
||||
.sum::<usize>()
|
||||
+ self.agg_tree.len() * std::mem::size_of::<AggRefNode>()
|
||||
}
|
||||
|
||||
@@ -194,6 +206,7 @@ impl PerRequestAggSegCtx {
|
||||
AggKind::Range => self.range_req_data[idx].name.as_str(),
|
||||
AggKind::Filter => self.filter_req_data[idx].name.as_str(),
|
||||
AggKind::Composite => self.composite_req_data[idx].name.as_str(),
|
||||
AggKind::MultiTerms => self.multi_terms_req_data[idx].name.as_str(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -347,6 +360,9 @@ pub(crate) fn build_segment_agg_collector(
|
||||
req, node,
|
||||
)?,
|
||||
)),
|
||||
AggKind::MultiTerms => Ok(Box::new(SegmentMultiTermsCollector::from_req_and_validate(
|
||||
req, node,
|
||||
)?)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -378,6 +394,7 @@ pub enum AggKind {
|
||||
Range,
|
||||
Filter,
|
||||
Composite,
|
||||
MultiTerms,
|
||||
}
|
||||
|
||||
impl AggKind {
|
||||
@@ -394,6 +411,7 @@ impl AggKind {
|
||||
AggKind::Range => "Range",
|
||||
AggKind::Filter => "Filter",
|
||||
AggKind::Composite => "Composite",
|
||||
AggKind::MultiTerms => "MultiTerms",
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -649,6 +667,14 @@ fn build_nodes(
|
||||
&req.sub_aggregation,
|
||||
composite_req,
|
||||
)?]),
|
||||
AggregationVariants::MultiTerms(multi_terms_req) => Ok(vec![build_multi_terms_node(
|
||||
agg_name,
|
||||
reader,
|
||||
segment_ordinal,
|
||||
data,
|
||||
&req.sub_aggregation,
|
||||
multi_terms_req,
|
||||
)?]),
|
||||
AggregationVariants::Filter(filter_req) => {
|
||||
// Build the query and evaluator upfront
|
||||
let schema = reader.schema();
|
||||
@@ -707,6 +733,111 @@ fn build_composite_node(
|
||||
})
|
||||
}
|
||||
|
||||
fn build_multi_terms_node(
|
||||
agg_name: &str,
|
||||
reader: &SegmentReader,
|
||||
segment_ordinal: SegmentOrdinal,
|
||||
data: &mut AggregationsSegmentCtx,
|
||||
sub_aggs: &Aggregations,
|
||||
req: &MultiTermsAggregation,
|
||||
) -> crate::Result<AggRefNode> {
|
||||
use crate::aggregation::bucket::KeyElem;
|
||||
|
||||
if req.terms.is_empty() {
|
||||
return Err(crate::TantivyError::InvalidArgument(
|
||||
"multi_terms aggregation requires at least one field".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
let mut fields = Vec::with_capacity(req.terms.len());
|
||||
|
||||
for field_def in &req.terms {
|
||||
let field_name = &field_def.field;
|
||||
let str_dict_column = reader.fast_fields().str(field_name)?;
|
||||
|
||||
// Collect all columns for this field (handles JSON multi-type fields).
|
||||
let columns = get_term_agg_accessors(reader, field_name, &field_def.missing)?;
|
||||
|
||||
// Precompute the missing KeyElem (or None -> drop combo).
|
||||
let missing_key_elem = if let Some(missing) = &field_def.missing {
|
||||
match missing {
|
||||
Key::Str(missing_str) => {
|
||||
match columns.iter().position(|(_, ct)| *ct == ColumnType::Str) {
|
||||
Some(idx) => {
|
||||
match str_dict_column
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.dictionary()
|
||||
.term_ord(missing_str.as_bytes())?
|
||||
{
|
||||
Some(ord) => Some(KeyElem::new(idx as u32, ord)),
|
||||
None => Some(KeyElem::synthetic_missing()),
|
||||
}
|
||||
}
|
||||
None => Some(KeyElem::synthetic_missing()),
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
// Non-string missing: find the column whose type best matches the
|
||||
// missing key. Prefer an exact-type match; fall back to any numeric
|
||||
// column so cross-type coercions (e.g. Key::F64 on an I64 column)
|
||||
// still work.
|
||||
let preferred_type = match missing {
|
||||
Key::F64(_) => ColumnType::F64,
|
||||
Key::I64(_) => ColumnType::I64,
|
||||
Key::U64(_) => ColumnType::U64,
|
||||
Key::Str(_) => unreachable!("handled by Key::Str arm"),
|
||||
};
|
||||
let idx = columns
|
||||
.iter()
|
||||
.position(|(_, ct)| *ct == preferred_type)
|
||||
.or_else(|| {
|
||||
columns
|
||||
.iter()
|
||||
.position(|(_, ct)| ct.numerical_type().is_some())
|
||||
});
|
||||
match idx {
|
||||
Some(idx) => {
|
||||
let (col, col_type) = &columns[idx];
|
||||
get_missing_val_as_u64_lenient(
|
||||
*col_type,
|
||||
col.max_value(),
|
||||
missing,
|
||||
field_name,
|
||||
)?
|
||||
.map(|sentinel| KeyElem::new(idx as u32, sentinel))
|
||||
}
|
||||
None => Some(KeyElem::synthetic_missing()),
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
fields.push(MultiTermsFieldAccessors {
|
||||
columns,
|
||||
str_dict_column,
|
||||
missing: field_def.missing.clone(),
|
||||
missing_key_elem,
|
||||
field: field_name.clone(),
|
||||
});
|
||||
}
|
||||
|
||||
let idx = data.push_multi_terms_req_data(MultiTermsAggReqData {
|
||||
name: agg_name.to_string(),
|
||||
req: req.clone(),
|
||||
fields,
|
||||
sub_aggregations: sub_aggs.clone(),
|
||||
});
|
||||
let children = build_children(sub_aggs, reader, segment_ordinal, data)?;
|
||||
Ok(AggRefNode {
|
||||
kind: AggKind::MultiTerms,
|
||||
idx_in_req_data: idx,
|
||||
children,
|
||||
})
|
||||
}
|
||||
|
||||
fn build_children(
|
||||
aggs: &Aggregations,
|
||||
reader: &SegmentReader,
|
||||
|
||||
@@ -33,7 +33,7 @@ use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::bucket::{
|
||||
CompositeAggregation, DateHistogramAggregationReq, FilterAggregation, HistogramAggregation,
|
||||
RangeAggregation, TermsAggregation,
|
||||
MultiTermsAggregation, RangeAggregation, TermsAggregation,
|
||||
};
|
||||
use super::metric::{
|
||||
AverageAggregation, CardinalityAggregationReq, CountAggregation, ExtendedStatsAggregation,
|
||||
@@ -202,6 +202,9 @@ pub enum AggregationVariants {
|
||||
/// Multi-dimensional, paginable bucket aggregation.
|
||||
#[serde(rename = "composite")]
|
||||
Composite(CompositeAggregation),
|
||||
/// Bucket aggregation over unique combinations of values across multiple term fields.
|
||||
#[serde(rename = "multi_terms")]
|
||||
MultiTerms(MultiTermsAggregation),
|
||||
|
||||
// Metric aggregation types
|
||||
/// Computes the average of the extracted values.
|
||||
@@ -253,6 +256,9 @@ impl AggregationVariants {
|
||||
.iter()
|
||||
.map(|source| source.field())
|
||||
.collect(),
|
||||
AggregationVariants::MultiTerms(mt) => {
|
||||
mt.terms.iter().map(|t| t.field.as_str()).collect()
|
||||
}
|
||||
AggregationVariants::Average(avg) => vec![avg.field_name()],
|
||||
AggregationVariants::Count(count) => vec![count.field_name()],
|
||||
AggregationVariants::Max(max) => vec![max.field_name()],
|
||||
@@ -293,6 +299,12 @@ impl AggregationVariants {
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
pub(crate) fn as_multi_terms(&self) -> Option<&MultiTermsAggregation> {
|
||||
match &self {
|
||||
AggregationVariants::MultiTerms(mt) => Some(mt),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
pub(crate) fn as_percentile(&self) -> Option<&PercentilesAggregationReq> {
|
||||
match &self {
|
||||
AggregationVariants::Percentiles(percentile_req) => Some(percentile_req),
|
||||
|
||||
@@ -152,12 +152,25 @@ pub enum BucketResult {
|
||||
///
|
||||
/// See [`TermsAggregation`](super::bucket::TermsAggregation)
|
||||
buckets: Vec<BucketEntry>,
|
||||
/// The number of documents that didn’t make it into to TOP N due to shard_size or size
|
||||
/// The number of documents that didn't make it into to TOP N due to shard_size or size
|
||||
sum_other_doc_count: u64,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
/// The upper bound error for the doc count of each term.
|
||||
doc_count_error_upper_bound: Option<u64>,
|
||||
},
|
||||
/// This is the multi_terms result -- placed AFTER Terms so that a zero-bucket result
|
||||
/// deserializes as Terms (the more common case). Non-empty MultiTerms still deserializes
|
||||
/// correctly because its array `key` fails Terms' scalar `key` check first. The only known
|
||||
/// ambiguity is an empty MultiTerms result decoding as Terms (deserialization only).
|
||||
MultiTerms {
|
||||
/// The buckets (one per unique combination of field values).
|
||||
buckets: Vec<MultiTermsBucketEntry>,
|
||||
/// The number of documents that didn't make it into the TOP N.
|
||||
sum_other_doc_count: u64,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
/// The upper bound error for the doc count of each term combination.
|
||||
doc_count_error_upper_bound: Option<u64>,
|
||||
},
|
||||
/// This is the filter result - a single bucket with sub-aggregations
|
||||
Filter(FilterBucketResult),
|
||||
/// This is the composite result
|
||||
@@ -179,6 +192,11 @@ impl BucketResult {
|
||||
BucketResult::Histogram { buckets } => {
|
||||
buckets.iter().map(|bucket| bucket.get_bucket_count()).sum()
|
||||
}
|
||||
BucketResult::MultiTerms {
|
||||
buckets,
|
||||
sum_other_doc_count: _,
|
||||
doc_count_error_upper_bound: _,
|
||||
} => buckets.iter().map(|bucket| bucket.get_bucket_count()).sum(),
|
||||
BucketResult::Terms {
|
||||
buckets,
|
||||
sum_other_doc_count: _,
|
||||
@@ -272,6 +290,35 @@ impl GetDocCount for BucketEntry {
|
||||
}
|
||||
}
|
||||
|
||||
/// Bucket entry for a [`multi_terms`](super::bucket::MultiTermsAggregation) aggregation.
|
||||
///
|
||||
/// The key is a vector of values (one per declared field), and `key_as_string` is the pipe-joined
|
||||
/// representation.
|
||||
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
|
||||
pub struct MultiTermsBucketEntry {
|
||||
/// Pipe-joined string representation of all key elements, e.g. `"rock|Product A"`.
|
||||
pub key_as_string: String,
|
||||
/// The composite key: one [`Key`] per field in declaration order.
|
||||
pub key: Vec<Key>,
|
||||
/// Number of documents in this bucket.
|
||||
pub doc_count: u64,
|
||||
/// Sub-aggregation results.
|
||||
#[serde(flatten)]
|
||||
pub sub_aggregation: AggregationResults,
|
||||
}
|
||||
|
||||
impl MultiTermsBucketEntry {
|
||||
pub(crate) fn get_bucket_count(&self) -> u64 {
|
||||
1 + self.sub_aggregation.get_bucket_count()
|
||||
}
|
||||
}
|
||||
|
||||
impl GetDocCount for MultiTermsBucketEntry {
|
||||
fn doc_count(&self) -> u64 {
|
||||
self.doc_count
|
||||
}
|
||||
}
|
||||
|
||||
/// This is the range entry for a bucket, which contains a key, count, and optionally
|
||||
/// sub-aggregations.
|
||||
///
|
||||
|
||||
@@ -25,6 +25,7 @@
|
||||
mod composite;
|
||||
mod filter;
|
||||
mod histogram;
|
||||
mod multi_terms;
|
||||
mod range;
|
||||
mod term_agg;
|
||||
mod term_missing_agg;
|
||||
@@ -35,6 +36,7 @@ use std::fmt;
|
||||
pub use composite::*;
|
||||
pub use filter::*;
|
||||
pub use histogram::*;
|
||||
pub use multi_terms::*;
|
||||
pub use range::*;
|
||||
use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
|
||||
pub use term_agg::*;
|
||||
|
||||
1198
src/aggregation/bucket/multi_terms/mod.rs
Normal file
1198
src/aggregation/bucket/multi_terms/mod.rs
Normal file
File diff suppressed because it is too large
Load Diff
@@ -28,7 +28,7 @@ use super::{format_date, AggregationError, Key, SerializedKey};
|
||||
use crate::aggregation::agg_result::{
|
||||
AggregationResults, BucketEntries, BucketEntry, CompositeBucketEntry, FilterBucketResult,
|
||||
};
|
||||
use crate::aggregation::bucket::TermsAggregationInternal;
|
||||
use crate::aggregation::bucket::{IntermediateMultiTermsBucketResult, TermsAggregationInternal};
|
||||
use crate::aggregation::metric::CardinalityCollector;
|
||||
use crate::TantivyError;
|
||||
|
||||
@@ -82,7 +82,7 @@ impl From<IntermediateKey> for Key {
|
||||
}
|
||||
}
|
||||
IntermediateKey::F64(f) => Self::F64(f),
|
||||
IntermediateKey::Bool(f) => Self::U64(f as u64),
|
||||
IntermediateKey::Bool(f) => Self::Str(f.to_string()),
|
||||
IntermediateKey::U64(f) => Self::U64(f),
|
||||
IntermediateKey::I64(f) => Self::I64(f),
|
||||
}
|
||||
@@ -286,6 +286,11 @@ pub(crate) fn empty_from_req(req: &Aggregation) -> IntermediateAggregationResult
|
||||
buckets: IntermediateCompositeBucketResult::default(),
|
||||
})
|
||||
}
|
||||
MultiTerms(_) => {
|
||||
IntermediateAggregationResult::Bucket(IntermediateBucketResult::MultiTerms {
|
||||
buckets: Default::default(),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -499,6 +504,11 @@ pub enum IntermediateBucketResult {
|
||||
/// The composite buckets
|
||||
buckets: IntermediateCompositeBucketResult,
|
||||
},
|
||||
/// Multi-terms aggregation
|
||||
MultiTerms {
|
||||
/// The multi-terms buckets
|
||||
buckets: IntermediateMultiTermsBucketResult,
|
||||
},
|
||||
}
|
||||
|
||||
impl IntermediateBucketResult {
|
||||
@@ -601,6 +611,13 @@ impl IntermediateBucketResult {
|
||||
.expect("unexpected aggregation, expected composite aggregation");
|
||||
buckets.into_final_result(composite_req, req.sub_aggregation(), limits)
|
||||
}
|
||||
IntermediateBucketResult::MultiTerms { buckets } => {
|
||||
let multi_terms_req = req
|
||||
.agg
|
||||
.as_multi_terms()
|
||||
.expect("unexpected aggregation, expected multi_terms aggregation");
|
||||
buckets.into_final_result(multi_terms_req, req.sub_aggregation(), limits)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -677,6 +694,14 @@ impl IntermediateBucketResult {
|
||||
) => {
|
||||
composite_left.merge_fruits(composite_right)?;
|
||||
}
|
||||
(
|
||||
IntermediateBucketResult::MultiTerms { buckets: mt_left },
|
||||
IntermediateBucketResult::MultiTerms { buckets: mt_right },
|
||||
) => {
|
||||
merge_maps(&mut mt_left.entries, mt_right.entries)?;
|
||||
mt_left.sum_other_doc_count += mt_right.sum_other_doc_count;
|
||||
mt_left.doc_count_error_upper_bound += mt_right.doc_count_error_upper_bound;
|
||||
}
|
||||
(IntermediateBucketResult::Range(_), _) => {
|
||||
panic!("try merge on different types")
|
||||
}
|
||||
@@ -692,6 +717,9 @@ impl IntermediateBucketResult {
|
||||
(IntermediateBucketResult::Composite { .. }, _) => {
|
||||
panic!("try merge on different types")
|
||||
}
|
||||
(IntermediateBucketResult::MultiTerms { .. }, _) => {
|
||||
panic!("try merge on different types")
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -7,20 +7,16 @@ use super::SegmentWriter;
|
||||
use crate::schema::{Field, Schema};
|
||||
use crate::{DocAddress, DocId, IndexSortByField, TantivyError};
|
||||
|
||||
/// Describes how the document ID mapping was produced during a merge.
|
||||
#[derive(Copy, Clone, Eq, PartialEq)]
|
||||
pub enum MappingType {
|
||||
/// Segments are concatenated in order with no deletes; doc IDs are contiguous ranges.
|
||||
Stacked,
|
||||
/// Segments are concatenated in order but some documents have been deleted and are skipped.
|
||||
StackedWithDeletes,
|
||||
/// Documents have been reordered, for instance by a sort field or by caller-provided order.
|
||||
Shuffled,
|
||||
}
|
||||
|
||||
/// Struct to provide mapping from new doc_id to old doc_id and segment.
|
||||
#[derive(Clone)]
|
||||
pub struct SegmentDocIdMapping {
|
||||
pub(crate) struct SegmentDocIdMapping {
|
||||
pub(crate) new_doc_id_to_old_doc_addr: Vec<DocAddress>,
|
||||
pub(crate) alive_bitsets: Vec<Option<ReadOnlyBitSet>>,
|
||||
mapping_type: MappingType,
|
||||
@@ -39,24 +35,6 @@ impl SegmentDocIdMapping {
|
||||
}
|
||||
}
|
||||
|
||||
/// Build a `Shuffled` mapping from an explicit permutation of [`DocAddress`]es.
|
||||
///
|
||||
/// `new_doc_id_to_old_doc_addr[new_id]` gives the source segment and doc id for
|
||||
/// the document that should appear at position `new_id` in the merged segment.
|
||||
/// `alive_bitsets` must contain one entry per source segment, in the same order
|
||||
/// as the segments passed to [`IndexMerger::open_with_custom_alive_set`].
|
||||
pub fn new_shuffled(
|
||||
new_doc_id_to_old_doc_addr: Vec<DocAddress>,
|
||||
alive_bitsets: Vec<Option<ReadOnlyBitSet>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
new_doc_id_to_old_doc_addr,
|
||||
alive_bitsets,
|
||||
mapping_type: MappingType::Shuffled,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the [`MappingType`] that describes how this mapping was constructed.
|
||||
pub fn mapping_type(&self) -> MappingType {
|
||||
self.mapping_type
|
||||
}
|
||||
@@ -93,7 +71,6 @@ pub struct DocIdMapping {
|
||||
}
|
||||
|
||||
impl DocIdMapping {
|
||||
/// Constructs a [`DocIdMapping`] from a vector mapping each new doc ID to its old doc ID.
|
||||
pub fn from_new_id_to_old_id(new_doc_id_to_old: Vec<DocId>) -> Self {
|
||||
let max_doc = new_doc_id_to_old.len();
|
||||
let old_max_doc = new_doc_id_to_old
|
||||
@@ -125,7 +102,6 @@ impl DocIdMapping {
|
||||
self.new_doc_id_to_old.iter().cloned()
|
||||
}
|
||||
|
||||
/// Returns a slice mapping each old doc ID to its corresponding new doc ID.
|
||||
pub fn old_to_new_ids(&self) -> &[DocId] {
|
||||
&self.old_doc_id_to_new[..]
|
||||
}
|
||||
@@ -137,11 +113,9 @@ impl DocIdMapping {
|
||||
.map(|old_doc| els[*old_doc as usize])
|
||||
.collect()
|
||||
}
|
||||
/// Returns the number of new doc IDs in this mapping.
|
||||
pub fn num_new_doc_ids(&self) -> usize {
|
||||
self.new_doc_id_to_old.len()
|
||||
}
|
||||
/// Returns the number of old doc IDs covered by this mapping.
|
||||
pub fn num_old_doc_ids(&self) -> usize {
|
||||
self.old_doc_id_to_new.len()
|
||||
}
|
||||
|
||||
@@ -113,7 +113,6 @@ fn estimate_total_num_tokens(readers: &[SegmentReader], field: Field) -> crate::
|
||||
Ok(total_num_tokens)
|
||||
}
|
||||
|
||||
/// Merges multiple index segments into one segment.
|
||||
pub struct IndexMerger {
|
||||
index_settings: IndexSettings,
|
||||
schema: Schema,
|
||||
@@ -219,7 +218,6 @@ impl IndexMerger {
|
||||
.any(|doc_id| col.first(doc_id).is_none())
|
||||
}
|
||||
|
||||
/// Opens an [`IndexMerger`] over the given segments using their existing delete sets.
|
||||
pub fn open(
|
||||
schema: Schema,
|
||||
index_settings: IndexSettings,
|
||||
@@ -241,9 +239,6 @@ impl IndexMerger {
|
||||
// This can be used to merge but also apply an additional filter.
|
||||
// One use case is demux, which is basically taking a list of
|
||||
// segments and partitions them e.g. by a value in a field.
|
||||
/// Opens an [`IndexMerger`] with a custom alive set per segment.
|
||||
///
|
||||
/// Each entry in `alive_bitset_opt` corresponds to the segment at the same ordinal.
|
||||
pub fn open_with_custom_alive_set(
|
||||
schema: Schema,
|
||||
index_settings: IndexSettings,
|
||||
@@ -952,7 +947,7 @@ impl IndexMerger {
|
||||
///
|
||||
/// # Returns
|
||||
/// The number of documents in the resulting segment.
|
||||
pub fn write(&self, serializer: SegmentSerializer) -> crate::Result<u32> {
|
||||
pub fn write(&self, mut serializer: SegmentSerializer) -> crate::Result<u32> {
|
||||
let doc_id_mapping = if let Some(sort_by_field) = self.index_settings.sort_by_field.as_ref()
|
||||
{
|
||||
if self.is_disjunct_and_sorted_on_sort_property(sort_by_field)? {
|
||||
@@ -963,27 +958,6 @@ impl IndexMerger {
|
||||
} else {
|
||||
self.get_doc_id_from_concatenated_data()?
|
||||
};
|
||||
self.write_with_mapping(serializer, doc_id_mapping)
|
||||
}
|
||||
|
||||
/// Like [`IndexMerger::write`], but uses the caller-supplied `doc_id_mapping` instead of
|
||||
/// deriving one from an index sort field.
|
||||
///
|
||||
/// The mapping must cover all live documents across every segment passed to
|
||||
/// [`IndexMerger::open_with_custom_alive_set`].
|
||||
pub fn write_with_doc_id_mapping(
|
||||
&self,
|
||||
serializer: SegmentSerializer,
|
||||
doc_id_mapping: SegmentDocIdMapping,
|
||||
) -> crate::Result<u32> {
|
||||
self.write_with_mapping(serializer, doc_id_mapping)
|
||||
}
|
||||
|
||||
fn write_with_mapping(
|
||||
&self,
|
||||
mut serializer: SegmentSerializer,
|
||||
doc_id_mapping: SegmentDocIdMapping,
|
||||
) -> crate::Result<u32> {
|
||||
debug!("write-fieldnorms");
|
||||
if let Some(fieldnorms_serializer) = serializer.extract_fieldnorms_serializer() {
|
||||
self.write_fieldnorms(fieldnorms_serializer, &doc_id_mapping)?;
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
pub(crate) mod delete_queue;
|
||||
pub(crate) mod path_to_unordered_id;
|
||||
|
||||
pub mod doc_id_mapping;
|
||||
pub(crate) mod doc_id_mapping;
|
||||
mod doc_opstamp_mapping;
|
||||
mod flat_map_with_buffer;
|
||||
pub(crate) mod index_writer;
|
||||
@@ -17,8 +17,7 @@ pub(crate) mod indexing_term;
|
||||
mod log_merge_policy;
|
||||
mod merge_operation;
|
||||
pub(crate) mod merge_policy;
|
||||
/// Segment merger APIs for combining multiple existing segments.
|
||||
pub mod merger;
|
||||
pub(crate) mod merger;
|
||||
mod merger_sorted_index_test;
|
||||
pub(crate) mod operation;
|
||||
pub(crate) mod prepared_commit;
|
||||
@@ -34,19 +33,15 @@ mod stamper;
|
||||
use crossbeam_channel as channel;
|
||||
use smallvec::SmallVec;
|
||||
|
||||
pub use self::doc_id_mapping::SegmentDocIdMapping;
|
||||
pub use self::index_writer::{advance_deletes, IndexWriter, IndexWriterOptions};
|
||||
pub use self::log_merge_policy::LogMergePolicy;
|
||||
pub use self::merge_operation::MergeOperation;
|
||||
pub use self::merge_policy::{MergeCandidate, MergePolicy, NoMergePolicy};
|
||||
pub use self::merger::IndexMerger;
|
||||
pub use self::operation::{AddOperation, DeleteOperation, UserOperation};
|
||||
pub use self::prepared_commit::PreparedCommit;
|
||||
pub use self::segment_entry::SegmentEntry;
|
||||
pub(crate) use self::segment_serializer::SegmentSerializer;
|
||||
pub use self::segment_updater::{
|
||||
merge_filtered_segments, merge_indices, merge_segments_with_doc_id_mapping,
|
||||
};
|
||||
pub use self::segment_updater::{merge_filtered_segments, merge_indices};
|
||||
pub use self::segment_writer::SegmentWriter;
|
||||
pub use self::single_segment_index_writer::SingleSegmentIndexWriter;
|
||||
|
||||
|
||||
@@ -11,7 +11,6 @@ use crate::store::StoreWriter;
|
||||
pub struct SegmentSerializer {
|
||||
segment: Segment,
|
||||
pub(crate) store_writer: StoreWriter,
|
||||
store_is_temp: bool,
|
||||
fast_field_write: WritePtr,
|
||||
fieldnorms_serializer: Option<FieldNormsSerializer>,
|
||||
postings_serializer: InvertedIndexSerializer,
|
||||
@@ -19,19 +18,14 @@ pub struct SegmentSerializer {
|
||||
|
||||
impl SegmentSerializer {
|
||||
/// Creates a new `SegmentSerializer`.
|
||||
pub fn for_segment(segment: Segment, is_in_merge: bool) -> crate::Result<SegmentSerializer> {
|
||||
pub fn for_segment(
|
||||
mut segment: Segment,
|
||||
is_in_merge: bool,
|
||||
) -> crate::Result<SegmentSerializer> {
|
||||
// If the segment is going to be sorted, we stream the docs first to a temporary file.
|
||||
// In the merge case this is not necessary because we can kmerge the already sorted
|
||||
// segments
|
||||
let remapping_required = segment.index().settings().sort_by_field.is_some() && !is_in_merge;
|
||||
Self::for_segment_with_remapping_required(segment, remapping_required)
|
||||
}
|
||||
|
||||
/// Creates a new `SegmentSerializer` with an explicit remapping requirement.
|
||||
pub fn for_segment_with_remapping_required(
|
||||
mut segment: Segment,
|
||||
remapping_required: bool,
|
||||
) -> crate::Result<SegmentSerializer> {
|
||||
let settings = segment.index().settings().clone();
|
||||
let store_writer = if remapping_required {
|
||||
let store_write = segment.open_write(SegmentComponent::TempStore)?;
|
||||
@@ -63,7 +57,6 @@ impl SegmentSerializer {
|
||||
Ok(SegmentSerializer {
|
||||
segment,
|
||||
store_writer,
|
||||
store_is_temp: remapping_required,
|
||||
fast_field_write,
|
||||
fieldnorms_serializer: Some(fieldnorms_serializer),
|
||||
postings_serializer,
|
||||
@@ -83,10 +76,6 @@ impl SegmentSerializer {
|
||||
&mut self.segment
|
||||
}
|
||||
|
||||
pub fn store_is_temp(&self) -> bool {
|
||||
self.store_is_temp
|
||||
}
|
||||
|
||||
/// Accessor to the `PostingsSerializer`.
|
||||
pub fn get_postings_serializer(&mut self) -> &mut InvertedIndexSerializer {
|
||||
&mut self.postings_serializer
|
||||
|
||||
@@ -15,7 +15,6 @@ use crate::directory::{Directory, DirectoryClone, GarbageCollectionResult};
|
||||
use crate::fastfield::AliveBitSet;
|
||||
use crate::index::{Index, IndexMeta, IndexSettings, Segment, SegmentId, SegmentMeta};
|
||||
use crate::indexer::delete_queue::DeleteCursor;
|
||||
use crate::indexer::doc_id_mapping::SegmentDocIdMapping;
|
||||
use crate::indexer::index_writer::advance_deletes;
|
||||
use crate::indexer::merge_operation::MergeOperationInventory;
|
||||
use crate::indexer::merger::IndexMerger;
|
||||
@@ -256,81 +255,6 @@ pub fn merge_filtered_segments<T: Into<Box<dyn Directory>>>(
|
||||
Ok(merged_index)
|
||||
}
|
||||
|
||||
/// Like [`merge_filtered_segments`], but uses a caller-supplied [`SegmentDocIdMapping`]
|
||||
/// to control the final document order.
|
||||
///
|
||||
/// The mapping should be built from the same segments, in the same order, passed here.
|
||||
///
|
||||
/// # Warning
|
||||
/// Same caveats as [`merge_filtered_segments`]: no live `IndexWriter` is allowed.
|
||||
#[doc(hidden)]
|
||||
pub fn merge_segments_with_doc_id_mapping<T: Into<Box<dyn Directory>>>(
|
||||
segments: &[Segment],
|
||||
target_settings: IndexSettings,
|
||||
filter_doc_ids: Vec<Option<AliveBitSet>>,
|
||||
doc_id_mapping: SegmentDocIdMapping,
|
||||
output_directory: T,
|
||||
) -> crate::Result<Index> {
|
||||
if segments.is_empty() {
|
||||
return Err(crate::TantivyError::InvalidArgument(
|
||||
"No segments given to merge".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
let target_schema = segments[0].schema();
|
||||
|
||||
if segments
|
||||
.iter()
|
||||
.skip(1)
|
||||
.any(|segment| segment.schema() != target_schema)
|
||||
{
|
||||
return Err(crate::TantivyError::InvalidArgument(
|
||||
"Attempt to merge different schema indices".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
let mut merged_index = Index::create(
|
||||
output_directory,
|
||||
target_schema.clone(),
|
||||
target_settings.clone(),
|
||||
)?;
|
||||
let merged_segment = merged_index.new_segment();
|
||||
let merged_segment_id = merged_segment.id();
|
||||
let merger = IndexMerger::open_with_custom_alive_set(
|
||||
merged_index.schema(),
|
||||
merged_index.settings().clone(),
|
||||
segments,
|
||||
filter_doc_ids,
|
||||
)?;
|
||||
let segment_serializer = SegmentSerializer::for_segment(merged_segment, true)?;
|
||||
let num_docs = merger.write_with_doc_id_mapping(segment_serializer, doc_id_mapping)?;
|
||||
|
||||
let segment_meta = merged_index.new_segment_meta(merged_segment_id, num_docs);
|
||||
|
||||
let stats = format!(
|
||||
"Segments Merge (external reordering): [{}]",
|
||||
segments
|
||||
.iter()
|
||||
.fold(String::new(), |sum, current| format!(
|
||||
"{sum}{} ",
|
||||
current.meta().id().uuid_string()
|
||||
))
|
||||
.trim_end()
|
||||
);
|
||||
|
||||
let index_meta = IndexMeta {
|
||||
index_settings: target_settings,
|
||||
segments: vec![segment_meta],
|
||||
schema: target_schema,
|
||||
opstamp: 0u64,
|
||||
payload: Some(stats),
|
||||
};
|
||||
|
||||
save_metas(&index_meta, merged_index.directory_mut())?;
|
||||
|
||||
Ok(merged_index)
|
||||
}
|
||||
|
||||
pub(crate) struct InnerSegmentUpdater {
|
||||
// we keep a copy of the current active IndexMeta to
|
||||
// avoid loading the file every time we need it in the
|
||||
|
||||
@@ -18,9 +18,7 @@ use crate::postings::{
|
||||
use crate::schema::document::{Document, Value};
|
||||
use crate::schema::{FieldEntry, FieldType, Schema, DATE_TIME_PRECISION_INDEXED};
|
||||
use crate::store::{StoreReader, StoreWriter};
|
||||
use crate::tokenizer::{
|
||||
FacetTokenizer, PreTokenizedStream, TextAnalyzer, Tokenizer, TokenizerManager,
|
||||
};
|
||||
use crate::tokenizer::{FacetTokenizer, PreTokenizedStream, TextAnalyzer, Tokenizer};
|
||||
use crate::{DocId, Opstamp, TantivyError};
|
||||
|
||||
/// Computes the initial size of the hash table.
|
||||
@@ -92,42 +90,8 @@ impl SegmentWriter {
|
||||
let schema = segment.schema();
|
||||
let tokenizer_manager = segment.index().tokenizers().clone();
|
||||
let tokenizer_manager_fast_field = segment.index().fast_field_tokenizer().clone();
|
||||
let segment_serializer = SegmentSerializer::for_segment(segment, false)?;
|
||||
Self::for_segment_serializer(
|
||||
memory_budget_in_bytes,
|
||||
schema,
|
||||
tokenizer_manager,
|
||||
tokenizer_manager_fast_field,
|
||||
segment_serializer,
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) fn for_segment_with_provided_doc_id_mapping(
|
||||
memory_budget_in_bytes: usize,
|
||||
segment: Segment,
|
||||
) -> crate::Result<Self> {
|
||||
let schema = segment.schema();
|
||||
let tokenizer_manager = segment.index().tokenizers().clone();
|
||||
let tokenizer_manager_fast_field = segment.index().fast_field_tokenizer().clone();
|
||||
let segment_serializer =
|
||||
SegmentSerializer::for_segment_with_remapping_required(segment, true)?;
|
||||
Self::for_segment_serializer(
|
||||
memory_budget_in_bytes,
|
||||
schema,
|
||||
tokenizer_manager,
|
||||
tokenizer_manager_fast_field,
|
||||
segment_serializer,
|
||||
)
|
||||
}
|
||||
|
||||
fn for_segment_serializer(
|
||||
memory_budget_in_bytes: usize,
|
||||
schema: Schema,
|
||||
tokenizer_manager: TokenizerManager,
|
||||
tokenizer_manager_fast_field: TokenizerManager,
|
||||
segment_serializer: SegmentSerializer,
|
||||
) -> crate::Result<Self> {
|
||||
let table_size = compute_initial_table_size(memory_budget_in_bytes)?;
|
||||
let segment_serializer = SegmentSerializer::for_segment(segment, false)?;
|
||||
let per_field_postings_writers = PerFieldPostingsWriter::for_schema(&schema);
|
||||
let per_field_text_analyzers = schema
|
||||
.fields()
|
||||
@@ -185,31 +149,6 @@ impl SegmentWriter {
|
||||
.clone()
|
||||
.map(|sort_by_field| get_doc_id_mapping_from_field(sort_by_field, &self))
|
||||
.transpose()?;
|
||||
self.finalize_with_mapping(mapping.as_ref())
|
||||
}
|
||||
|
||||
/// Lay on disk the current content of the `SegmentWriter`, using a caller-provided document
|
||||
/// order.
|
||||
///
|
||||
/// `new_doc_id_to_old_doc_id[new_id]` is the old document id of the document that should be
|
||||
/// serialized at `new_id`.
|
||||
pub fn finalize_with_doc_id_mapping(
|
||||
mut self,
|
||||
new_doc_id_to_old_doc_id: Vec<DocId>,
|
||||
) -> crate::Result<Vec<u64>> {
|
||||
if new_doc_id_to_old_doc_id.len() != self.max_doc as usize {
|
||||
return Err(crate::TantivyError::InvalidArgument(format!(
|
||||
"provided doc id mapping length {} does not match segment max_doc {}",
|
||||
new_doc_id_to_old_doc_id.len(),
|
||||
self.max_doc
|
||||
)));
|
||||
}
|
||||
self.fieldnorms_writer.fill_up_to_max_doc(self.max_doc);
|
||||
let mapping = DocIdMapping::from_new_id_to_old_id(new_doc_id_to_old_doc_id);
|
||||
self.finalize_with_mapping(Some(&mapping))
|
||||
}
|
||||
|
||||
fn finalize_with_mapping(self, mapping: Option<&DocIdMapping>) -> crate::Result<Vec<u64>> {
|
||||
remap_and_write(
|
||||
self.schema,
|
||||
&self.per_field_postings_writers,
|
||||
@@ -217,10 +156,9 @@ impl SegmentWriter {
|
||||
self.fast_field_writers,
|
||||
&self.fieldnorms_writer,
|
||||
self.segment_serializer,
|
||||
self.max_doc,
|
||||
mapping,
|
||||
mapping.as_ref(),
|
||||
)?;
|
||||
let doc_opstamps = remap_doc_opstamps(self.doc_opstamps, mapping);
|
||||
let doc_opstamps = remap_doc_opstamps(self.doc_opstamps, mapping.as_ref());
|
||||
Ok(doc_opstamps)
|
||||
}
|
||||
|
||||
@@ -482,7 +420,6 @@ fn remap_and_write(
|
||||
fast_field_writers: FastFieldsWriter,
|
||||
fieldnorms_writer: &FieldNormsWriter,
|
||||
mut serializer: SegmentSerializer,
|
||||
max_doc: DocId,
|
||||
doc_id_map: Option<&DocIdMapping>,
|
||||
) -> crate::Result<()> {
|
||||
debug!("remap-and-write");
|
||||
@@ -504,10 +441,9 @@ fn remap_and_write(
|
||||
debug!("fastfield-serialize");
|
||||
fast_field_writers.serialize(serializer.get_fast_field_write(), doc_id_map)?;
|
||||
|
||||
// Finalize the temp docstore and create the final store. With a mapping, the final store
|
||||
// reflects the new doc id order; without one, it preserves the insertion order.
|
||||
if serializer.store_is_temp() {
|
||||
debug!("rewrite-docstore");
|
||||
// finalize temp docstore and create version, which reflects the doc_id_map
|
||||
if let Some(doc_id_map) = doc_id_map {
|
||||
debug!("resort-docstore");
|
||||
let store_write = serializer
|
||||
.segment_mut()
|
||||
.open_write(SegmentComponent::Store)?;
|
||||
@@ -527,10 +463,7 @@ fn remap_and_write(
|
||||
1, /* The docstore is configured to have one doc per block, and each doc is
|
||||
* accessed only once: we don't need caching. */
|
||||
)?;
|
||||
let old_doc_ids = doc_id_map
|
||||
.map(|doc_id_map| doc_id_map.iter_old_doc_ids().collect::<Vec<_>>())
|
||||
.unwrap_or_else(|| (0..max_doc).collect::<Vec<_>>());
|
||||
for old_doc_id in old_doc_ids {
|
||||
for old_doc_id in doc_id_map.iter_old_doc_ids() {
|
||||
let doc_bytes = store_read.get_document_bytes(old_doc_id)?;
|
||||
serializer.get_store_writer().store_bytes(&doc_bytes)?;
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ use crate::indexer::operation::AddOperation;
|
||||
use crate::indexer::segment_updater::save_metas;
|
||||
use crate::indexer::SegmentWriter;
|
||||
use crate::schema::document::Document;
|
||||
use crate::{Directory, DocId, Index, IndexMeta, Opstamp, Segment, TantivyDocument};
|
||||
use crate::{Directory, Index, IndexMeta, Opstamp, Segment, TantivyDocument};
|
||||
|
||||
#[doc(hidden)]
|
||||
pub struct SingleSegmentIndexWriter<D: Document = TantivyDocument> {
|
||||
@@ -17,8 +17,7 @@ pub struct SingleSegmentIndexWriter<D: Document = TantivyDocument> {
|
||||
impl<D: Document> SingleSegmentIndexWriter<D> {
|
||||
pub fn new(index: Index, mem_budget: usize) -> crate::Result<Self> {
|
||||
let segment = index.new_segment();
|
||||
let segment_writer =
|
||||
SegmentWriter::for_segment_with_provided_doc_id_mapping(mem_budget, segment.clone())?;
|
||||
let segment_writer = SegmentWriter::for_segment(mem_budget, segment.clone())?;
|
||||
Ok(Self {
|
||||
segment_writer,
|
||||
segment,
|
||||
@@ -41,123 +40,17 @@ impl<D: Document> SingleSegmentIndexWriter<D> {
|
||||
pub fn finalize(self) -> crate::Result<Index> {
|
||||
let max_doc = self.segment_writer.max_doc();
|
||||
self.segment_writer.finalize()?;
|
||||
finalize_segment(self.segment, max_doc)
|
||||
}
|
||||
|
||||
/// Finalizes this single-segment index using a caller-provided document order.
|
||||
///
|
||||
/// `new_doc_id_to_old_doc_id[new_id]` is the old insertion doc id of the document that should
|
||||
/// be serialized at `new_id`.
|
||||
pub fn finalize_with_doc_id_mapping(
|
||||
self,
|
||||
new_doc_id_to_old_doc_id: Vec<DocId>,
|
||||
) -> crate::Result<Index> {
|
||||
let max_doc = self.segment_writer.max_doc();
|
||||
self.segment_writer
|
||||
.finalize_with_doc_id_mapping(new_doc_id_to_old_doc_id)?;
|
||||
finalize_segment(self.segment, max_doc)
|
||||
}
|
||||
}
|
||||
|
||||
fn finalize_segment(segment: Segment, max_doc: DocId) -> crate::Result<Index> {
|
||||
let segment: Segment = segment.with_max_doc(max_doc);
|
||||
let index = segment.index();
|
||||
let index_meta = IndexMeta {
|
||||
index_settings: index.settings().clone(),
|
||||
segments: vec![segment.meta().clone()],
|
||||
schema: index.schema(),
|
||||
opstamp: 0,
|
||||
payload: None,
|
||||
};
|
||||
save_metas(&index_meta, index.directory())?;
|
||||
index.directory().sync_directory()?;
|
||||
Ok(segment.index().clone())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::collector::TopDocs;
|
||||
use crate::directory::RamDirectory;
|
||||
use crate::query::QueryParser;
|
||||
use crate::schema::{
|
||||
IndexRecordOption, NumericOptions, Schema, TextFieldIndexing, TextOptions, Value, STORED,
|
||||
};
|
||||
use crate::{Index, ReloadPolicy, TantivyDocument};
|
||||
|
||||
#[test]
|
||||
fn test_finalize_with_doc_id_mapping() -> crate::Result<()> {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let id_field = schema_builder.add_u64_field("id", NumericOptions::default().set_fast());
|
||||
let text_field = schema_builder.add_text_field(
|
||||
"text",
|
||||
TextOptions::default().set_stored().set_indexing_options(
|
||||
TextFieldIndexing::default()
|
||||
.set_index_option(IndexRecordOption::WithFreqs)
|
||||
.set_fieldnorms(true),
|
||||
),
|
||||
);
|
||||
let stored_field = schema_builder.add_text_field("stored", STORED);
|
||||
let schema = schema_builder.build();
|
||||
let mut writer = Index::builder()
|
||||
.schema(schema)
|
||||
.single_segment_index_writer(RamDirectory::create(), 15_000_000)?;
|
||||
writer.add_document(doc!(
|
||||
id_field => 10u64,
|
||||
text_field => "alpha beta",
|
||||
stored_field => "old-0",
|
||||
))?;
|
||||
writer.add_document(doc!(
|
||||
id_field => 20u64,
|
||||
text_field => "alpha",
|
||||
stored_field => "old-1",
|
||||
))?;
|
||||
writer.add_document(doc!(
|
||||
id_field => 30u64,
|
||||
text_field => "beta",
|
||||
stored_field => "old-2",
|
||||
))?;
|
||||
|
||||
let index = writer.finalize_with_doc_id_mapping(vec![2, 0, 1])?;
|
||||
let reader = index
|
||||
.reader_builder()
|
||||
.reload_policy(ReloadPolicy::Manual)
|
||||
.try_into()?;
|
||||
let searcher = reader.searcher();
|
||||
let segment_reader = searcher.segment_reader(0);
|
||||
|
||||
let fast_field = segment_reader
|
||||
.fast_fields()
|
||||
.u64("id")?
|
||||
.first_or_default_col(0);
|
||||
assert_eq!(fast_field.get_val(0), 30u64);
|
||||
assert_eq!(fast_field.get_val(1), 10u64);
|
||||
assert_eq!(fast_field.get_val(2), 20u64);
|
||||
|
||||
let fieldnorm_reader = segment_reader.get_fieldnorms_reader(text_field)?;
|
||||
assert_eq!(fieldnorm_reader.fieldnorm(0), 1);
|
||||
assert_eq!(fieldnorm_reader.fieldnorm(1), 2);
|
||||
assert_eq!(fieldnorm_reader.fieldnorm(2), 1);
|
||||
|
||||
let mut stored_values = Vec::new();
|
||||
for doc_id in 0..segment_reader.max_doc() {
|
||||
let doc: TantivyDocument = segment_reader.get_store_reader(1024)?.get(doc_id)?;
|
||||
let stored_value = doc
|
||||
.get_first(stored_field)
|
||||
.and_then(|value| value.as_str())
|
||||
.unwrap();
|
||||
stored_values.push(stored_value.to_string());
|
||||
}
|
||||
assert_eq!(stored_values, ["old-2", "old-0", "old-1"]);
|
||||
|
||||
let query = QueryParser::for_index(&index, vec![text_field]).parse_query("beta")?;
|
||||
let top_docs: Vec<(_, _)> =
|
||||
searcher.search(&query, &TopDocs::with_limit(10).order_by_score())?;
|
||||
let doc_ids = top_docs
|
||||
.into_iter()
|
||||
.map(|(_, doc_address)| doc_address.doc_id)
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(doc_ids, [0, 1]);
|
||||
|
||||
Ok(())
|
||||
let segment: Segment = self.segment.with_max_doc(max_doc);
|
||||
let index = segment.index();
|
||||
let index_meta = IndexMeta {
|
||||
index_settings: index.settings().clone(),
|
||||
segments: vec![segment.meta().clone()],
|
||||
schema: index.schema(),
|
||||
opstamp: 0,
|
||||
payload: None,
|
||||
};
|
||||
save_metas(&index_meta, index.directory())?;
|
||||
index.directory().sync_directory()?;
|
||||
Ok(segment.index().clone())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -229,10 +229,7 @@ pub use crate::index::{
|
||||
Index, IndexBuilder, IndexMeta, IndexSettings, IndexSortByField, InvertedIndexReader, Order,
|
||||
Segment, SegmentMeta, SegmentReader,
|
||||
};
|
||||
pub use crate::indexer::{
|
||||
merge_segments_with_doc_id_mapping, IndexMerger, IndexWriter, SegmentDocIdMapping,
|
||||
SingleSegmentIndexWriter,
|
||||
};
|
||||
pub use crate::indexer::{IndexWriter, SingleSegmentIndexWriter};
|
||||
pub use crate::schema::{Document, TantivyDocument, Term};
|
||||
|
||||
/// Index format version.
|
||||
|
||||
Reference in New Issue
Block a user