mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-03 15:52:55 +00:00
LogMergePolicy knob del_docs_percentage_before_merge (#1238)
Add a knob to LogMergePolicy to always merge segments that exceed a threshold of deleted docs Closes #115
This commit is contained in:
@@ -5,6 +5,7 @@ Tantivy 0.17
|
|||||||
- Bugfix that could in theory impact durability on some filesystems [#1224](https://github.com/quickwit-inc/tantivy/issues/1224)
|
- Bugfix that could in theory impact durability on some filesystems [#1224](https://github.com/quickwit-inc/tantivy/issues/1224)
|
||||||
- Reduce the number of fsync calls [#1225](https://github.com/quickwit-inc/tantivy/issues/1225)
|
- Reduce the number of fsync calls [#1225](https://github.com/quickwit-inc/tantivy/issues/1225)
|
||||||
- Schema now offers not indexing fieldnorms (@lpouget) [#922](https://github.com/quickwit-inc/tantivy/issues/922)
|
- Schema now offers not indexing fieldnorms (@lpouget) [#922](https://github.com/quickwit-inc/tantivy/issues/922)
|
||||||
|
- LogMergePolicy now triggers merges if the ratio of deleted documents reaches a threshold (@shikhar) [#115](https://github.com/quickwit-inc/tantivy/issues/115)
|
||||||
|
|
||||||
Tantivy 0.16.2
|
Tantivy 0.16.2
|
||||||
================================
|
================================
|
||||||
|
|||||||
@@ -189,6 +189,10 @@ impl SegmentMeta {
|
|||||||
|
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> SegmentMeta {
|
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> SegmentMeta {
|
||||||
|
assert!(
|
||||||
|
num_deleted_docs <= self.max_doc(),
|
||||||
|
"There cannot be more deleted docs than there are docs."
|
||||||
|
);
|
||||||
let delete_meta = DeleteMeta {
|
let delete_meta = DeleteMeta {
|
||||||
num_deleted_docs,
|
num_deleted_docs,
|
||||||
opstamp,
|
opstamp,
|
||||||
|
|||||||
@@ -974,7 +974,7 @@ mod tests {
|
|||||||
assert_eq!(
|
assert_eq!(
|
||||||
format!("{:?}", index_writer.get_merge_policy()),
|
format!("{:?}", index_writer.get_merge_policy()),
|
||||||
"LogMergePolicy { min_num_segments: 8, max_docs_before_merge: 10000000, min_layer_size: 10000, \
|
"LogMergePolicy { min_num_segments: 8, max_docs_before_merge: 10000000, min_layer_size: 10000, \
|
||||||
level_log_size: 0.75 }"
|
level_log_size: 0.75, del_docs_ratio_before_merge: 1.0 }"
|
||||||
);
|
);
|
||||||
let merge_policy = Box::new(NoMergePolicy::default());
|
let merge_policy = Box::new(NoMergePolicy::default());
|
||||||
index_writer.set_merge_policy(merge_policy);
|
index_writer.set_merge_policy(merge_policy);
|
||||||
|
|||||||
@@ -2,12 +2,15 @@ use super::merge_policy::{MergeCandidate, MergePolicy};
|
|||||||
use crate::core::SegmentMeta;
|
use crate::core::SegmentMeta;
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use std::cmp;
|
use std::cmp;
|
||||||
use std::f64;
|
|
||||||
|
|
||||||
const DEFAULT_LEVEL_LOG_SIZE: f64 = 0.75;
|
const DEFAULT_LEVEL_LOG_SIZE: f64 = 0.75;
|
||||||
const DEFAULT_MIN_LAYER_SIZE: u32 = 10_000;
|
const DEFAULT_MIN_LAYER_SIZE: u32 = 10_000;
|
||||||
const DEFAULT_MIN_NUM_SEGMENTS_IN_MERGE: usize = 8;
|
const DEFAULT_MIN_NUM_SEGMENTS_IN_MERGE: usize = 8;
|
||||||
const DEFAULT_MAX_DOCS_BEFORE_MERGE: usize = 10_000_000;
|
const DEFAULT_MAX_DOCS_BEFORE_MERGE: usize = 10_000_000;
|
||||||
|
// The default value of 1 means that deletes are not taken into account when
|
||||||
|
// identifying merge candidates. This is not a very sensible default: it was
|
||||||
|
// set like that for backward compatibility and might change in the near future.
|
||||||
|
const DEFAULT_DEL_DOCS_RATIO_BEFORE_MERGE: f32 = 1.0f32;
|
||||||
|
|
||||||
/// `LogMergePolicy` tries to merge segments that have a similar number of
|
/// `LogMergePolicy` tries to merge segments that have a similar number of
|
||||||
/// documents.
|
/// documents.
|
||||||
@@ -17,6 +20,7 @@ pub struct LogMergePolicy {
|
|||||||
max_docs_before_merge: usize,
|
max_docs_before_merge: usize,
|
||||||
min_layer_size: u32,
|
min_layer_size: u32,
|
||||||
level_log_size: f64,
|
level_log_size: f64,
|
||||||
|
del_docs_ratio_before_merge: f32,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl LogMergePolicy {
|
impl LogMergePolicy {
|
||||||
@@ -52,19 +56,49 @@ impl LogMergePolicy {
|
|||||||
pub fn set_level_log_size(&mut self, level_log_size: f64) {
|
pub fn set_level_log_size(&mut self, level_log_size: f64) {
|
||||||
self.level_log_size = level_log_size;
|
self.level_log_size = level_log_size;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Set the ratio of deleted documents in a segment to tolerate.
|
||||||
|
///
|
||||||
|
/// If it is exceeded by any segment at a log level, a merge
|
||||||
|
/// will be triggered for that level.
|
||||||
|
///
|
||||||
|
/// If there is a single segment at a level, we effectively end up expunging
|
||||||
|
/// deleted documents from it.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
///
|
||||||
|
/// Panics if del_docs_ratio_before_merge is not within (0..1].
|
||||||
|
pub fn set_del_docs_ratio_before_merge(&mut self, del_docs_ratio_before_merge: f32) {
|
||||||
|
assert!(del_docs_ratio_before_merge <= 1.0f32);
|
||||||
|
assert!(del_docs_ratio_before_merge > 0f32);
|
||||||
|
self.del_docs_ratio_before_merge = del_docs_ratio_before_merge;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn has_segment_above_deletes_threshold(&self, level: &[&SegmentMeta]) -> bool {
|
||||||
|
level
|
||||||
|
.iter()
|
||||||
|
.any(|segment| deletes_ratio(segment) > self.del_docs_ratio_before_merge)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn deletes_ratio(segment: &SegmentMeta) -> f32 {
|
||||||
|
if segment.max_doc() == 0 {
|
||||||
|
return 0f32;
|
||||||
|
}
|
||||||
|
segment.num_deleted_docs() as f32 / segment.max_doc() as f32
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MergePolicy for LogMergePolicy {
|
impl MergePolicy for LogMergePolicy {
|
||||||
fn compute_merge_candidates(&self, segments: &[SegmentMeta]) -> Vec<MergeCandidate> {
|
fn compute_merge_candidates(&self, segments: &[SegmentMeta]) -> Vec<MergeCandidate> {
|
||||||
let mut size_sorted_segments = segments
|
let size_sorted_segments = segments
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|segment_meta| segment_meta.num_docs() <= (self.max_docs_before_merge as u32))
|
.filter(|seg| seg.num_docs() <= (self.max_docs_before_merge as u32))
|
||||||
|
.sorted_by_key(|seg| std::cmp::Reverse(seg.max_doc()))
|
||||||
.collect::<Vec<&SegmentMeta>>();
|
.collect::<Vec<&SegmentMeta>>();
|
||||||
|
|
||||||
if size_sorted_segments.len() <= 1 {
|
if size_sorted_segments.is_empty() {
|
||||||
return vec![];
|
return vec![];
|
||||||
}
|
}
|
||||||
size_sorted_segments.sort_by_key(|seg| std::cmp::Reverse(seg.num_docs()));
|
|
||||||
|
|
||||||
let mut current_max_log_size = f64::MAX;
|
let mut current_max_log_size = f64::MAX;
|
||||||
let mut levels = vec![];
|
let mut levels = vec![];
|
||||||
@@ -82,7 +116,10 @@ impl MergePolicy for LogMergePolicy {
|
|||||||
|
|
||||||
levels
|
levels
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|level| level.len() >= self.min_num_segments)
|
.filter(|level| {
|
||||||
|
level.len() >= self.min_num_segments
|
||||||
|
|| self.has_segment_above_deletes_threshold(level)
|
||||||
|
})
|
||||||
.map(|segments| MergeCandidate(segments.iter().map(|&seg| seg.id()).collect()))
|
.map(|segments| MergeCandidate(segments.iter().map(|&seg| seg.id()).collect()))
|
||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
@@ -95,6 +132,7 @@ impl Default for LogMergePolicy {
|
|||||||
max_docs_before_merge: DEFAULT_MAX_DOCS_BEFORE_MERGE,
|
max_docs_before_merge: DEFAULT_MAX_DOCS_BEFORE_MERGE,
|
||||||
min_layer_size: DEFAULT_MIN_LAYER_SIZE,
|
min_layer_size: DEFAULT_MIN_LAYER_SIZE,
|
||||||
level_log_size: DEFAULT_LEVEL_LOG_SIZE,
|
level_log_size: DEFAULT_LEVEL_LOG_SIZE,
|
||||||
|
del_docs_ratio_before_merge: DEFAULT_DEL_DOCS_RATIO_BEFORE_MERGE,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -288,4 +326,49 @@ mod tests {
|
|||||||
assert_eq!(result_list[0].0[1], test_input[4].id());
|
assert_eq!(result_list[0].0[1], test_input[4].id());
|
||||||
assert_eq!(result_list[0].0[2], test_input[5].id());
|
assert_eq!(result_list[0].0[2], test_input[5].id());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
fn test_merge_single_segment_with_deletes_below_threshold() {
    // 10_000 / 40_000 = 0.25, which is NOT strictly above the 0.25
    // threshold, so no merge candidate may be produced.
    let mut policy = test_merge_policy();
    policy.set_del_docs_ratio_before_merge(0.25f32);
    let segments = vec![create_random_segment_meta(40_000).with_delete_meta(10_000, 1)];
    let candidates = policy.compute_merge_candidates(&segments);
    assert!(candidates.is_empty());
}
|
||||||
|
|
||||||
|
#[test]
fn test_merge_single_segment_with_deletes_above_threshold() {
    // 10_001 / 40_000 exceeds 0.25, so even a lone segment must be
    // selected (effectively expunging its deleted docs).
    let mut policy = test_merge_policy();
    policy.set_del_docs_ratio_before_merge(0.25f32);
    let segments = vec![create_random_segment_meta(40_000).with_delete_meta(10_001, 1)];
    let candidates = policy.compute_merge_candidates(&segments);
    assert_eq!(candidates.len(), 1);
}
|
||||||
|
|
||||||
|
#[test]
fn test_merge_segments_with_deletes_above_threshold_all_in_level() {
    // One segment over the deletes threshold pulls its whole log level
    // into the merge candidate, deletes or not.
    let mut policy = test_merge_policy();
    policy.set_del_docs_ratio_before_merge(0.25f32);
    let segments = vec![
        create_random_segment_meta(40_000).with_delete_meta(10_001, 1),
        create_random_segment_meta(40_000),
    ];
    let candidates = policy.compute_merge_candidates(&segments);
    assert_eq!(candidates.len(), 1);
    assert_eq!(candidates[0].0.len(), 2);
}
|
||||||
|
|
||||||
|
#[test]
fn test_merge_segments_with_deletes_above_threshold_different_level_not_involved() {
    // The small (100-doc) segment sits in a different log level, so only
    // the over-threshold 40_000-doc segment is picked for merging.
    let mut policy = test_merge_policy();
    policy.set_del_docs_ratio_before_merge(0.25f32);
    let segments = vec![
        create_random_segment_meta(100),
        create_random_segment_meta(40_000).with_delete_meta(10_001, 1),
    ];
    let candidates = policy.compute_merge_candidates(&segments);
    assert_eq!(candidates.len(), 1);
    assert_eq!(candidates[0].0.len(), 1);
    assert_eq!(candidates[0].0[0], segments[1].id());
}
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user