From de19c93132c30f4c40c02ce1ab8ccf4cb1ff06ee Mon Sep 17 00:00:00 2001
From: Ming Ying
Date: Thu, 9 Jan 2025 22:00:59 -0500
Subject: [PATCH] add reconsider_merge_policy to directory

---
 src/directory/directory.rs         | 15 +++++++++++++++
 src/directory/managed_directory.rs |  9 +++++++++
 src/indexer/segment_updater.rs     | 11 +++++++++++
 3 files changed, 35 insertions(+)

diff --git a/src/directory/directory.rs b/src/directory/directory.rs
index ec6ee1aa1..de67748a7 100644
--- a/src/directory/directory.rs
+++ b/src/directory/directory.rs
@@ -2,6 +2,7 @@ use crate::directory::directory_lock::Lock;
 use crate::directory::error::{DeleteError, LockError, OpenReadError, OpenWriteError};
 use crate::directory::{FileHandle, FileSlice, WatchCallback, WatchHandle, WritePtr};
 use crate::index::SegmentMetaInventory;
+use crate::merge_policy::MergePolicy;
 use crate::IndexMeta;
 use std::any::Any;
 use std::collections::HashSet;
@@ -265,6 +266,20 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
             "load_metas not implemented".to_string(),
         ))
     }
+
+    /// Gives the directory a chance to replace the writer's merge policy.
+    ///
+    /// The segment updater calls this after a commit has been saved, right before it
+    /// considers merge options, passing the freshly loaded `metas` and the commit's
+    /// `previous_metas`. A directory can use this to adapt the policy to, for example,
+    /// the number of segments created. Return `None` (the default) to keep the policy.
+    fn reconsider_merge_policy(
+        &self,
+        _metas: &IndexMeta,
+        _previous_metas: &IndexMeta,
+    ) -> Option<Box<dyn MergePolicy>> {
+        None
+    }
 }
 
 /// DirectoryClone
diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs
index 59d6e9b50..185daed91 100644
--- a/src/directory/managed_directory.rs
+++ b/src/directory/managed_directory.rs
@@ -13,6 +13,7 @@ use crate::directory::{
     DirectoryLock, FileHandle, FileSlice, GarbageCollectionResult, Lock, WatchCallback,
     WatchHandle, WritePtr, MANAGED_LOCK, META_LOCK,
 };
 use crate::error::DataCorruption;
 use crate::index::SegmentMetaInventory;
+use crate::merge_policy::MergePolicy;
 use crate::{Directory, IndexMeta};
@@ -349,6 +350,14 @@ impl Directory for ManagedDirectory {
     fn load_metas(&self, inventory: &SegmentMetaInventory) -> crate::Result<IndexMeta> {
         self.directory.load_metas(inventory)
     }
+
+    fn reconsider_merge_policy(
+        &self,
+        metas: &IndexMeta,
+        previous_metas: &IndexMeta,
+    ) -> Option<Box<dyn MergePolicy>> {
+        self.directory.reconsider_merge_policy(metas, previous_metas)
+    }
 }
 
 impl Clone for ManagedDirectory {
diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs
index 7d2b21ade..e23ad8cd0 100644
--- a/src/indexer/segment_updater.rs
+++ b/src/indexer/segment_updater.rs
@@ -466,6 +466,17 @@ impl SegmentUpdater {
             segment_updater.segment_manager.commit(segment_entries);
             segment_updater.save_metas(opstamp, payload, &previous_metas)?;
             let _ = garbage_collect_files(segment_updater.clone());
+
+            // Let the directory adjust the merge policy before merges are considered.
+            let index_meta = segment_updater.load_meta();
+            if let Some(new_merge_policy) = segment_updater
+                .index
+                .directory()
+                .reconsider_merge_policy(&index_meta, &previous_metas)
+            {
+                segment_updater.set_merge_policy(new_merge_policy);
+            }
+
             segment_updater.consider_merge_options();
             Ok(opstamp)
         })