Issue/471 (#481)

* Closes 471

Removing writing_segments in the segment manager as it is now useless.
Removing the target merged segment id as it is useless as well.

* RAII for tracking which segment is in merge.

Closes #471

* fmt

* Using Inventory::default().
This commit is contained in:
Paul Masurel
2019-01-27 12:18:59 +09:00
committed by GitHub
parent 097eaf4aa6
commit bf94fd77db
11 changed files with 182 additions and 291 deletions

View File

@@ -42,7 +42,7 @@ rust-stemmers = "1"
downcast = { version="0.9" }
matches = "0.1"
bitpacking = "0.5"
census = "0.1"
census = "0.2"
fnv = "1.0.6"
owned-read = "0.4"
failure = "0.1"

View File

@@ -92,6 +92,9 @@ impl ManagedDirectory {
///
/// * `living_files` - List of files that are still used by the index.
///
/// The use a callback ensures that the list of living_files is computed
/// while we hold the lock on meta.
///
/// This method does not panick nor returns errors.
/// If a file cannot be deleted (for permission reasons for instance)
/// an error is simply logged, and the file remains in the list of managed

View File

@@ -25,6 +25,7 @@ use schema::Document;
use schema::IndexRecordOption;
use schema::Term;
use std::mem;
use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;
use Result;
@@ -366,13 +367,16 @@ impl IndexWriter {
.add_segment(self.generation, segment_entry);
}
/// *Experimental & Advanced API* Creates a new segment.
/// and marks it as currently in write.
/// Creates a new segment.
///
/// This method is useful only for users trying to do complex
/// operations, like converting an index format to another.
///
/// It is safe to start writing file associated to the new `Segment`.
/// These will not be garbage collected as long as an instance object of
/// `SegmentMeta` object associated to the new `Segment` is "alive".
pub fn new_segment(&self) -> Segment {
self.segment_updater.new_segment()
self.index.new_segment()
}
/// Spawns a new worker thread for indexing.
@@ -387,6 +391,7 @@ impl IndexWriter {
let mut delete_cursor = self.delete_queue.cursor();
let mem_budget = self.heap_size_in_bytes_per_thread;
let index = self.index.clone();
let join_handle: JoinHandle<Result<()>> = thread::Builder::new()
.name(format!(
"thrd-tantivy-index{}-gen{}",
@@ -412,7 +417,7 @@ impl IndexWriter {
// was dropped.
return Ok(());
}
let segment = segment_updater.new_segment();
let segment = index.new_segment();
index_documents(
mem_budget,
&segment,
@@ -429,7 +434,7 @@ impl IndexWriter {
}
/// Accessor to the merge policy.
pub fn get_merge_policy(&self) -> Box<MergePolicy> {
pub fn get_merge_policy(&self) -> Arc<Box<MergePolicy>> {
self.segment_updater.get_merge_policy()
}
@@ -733,7 +738,7 @@ mod tests {
index_writer.add_document(doc!(text_field=>"b"));
index_writer.add_document(doc!(text_field=>"c"));
}
assert_eq!(index_writer.commit().unwrap(), 3u64);
assert!(index_writer.commit().is_ok());
index.load_searchers().unwrap();
assert_eq!(num_docs_containing("a"), 0);
assert_eq!(num_docs_containing("b"), 1);

View File

@@ -0,0 +1,64 @@
use census::{Inventory, TrackedObject};
use std::collections::HashSet;
use SegmentId;
#[derive(Default)]
pub struct MergeOperationInventory(Inventory<InnerMergeOperation>);
impl MergeOperationInventory {
pub fn segment_in_merge(&self) -> HashSet<SegmentId> {
let mut segment_in_merge = HashSet::default();
for merge_op in self.0.list() {
for &segment_id in &merge_op.segment_ids {
segment_in_merge.insert(segment_id);
}
}
segment_in_merge
}
}
/// A `MergeOperation` has two role.
/// It carries all of the information required to describe a merge :
/// - `target_opstamp` is the opstamp up to which we want to consume the
/// delete queue and reflect their deletes.
/// - `segment_ids` is the list of segment to be merged.
///
/// The second role is to ensure keep track of the fact that these
/// segments are in merge and avoid starting a merge operation that
/// may conflict with this one.
///
/// This works by tracking merge operations. When considering computing
/// merge candidates, we simply list tracked merge operations and remove
/// their segments from possible merge candidates.
pub struct MergeOperation {
inner: TrackedObject<InnerMergeOperation>,
}
struct InnerMergeOperation {
target_opstamp: u64,
segment_ids: Vec<SegmentId>,
}
impl MergeOperation {
pub fn new(
inventory: &MergeOperationInventory,
target_opstamp: u64,
segment_ids: Vec<SegmentId>,
) -> MergeOperation {
let inner_merge_operation = InnerMergeOperation {
target_opstamp,
segment_ids,
};
MergeOperation {
inner: inventory.0.track(inner_merge_operation),
}
}
pub fn target_opstamp(&self) -> u64 {
self.inner.target_opstamp
}
pub fn segment_ids(&self) -> &[SegmentId] {
&self.inner.segment_ids[..]
}
}

View File

@@ -11,7 +11,7 @@ pub struct MergeCandidate(pub Vec<SegmentId>);
///
/// Every time a the list of segments changes, the segment updater
/// asks the merge policy if some segments should be merged.
pub trait MergePolicy: MergePolicyClone + marker::Send + marker::Sync + Debug {
pub trait MergePolicy: marker::Send + marker::Sync + Debug {
/// Given the list of segment metas, returns the list of merge candidates.
///
/// This call happens on the segment updater thread, and will block
@@ -19,21 +19,6 @@ pub trait MergePolicy: MergePolicyClone + marker::Send + marker::Sync + Debug {
fn compute_merge_candidates(&self, segments: &[SegmentMeta]) -> Vec<MergeCandidate>;
}
/// MergePolicyClone
pub trait MergePolicyClone {
/// Returns a boxed clone of the MergePolicy.
fn box_clone(&self) -> Box<MergePolicy>;
}
impl<T> MergePolicyClone for T
where
T: 'static + MergePolicy + Clone,
{
fn box_clone(&self) -> Box<MergePolicy> {
Box::new(self.clone())
}
}
/// Never merge segments.
#[derive(Debug, Clone)]
pub struct NoMergePolicy;

View File

@@ -3,6 +3,7 @@ mod directory_lock;
mod doc_opstamp_mapping;
pub mod index_writer;
mod log_merge_policy;
mod merge_operation;
pub mod merge_policy;
pub mod merger;
pub mod operation;
@@ -17,12 +18,12 @@ mod stamper;
pub(crate) use self::directory_lock::DirectoryLock;
pub use self::directory_lock::LockType;
pub use self::index_writer::IndexWriter;
pub use self::log_merge_policy::LogMergePolicy;
pub use self::merge_operation::{MergeOperation, MergeOperationInventory};
pub use self::merge_policy::{MergeCandidate, MergePolicy, NoMergePolicy};
pub use self::prepared_commit::PreparedCommit;
pub use self::segment_entry::{SegmentEntry, SegmentState};
pub use self::segment_entry::SegmentEntry;
pub use self::segment_manager::SegmentManager;
pub use self::segment_serializer::SegmentSerializer;
pub use self::segment_writer::SegmentWriter;

View File

@@ -4,21 +4,6 @@ use core::SegmentMeta;
use indexer::delete_queue::DeleteCursor;
use std::fmt;
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum SegmentState {
Ready,
InMerge,
}
impl SegmentState {
pub fn letter_code(self) -> char {
match self {
SegmentState::InMerge => 'M',
SegmentState::Ready => 'R',
}
}
}
/// A segment entry describes the state of
/// a given segment, at a given instant.
///
@@ -35,7 +20,6 @@ impl SegmentState {
#[derive(Clone)]
pub struct SegmentEntry {
meta: SegmentMeta,
state: SegmentState,
delete_bitset: Option<BitSet>,
delete_cursor: DeleteCursor,
}
@@ -49,7 +33,6 @@ impl SegmentEntry {
) -> SegmentEntry {
SegmentEntry {
meta: segment_meta,
state: SegmentState::Ready,
delete_bitset,
delete_cursor,
}
@@ -72,14 +55,6 @@ impl SegmentEntry {
&mut self.delete_cursor
}
/// Return the `SegmentEntry`.
///
/// The state describes whether the segment is available for
/// a merge or not.
pub fn state(&self) -> SegmentState {
self.state
}
/// Returns the segment id.
pub fn segment_id(&self) -> SegmentId {
self.meta.id()
@@ -89,33 +64,10 @@ impl SegmentEntry {
pub fn meta(&self) -> &SegmentMeta {
&self.meta
}
/// Mark the `SegmentEntry` as in merge.
///
/// Only segments that are not already
/// in a merge are elligible for future merge.
pub fn start_merge(&mut self) {
self.state = SegmentState::InMerge;
}
/// Cancel a merge
///
/// If a merge fails, it is important to switch
/// the segment back to a idle state, so that it
/// may be elligible for future merges.
pub fn cancel_merge(&mut self) {
self.state = SegmentState::Ready;
}
/// Returns true iff a segment should
/// be considered for a merge.
pub fn is_ready(&self) -> bool {
self.state == SegmentState::Ready
}
}
impl fmt::Debug for SegmentEntry {
fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
write!(formatter, "SegmentEntry({:?}, {:?})", self.meta, self.state)
write!(formatter, "SegmentEntry({:?})", self.meta)
}
}

View File

@@ -16,7 +16,6 @@ use Result as TantivyResult;
struct SegmentRegisters {
uncommitted: SegmentRegister,
committed: SegmentRegister,
writing: HashSet<SegmentId>,
}
/// The segment manager stores the list of segments
@@ -41,12 +40,17 @@ impl Debug for SegmentManager {
}
pub fn get_mergeable_segments(
in_merge_segment_ids: &HashSet<SegmentId>,
segment_manager: &SegmentManager,
) -> (Vec<SegmentMeta>, Vec<SegmentMeta>) {
let registers_lock = segment_manager.read();
(
registers_lock.committed.get_mergeable_segments(),
registers_lock.uncommitted.get_mergeable_segments(),
registers_lock
.committed
.get_mergeable_segments(in_merge_segment_ids),
registers_lock
.uncommitted
.get_mergeable_segments(in_merge_segment_ids),
)
}
@@ -59,7 +63,6 @@ impl SegmentManager {
registers: RwLock::new(SegmentRegisters {
uncommitted: SegmentRegister::default(),
committed: SegmentRegister::new(segment_metas, delete_cursor),
writing: HashSet::new(),
}),
}
}
@@ -72,12 +75,6 @@ impl SegmentManager {
segment_entries
}
/// Returns the overall number of segments in the `SegmentManager`
pub fn num_segments(&self) -> usize {
let registers_lock = self.read();
registers_lock.committed.len() + registers_lock.uncommitted.len()
}
/// List the files that are useful to the index.
///
/// This does not include lock files, or files that are obsolete
@@ -136,25 +133,22 @@ impl SegmentManager {
/// the `segment_ids` are not either all committed or all
/// uncommitted.
pub fn start_merge(&self, segment_ids: &[SegmentId]) -> TantivyResult<Vec<SegmentEntry>> {
let mut registers_lock = self.write();
let registers_lock = self.read();
let mut segment_entries = vec![];
if registers_lock.uncommitted.contains_all(segment_ids) {
for segment_id in segment_ids {
let segment_entry = registers_lock.uncommitted
.start_merge(segment_id)
.get(segment_id)
.expect("Segment id not found {}. Should never happen because of the contains all if-block.");
segment_entries.push(segment_entry);
}
} else if registers_lock.committed.contains_all(segment_ids) {
for segment_id in segment_ids {
let segment_entry = registers_lock.committed
.start_merge(segment_id)
.get(segment_id)
.expect("Segment id not found {}. Should never happen because of the contains all if-block.");
segment_entries.push(segment_entry);
}
for segment_id in segment_ids {
registers_lock.committed.start_merge(segment_id);
}
} else {
let error_msg = "Merge operation sent for segments that are not \
all uncommited or commited."
@@ -164,50 +158,8 @@ impl SegmentManager {
Ok(segment_entries)
}
pub fn cancel_merge(
&self,
before_merge_segment_ids: &[SegmentId],
after_merge_segment_id: SegmentId,
) {
let mut registers_lock = self.write();
// we mark all segments are ready for merge.
{
let target_segment_register: &mut SegmentRegister;
target_segment_register = {
if registers_lock
.uncommitted
.contains_all(before_merge_segment_ids)
{
&mut registers_lock.uncommitted
} else if registers_lock
.committed
.contains_all(before_merge_segment_ids)
{
&mut registers_lock.committed
} else {
warn!("couldn't find segment in SegmentManager");
return;
}
};
for segment_id in before_merge_segment_ids {
target_segment_register.cancel_merge(segment_id);
}
}
// ... and we make sure the target segment entry
// can be garbage collected.
registers_lock.writing.remove(&after_merge_segment_id);
}
pub fn write_segment(&self, segment_id: SegmentId) {
let mut registers_lock = self.write();
registers_lock.writing.insert(segment_id);
}
pub fn add_segment(&self, segment_entry: SegmentEntry) {
let mut registers_lock = self.write();
registers_lock.writing.remove(&segment_entry.segment_id());
registers_lock.uncommitted.add_segment_entry(segment_entry);
}
@@ -217,10 +169,6 @@ impl SegmentManager {
after_merge_segment_entry: SegmentEntry,
) {
let mut registers_lock = self.write();
registers_lock
.writing
.remove(&after_merge_segment_entry.segment_id());
let target_register: &mut SegmentRegister = {
if registers_lock
.uncommitted

View File

@@ -3,6 +3,7 @@ use core::SegmentMeta;
use indexer::delete_queue::DeleteCursor;
use indexer::segment_entry::SegmentEntry;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::{self, Debug, Formatter};
/// The segment register keeps track
@@ -21,8 +22,8 @@ pub struct SegmentRegister {
impl Debug for SegmentRegister {
fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
write!(f, "SegmentRegister(")?;
for (k, v) in &self.segment_states {
write!(f, "{}:{}, ", k.short_uuid_string(), v.state().letter_code())?;
for k in self.segment_states.keys() {
write!(f, "{}, ", k.short_uuid_string())?;
}
write!(f, ")")?;
Ok(())
@@ -34,14 +35,13 @@ impl SegmentRegister {
self.segment_states.clear();
}
pub fn len(&self) -> usize {
self.segment_states.len()
}
pub fn get_mergeable_segments(&self) -> Vec<SegmentMeta> {
pub fn get_mergeable_segments(
&self,
in_merge_segment_ids: &HashSet<SegmentId>,
) -> Vec<SegmentMeta> {
self.segment_states
.values()
.filter(|segment_entry| segment_entry.is_ready())
.filter(|segment_entry| !in_merge_segment_ids.contains(&segment_entry.segment_id()))
.map(|segment_entry| segment_entry.meta().clone())
.collect()
}
@@ -60,7 +60,7 @@ impl SegmentRegister {
segment_ids
}
pub fn contains_all(&mut self, segment_ids: &[SegmentId]) -> bool {
pub fn contains_all(&self, segment_ids: &[SegmentId]) -> bool {
segment_ids
.iter()
.all(|segment_id| self.segment_states.contains_key(segment_id))
@@ -75,20 +75,10 @@ impl SegmentRegister {
self.segment_states.remove(segment_id);
}
pub fn cancel_merge(&mut self, segment_id: &SegmentId) {
pub fn get(&self, segment_id: &SegmentId) -> Option<SegmentEntry> {
self.segment_states
.get_mut(segment_id)
.expect("Received a merge notification for a segment that is not registered")
.cancel_merge();
}
pub fn start_merge(&mut self, segment_id: &SegmentId) -> Option<SegmentEntry> {
if let Some(segment_entry) = self.segment_states.get_mut(segment_id) {
segment_entry.start_merge();
Some(segment_entry.clone())
} else {
None
}
.get(segment_id)
.map(|segment_entry| segment_entry.clone())
}
pub fn new(segment_metas: Vec<SegmentMeta>, delete_cursor: &DeleteCursor) -> SegmentRegister {
@@ -100,11 +90,6 @@ impl SegmentRegister {
}
SegmentRegister { segment_states }
}
#[cfg(test)]
pub fn segment_entry(&self, segment_id: &SegmentId) -> Option<SegmentEntry> {
self.segment_states.get(segment_id).cloned()
}
}
#[cfg(test)]
@@ -113,7 +98,6 @@ mod tests {
use core::SegmentId;
use core::SegmentMeta;
use indexer::delete_queue::*;
use indexer::SegmentState;
fn segment_ids(segment_register: &SegmentRegister) -> Vec<SegmentId> {
segment_register
@@ -137,42 +121,12 @@ mod tests {
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None);
segment_register.add_segment_entry(segment_entry);
}
assert_eq!(
segment_register
.segment_entry(&segment_id_a)
.unwrap()
.state(),
SegmentState::Ready
);
assert_eq!(segment_ids(&segment_register), vec![segment_id_a]);
{
let segment_meta = SegmentMeta::new(segment_id_b, 0u32);
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None);
segment_register.add_segment_entry(segment_entry);
}
assert_eq!(
segment_register
.segment_entry(&segment_id_b)
.unwrap()
.state(),
SegmentState::Ready
);
segment_register.start_merge(&segment_id_a);
segment_register.start_merge(&segment_id_b);
assert_eq!(
segment_register
.segment_entry(&segment_id_a)
.unwrap()
.state(),
SegmentState::InMerge
);
assert_eq!(
segment_register
.segment_entry(&segment_id_b)
.unwrap()
.state(),
SegmentState::InMerge
);
segment_register.remove_segment(&segment_id_a);
segment_register.remove_segment(&segment_id_b);
{

View File

@@ -16,8 +16,10 @@ use futures_cpupool::CpuFuture;
use futures_cpupool::CpuPool;
use indexer::delete_queue::DeleteCursor;
use indexer::index_writer::advance_deletes;
use indexer::merge_operation::MergeOperationInventory;
use indexer::merger::IndexMerger;
use indexer::stamper::Stamper;
use indexer::MergeOperation;
use indexer::SegmentEntry;
use indexer::SegmentSerializer;
use indexer::{DefaultMergePolicy, MergePolicy};
@@ -25,6 +27,7 @@ use schema::Schema;
use serde_json;
use std::borrow::BorrowMut;
use std::collections::HashMap;
use std::collections::HashSet;
use std::io::Write;
use std::mem;
use std::ops::DerefMut;
@@ -66,13 +69,8 @@ pub fn save_new_metas(schema: Schema, directory: &mut Directory) -> Result<()> {
///
/// This method is not part of tantivy's public API
fn save_metas(metas: &IndexMeta, directory: &mut Directory) -> Result<()> {
// let metas = IndexMeta {
// segments: segment_metas,
// schema,
// opstamp,
// payload,
// };
let mut buffer = serde_json::to_vec_pretty(metas)?;
// Just adding a new line at the end of the buffer.
writeln!(&mut buffer)?;
directory.atomic_write(&META_FILEPATH, &buffer[..])?;
debug!("Saved metas {:?}", serde_json::to_string_pretty(&metas));
@@ -84,21 +82,21 @@ fn save_metas(metas: &IndexMeta, directory: &mut Directory) -> Result<()> {
//
// All this processing happens on a single thread
// consuming a common queue.
//
// We voluntarily pass a merge_operation ref to guarantee that
// the merge_operation is alive during the process
#[derive(Clone)]
pub struct SegmentUpdater(Arc<InnerSegmentUpdater>);
struct MergeOperation {
pub target_opstamp: u64,
pub segment_ids: Vec<SegmentId>,
}
fn perform_merge(
merge_operation: &MergeOperation,
index: &Index,
mut segment_entries: Vec<SegmentEntry>,
mut merged_segment: Segment,
target_opstamp: u64,
) -> Result<SegmentEntry> {
let target_opstamp = merge_operation.target_opstamp();
// first we need to apply deletes to our segment.
let mut merged_segment = index.new_segment();
// TODO add logging
let schema = index.schema();
@@ -142,12 +140,13 @@ struct InnerSegmentUpdater {
pool: CpuPool,
index: Index,
segment_manager: SegmentManager,
merge_policy: RwLock<Box<MergePolicy>>,
merge_policy: RwLock<Arc<Box<MergePolicy>>>,
merging_thread_id: AtomicUsize,
merging_threads: RwLock<HashMap<usize, JoinHandle<Result<()>>>>,
generation: AtomicUsize,
killed: AtomicBool,
stamper: Stamper,
merge_operations: MergeOperationInventory,
}
impl SegmentUpdater {
@@ -168,28 +167,23 @@ impl SegmentUpdater {
pool,
index,
segment_manager,
merge_policy: RwLock::new(Box::new(DefaultMergePolicy::default())),
merge_policy: RwLock::new(Arc::new(Box::new(DefaultMergePolicy::default()))),
merging_thread_id: AtomicUsize::default(),
merging_threads: RwLock::new(HashMap::new()),
generation: AtomicUsize::default(),
killed: AtomicBool::new(false),
stamper,
merge_operations: Default::default(),
})))
}
pub fn new_segment(&self) -> Segment {
let new_segment = self.0.index.new_segment();
let segment_id = new_segment.id();
self.0.segment_manager.write_segment(segment_id);
new_segment
}
pub fn get_merge_policy(&self) -> Box<MergePolicy> {
self.0.merge_policy.read().unwrap().box_clone()
pub fn get_merge_policy(&self) -> Arc<Box<MergePolicy>> {
self.0.merge_policy.read().unwrap().clone()
}
pub fn set_merge_policy(&self, merge_policy: Box<MergePolicy>) {
*self.0.merge_policy.write().unwrap() = merge_policy;
let arc_merge_policy = Arc::new(merge_policy);
*self.0.merge_policy.write().unwrap() = arc_merge_policy;
}
fn get_merging_thread_id(&self) -> usize {
@@ -302,12 +296,14 @@ impl SegmentUpdater {
}
pub fn start_merge(&self, segment_ids: &[SegmentId]) -> Result<Receiver<SegmentMeta>> {
let segment_ids_vec = segment_ids.to_vec();
let commit_opstamp = self.load_metas().opstamp;
self.run_async(move |segment_updater| {
segment_updater.start_merge_impl(&segment_ids_vec[..], commit_opstamp)
})
.wait()?
let merge_operation = MergeOperation::new(
&self.0.merge_operations,
commit_opstamp,
segment_ids.to_vec(),
);
self.run_async(move |segment_updater| segment_updater.start_merge_impl(merge_operation))
.wait()?
}
fn store_meta(&self, index_meta: &IndexMeta) {
@@ -318,22 +314,25 @@ impl SegmentUpdater {
}
// `segment_ids` is required to be non-empty.
fn start_merge_impl(
&self,
segment_ids: &[SegmentId],
target_opstamp: u64,
) -> Result<Receiver<SegmentMeta>> {
assert!(!segment_ids.is_empty(), "Segment_ids cannot be empty.");
fn start_merge_impl(&self, merge_operation: MergeOperation) -> Result<Receiver<SegmentMeta>> {
assert!(
!merge_operation.segment_ids().is_empty(),
"Segment_ids cannot be empty."
);
let segment_updater_clone = self.clone();
let segment_entries: Vec<SegmentEntry> = self.0.segment_manager.start_merge(segment_ids)?;
let segment_entries: Vec<SegmentEntry> = self
.0
.segment_manager
.start_merge(merge_operation.segment_ids())?;
let segment_ids_vec = segment_ids.to_vec();
// let segment_ids_vec = merge_operation.segment_ids.to_vec();
let merging_thread_id = self.get_merging_thread_id();
info!(
"Starting merge thread #{} - {:?}",
merging_thread_id, segment_ids
merging_thread_id,
merge_operation.segment_ids()
);
let (merging_future_send, merging_future_recv) = oneshot();
@@ -342,20 +341,17 @@ impl SegmentUpdater {
.name(format!("mergingthread-{}", merging_thread_id))
.spawn(move || {
// first we need to apply deletes to our segment.
let merged_segment = segment_updater_clone.new_segment();
let merged_segment_id = merged_segment.id();
let merge_result = perform_merge(
&merge_operation,
&segment_updater_clone.0.index,
segment_entries,
merged_segment,
target_opstamp,
);
match merge_result {
Ok(after_merge_segment_entry) => {
let merged_segment_meta = after_merge_segment_entry.meta().clone();
segment_updater_clone
.end_merge(segment_ids_vec, after_merge_segment_entry)
.end_merge(merge_operation, after_merge_segment_entry)
.expect("Segment updater thread is corrupted.");
// the future may fail if the listener of the oneshot future
@@ -366,13 +362,18 @@ impl SegmentUpdater {
let _merging_future_res = merging_future_send.send(merged_segment_meta);
}
Err(e) => {
warn!("Merge of {:?} was cancelled: {:?}", segment_ids_vec, e);
warn!(
"Merge of {:?} was cancelled: {:?}",
merge_operation.segment_ids(),
e
);
// ... cancel merge
if cfg!(test) {
panic!("Merge failed.");
}
segment_updater_clone.cancel_merge(&segment_ids_vec, merged_segment_id);
// merging_future_send will be dropped, sending an error to the future.
// As `merge_operation` will be dropped, the segment in merge state will
// be available for merge again.
// `merging_future_send` will be dropped, sending an error to the future.
}
}
segment_updater_clone
@@ -393,37 +394,34 @@ impl SegmentUpdater {
}
fn consider_merge_options(&self) {
let merge_segment_ids: HashSet<SegmentId> = self.0.merge_operations.segment_in_merge();
let (committed_segments, uncommitted_segments) =
get_mergeable_segments(&self.0.segment_manager);
get_mergeable_segments(&merge_segment_ids, &self.0.segment_manager);
// Committed segments cannot be merged with uncommitted_segments.
// We therefore consider merges using these two sets of segments independently.
let merge_policy = self.get_merge_policy();
let current_opstamp = self.0.stamper.stamp();
let mut merge_candidates = merge_policy
let mut merge_candidates: Vec<MergeOperation> = merge_policy
.compute_merge_candidates(&uncommitted_segments)
.into_iter()
.map(|merge_candidate| MergeOperation {
target_opstamp: current_opstamp,
segment_ids: merge_candidate.0,
.map(|merge_candidate| {
MergeOperation::new(&self.0.merge_operations, current_opstamp, merge_candidate.0)
})
.collect::<Vec<_>>();
.collect();
let commit_opstamp = self.load_metas().opstamp;
let committed_merge_candidates = merge_policy
.compute_merge_candidates(&committed_segments)
.into_iter()
.map(|merge_candidate| MergeOperation {
target_opstamp: commit_opstamp,
segment_ids: merge_candidate.0,
.map(|merge_candidate| {
MergeOperation::new(&self.0.merge_operations, commit_opstamp, merge_candidate.0)
})
.collect::<Vec<_>>();
merge_candidates.extend(committed_merge_candidates.into_iter());
for MergeOperation {
target_opstamp,
segment_ids,
} in merge_candidates
{
match self.start_merge_impl(&segment_ids, target_opstamp) {
for merge_operation in merge_candidates {
match self.start_merge_impl(merge_operation) {
Ok(merge_future) => {
if let Err(e) = merge_future.fuse().poll() {
error!("The merge task failed quickly after starting: {:?}", e);
@@ -439,19 +437,9 @@ impl SegmentUpdater {
}
}
fn cancel_merge(
&self,
before_merge_segment_ids: &[SegmentId],
after_merge_segment_entry: SegmentId,
) {
self.0
.segment_manager
.cancel_merge(before_merge_segment_ids, after_merge_segment_entry);
}
fn end_merge(
&self,
before_merge_segment_ids: Vec<SegmentId>,
merge_operation: MergeOperation,
mut after_merge_segment_entry: SegmentEntry,
) -> Result<()> {
self.run_async(move |segment_updater| {
@@ -467,16 +455,15 @@ impl SegmentUpdater {
{
error!(
"Merge of {:?} was cancelled (advancing deletes failed): {:?}",
before_merge_segment_ids, e
merge_operation.segment_ids(),
e
);
// ... cancel merge
if cfg!(test) {
panic!("Merge failed.");
}
segment_updater.cancel_merge(
&before_merge_segment_ids,
after_merge_segment_entry.segment_id(),
);
// ... cancel merge
// `merge_operations` are tracked. As it is dropped, the
// the segment_ids will be available again for merge.
return;
}
}
@@ -484,7 +471,7 @@ impl SegmentUpdater {
segment_updater
.0
.segment_manager
.end_merge(&before_merge_segment_ids, after_merge_segment_entry);
.end_merge(merge_operation.segment_ids(), after_merge_segment_entry);
segment_updater.consider_merge_options();
info!("save metas");
let previous_metas = segment_updater.load_metas();
@@ -510,32 +497,25 @@ impl SegmentUpdater {
/// Obsolete files will eventually be cleaned up
/// by the directory garbage collector.
pub fn wait_merging_thread(&self) -> Result<()> {
let mut num_segments: usize;
loop {
num_segments = self.0.segment_manager.num_segments();
let mut new_merging_threads = HashMap::new();
{
let merging_threads: HashMap<usize, JoinHandle<Result<()>>> = {
let mut merging_threads = self.0.merging_threads.write().unwrap();
mem::swap(&mut new_merging_threads, merging_threads.deref_mut());
mem::replace(merging_threads.deref_mut(), HashMap::new())
};
if merging_threads.is_empty() {
return Ok(());
}
debug!("wait merging thread {}", new_merging_threads.len());
for (_, merging_thread_handle) in new_merging_threads {
debug!("wait merging thread {}", merging_threads.len());
for (_, merging_thread_handle) in merging_threads {
merging_thread_handle
.join()
.map(|_| ())
.map_err(|_| TantivyError::ErrorInThread("Merging thread failed.".into()))?;
}
// Our merging thread may have queued their completed
// Our merging thread may have queued their completed merged segment.
// Let's wait for that too.
self.run_async(move |_| {}).wait()?;
let new_num_segments = self.0.segment_manager.num_segments();
if new_num_segments >= num_segments {
break;
}
}
Ok(())
}
}

View File

@@ -50,7 +50,6 @@ where
self.token_mut().text.make_ascii_lowercase();
} else {
to_lowercase_unicode(&mut self.tail.token_mut().text, &mut self.buffer);
mem::swap(&mut self.tail.token_mut().text, &mut self.buffer);
}
true