From 176f67a26654878b6924ae68986b405cb2f3db84 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Wed, 23 Jan 2019 09:53:33 +0900 Subject: [PATCH] Refactoring --- src/core/index.rs | 2 +- src/indexer/segment_updater.rs | 75 +++++++++++++++++++++------------- src/indexer/stamper.rs | 62 ++++++++++++++++++---------- 3 files changed, 88 insertions(+), 51 deletions(-) diff --git a/src/core/index.rs b/src/core/index.rs index 5c14f6dd1..3cf3657e9 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -150,7 +150,7 @@ impl Index { /// /// This will overwrite existing meta.json fn from_directory(mut directory: ManagedDirectory, schema: Schema) -> Result { - save_new_metas(schema.clone(), 0, directory.borrow_mut())?; + save_new_metas(schema.clone(), directory.borrow_mut())?; let metas = IndexMeta::with_schema(schema); Index::create_from_metas(directory, &metas) } diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 7873b8c1b..13b1f33c0 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -30,7 +30,6 @@ use std::mem; use std::ops::DerefMut; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; -use std::sync::Mutex; use std::sync::RwLock; use std::thread; use std::thread::JoinHandle; @@ -45,8 +44,15 @@ use Result; /// and flushed. /// /// This method is not part of tantivy's public API -pub fn save_new_metas(schema: Schema, opstamp: u64, directory: &mut Directory) -> Result<()> { - save_metas(vec![], schema, opstamp, None, directory) +pub fn save_new_metas(schema: Schema, directory: &mut Directory) -> Result<()> { + save_metas( + &IndexMeta { + segments: Vec::new(), + schema, + opstamp: 0u64, + payload: None + }, + directory) } /// Save the index meta file. @@ -58,20 +64,17 @@ pub fn save_new_metas(schema: Schema, opstamp: u64, directory: &mut Directory) - /// and flushed. 
/// /// This method is not part of tantivy's public API -pub fn save_metas( - segment_metas: Vec, - schema: Schema, - opstamp: u64, - payload: Option, +fn save_metas( + metas: &IndexMeta, directory: &mut Directory, ) -> Result<()> { - let metas = IndexMeta { - segments: segment_metas, - schema, - opstamp, - payload, - }; - let mut buffer = serde_json::to_vec_pretty(&metas)?; +// let metas = IndexMeta { +// segments: segment_metas, +// schema, +// opstamp, +// payload, +// }; + let mut buffer = serde_json::to_vec_pretty(metas)?; writeln!(&mut buffer)?; directory.atomic_write(&META_FILEPATH, &buffer[..])?; debug!("Saved metas {:?}", serde_json::to_string_pretty(&metas)); @@ -131,7 +134,13 @@ fn perform_merge( } struct InnerSegmentUpdater { - commit_opstamp: Mutex, //< Current commit opstamp. + // we keep a copy of the current active IndexMeta to + // avoid loading the file everytime we need it in the + // `SegmentUpdater`. + // + // This should be up to date as all update happen through + // the unique active `SegmentUpdater`. + active_metas: RwLock>, pool: CpuPool, index: Index, segment_manager: SegmentManager, @@ -155,9 +164,9 @@ impl SegmentUpdater { .name_prefix("segment_updater") .pool_size(1) .create(); - let commit_opstamp = index.load_metas().expect("Failed to read opstamp").opstamp; + let index_meta = index.load_metas()?; Ok(SegmentUpdater(Arc::new(InnerSegmentUpdater { - commit_opstamp: Mutex::new(commit_opstamp), + active_metas: RwLock::new(Arc::new(index_meta)), pool, index, segment_manager, @@ -252,15 +261,18 @@ impl SegmentUpdater { // // Segment 1 from disk 1, Segment 1 from disk 2, etc. 
commited_segment_metas.sort_by_key(|segment_meta| -(segment_meta.max_doc() as i32)); - save_metas( - commited_segment_metas, - index.schema(), + let index_meta = IndexMeta { + segments: commited_segment_metas, + schema: index.schema(), opstamp, - commit_message, + payload: commit_message + }; + save_metas( + &index_meta, directory.box_clone().borrow_mut(), ) .expect("Could not save metas."); - *self.0.commit_opstamp.lock().unwrap() = opstamp; + self.store_meta(&index_meta); } } @@ -296,13 +308,20 @@ impl SegmentUpdater { pub fn start_merge(&self, segment_ids: &[SegmentId]) -> Result> { let segment_ids_vec = segment_ids.to_vec(); - let commit_opstamp = *self.0.commit_opstamp.lock().unwrap(); + let commit_opstamp = self.load_metas().opstamp; self.run_async(move |segment_updater| { segment_updater.start_merge_impl(&segment_ids_vec[..], commit_opstamp) }) .wait()? } + fn store_meta(&self, index_meta: &IndexMeta) { + *self.0.active_metas.write().unwrap() = Arc::new(index_meta.clone()); + } + fn load_metas(&self) -> Arc { + self.0.active_metas.read().unwrap().clone() + } + // `segment_ids` is required to be non-empty. 
fn start_merge_impl( &self, @@ -394,7 +413,7 @@ impl SegmentUpdater { segment_ids: merge_candidate.0, }) .collect::>(); - let commit_opstamp = *self.0.commit_opstamp.lock().unwrap(); + let commit_opstamp = self.load_metas().opstamp; let committed_merge_candidates = merge_policy .compute_merge_candidates(&committed_segments) .into_iter() @@ -444,7 +463,7 @@ impl SegmentUpdater { info!("End merge {:?}", after_merge_segment_entry.meta()); let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone(); if let Some(delete_operation) = delete_cursor.get() { - let committed_opstamp = *segment_updater.0.commit_opstamp.lock().unwrap(); + let committed_opstamp = segment_updater.load_metas().opstamp; if delete_operation.opstamp < committed_opstamp { let index = &segment_updater.0.index; let segment = index.segment(after_merge_segment_entry.meta().clone()); @@ -473,8 +492,8 @@ impl SegmentUpdater { .end_merge(&before_merge_segment_ids, after_merge_segment_entry); segment_updater.consider_merge_options(); info!("save metas"); - let previous_metas = segment_updater.0.index.load_metas().unwrap(); - segment_updater.save_metas(previous_metas.opstamp, previous_metas.payload); + let previous_metas = segment_updater.load_metas(); + segment_updater.save_metas(previous_metas.opstamp, previous_metas.payload.clone()); segment_updater.garbage_collect_files_exec(); }) .wait() diff --git a/src/indexer/stamper.rs b/src/indexer/stamper.rs index 430607032..f3fd1f943 100644 --- a/src/indexer/stamper.rs +++ b/src/indexer/stamper.rs @@ -1,50 +1,68 @@ +use std::sync::Arc; +use std::sync::atomic::Ordering; + + // AtomicU64 have not landed in stable. // For the moment let's just use AtomicUsize on // x86/64 bit platform, and a mutex on other platform. 
- -#[cfg(target = "x86_64")] +#[cfg(target_arch = "x86_64")] mod archicture_impl { use std::sync::atomic::{AtomicUsize, Ordering}; - use std::sync::Arc; - #[derive(Clone, Default)] - pub struct Stamper(Arc<AtomicU64>); + #[derive(Default)] + pub struct AtomicU64Ersatz(AtomicUsize); - impl Stamper { - pub fn new(first_opstamp: u64) -> Stamper { - Stamper(Arc::new(AtomicU64::new(first_opstamp))) + impl AtomicU64Ersatz { + pub fn new(first_opstamp: u64) -> AtomicU64Ersatz { + AtomicU64Ersatz(AtomicUsize::new(first_opstamp as usize)) } - pub fn stamp(&self) -> u64 { - self.0.fetch_add(1u64, Ordering::SeqCst) as u64 + pub fn fetch_add(&self, val: u64, order: Ordering) -> u64 { + self.0.fetch_add(val as usize, order) as u64 } } } -#[cfg(not(target = "x86_64"))] +#[cfg(not(target_arch = "x86_64"))] mod archicture_impl { - use std::sync::{Arc, Mutex}; + /// Under other architecture, we rely on a mutex. + use std::sync::Mutex; + use std::sync::atomic::Ordering; - #[derive(Clone, Default)] - pub struct Stamper(Arc<Mutex<u64>>); + #[derive(Default)] + pub struct AtomicU64Ersatz(Mutex<u64>); - impl Stamper { - pub fn new(first_opstamp: u64) -> Stamper { - Stamper(Arc::new(Mutex::new(first_opstamp))) + impl AtomicU64Ersatz { + pub fn new(first_opstamp: u64) -> AtomicU64Ersatz { + AtomicU64Ersatz(Mutex::new(first_opstamp)) } - pub fn stamp(&self) -> u64 { - let mut guard = self.0.lock().expect("Failed to lock the stamper"); - let previous_val = *guard; - *guard = previous_val + 1; + pub fn fetch_add(&self, val: u64, _order: Ordering) -> u64 { + let mut lock = self.0.lock().unwrap(); + let previous_val = *lock; + *lock = previous_val.wrapping_add(val); previous_val } } } -pub use self::archicture_impl::Stamper; +use self::archicture_impl::AtomicU64Ersatz; + +#[derive(Clone, Default)] +pub struct Stamper(Arc<AtomicU64Ersatz>); + +impl Stamper { + pub fn new(first_opstamp: u64) -> Stamper { + Stamper(Arc::new(AtomicU64Ersatz::new(first_opstamp))) + } + + pub fn stamp(&self) -> u64 { + self.0.fetch_add(1u64, Ordering::SeqCst) as u64 + }
+} + #[cfg(test)] mod test {