From 42756c7474317c2b1aa0610278b54c0bb0c70fb9 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Fri, 15 Nov 2019 18:35:31 +0900 Subject: [PATCH] Removing futures-cpupool and upgrading to futures-0.3 --- Cargo.toml | 3 +- src/core/index.rs | 71 +++++---- src/directory/managed_directory.rs | 48 ++++-- src/directory/mmap_directory.rs | 15 +- src/directory/mod.rs | 15 ++ src/directory/ram_directory.rs | 2 +- src/directory/tests.rs | 49 +++--- src/directory/watch_event_router.rs | 68 +++++---- src/indexer/index_writer.rs | 44 +++--- src/indexer/merger.rs | 79 ++-------- src/indexer/segment_updater.rs | 222 ++++++++++++++++------------ src/reader/mod.rs | 5 + src/reader/pool.rs | 66 +++++++-- tests/failpoints/mod.rs | 4 +- 14 files changed, 375 insertions(+), 316 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a63ac511a..4048a9354 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,8 +36,7 @@ notify = {version="4", optional=true} bit-set = "0.5" uuid = { version = "0.8", features = ["v4", "serde"] } crossbeam = "0.7" -futures = "0.1" -futures-cpupool = "0.1" +futures = {version = "0.3", features=["thread-pool"] } owning_ref = "0.4" stable_deref_trait = "1.0.0" rust-stemmers = "1.1" diff --git a/src/core/index.rs b/src/core/index.rs index e908dc953..d5eca574a 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -388,12 +388,9 @@ mod tests { use crate::directory::RAMDirectory; use crate::schema::Field; use crate::schema::{Schema, INDEXED, TEXT}; - use crate::Index; use crate::IndexReader; - use crate::IndexWriter; use crate::ReloadPolicy; - use std::thread; - use std::time::Duration; + use crate::{Directory, Index}; #[test] fn test_indexer_for_field() { @@ -471,14 +468,14 @@ mod tests { .try_into() .unwrap(); assert_eq!(reader.searcher().num_docs(), 0); - let mut writer = index.writer_with_num_threads(1, 3_000_000).unwrap(); - test_index_on_commit_reload_policy_aux(field, &mut writer, &reader); + test_index_on_commit_reload_policy_aux(field, &index, &reader); } #[cfg(feature = "mmap")] mod mmap_specific { use super::*; + use crate::Directory; use std::path::PathBuf; use tempfile::TempDir; @@ -489,22 +486,20 @@ mod tests { let tempdir = TempDir::new().unwrap(); let tempdir_path = PathBuf::from(tempdir.path()); let index = Index::create_in_dir(&tempdir_path, schema).unwrap(); - let mut writer = index.writer_with_num_threads(1, 3_000_000).unwrap(); - writer.commit().unwrap(); let reader = index .reader_builder() .reload_policy(ReloadPolicy::OnCommit) .try_into() .unwrap(); assert_eq!(reader.searcher().num_docs(), 0); - test_index_on_commit_reload_policy_aux(field, &mut writer, &reader); + test_index_on_commit_reload_policy_aux(field, &index, &reader); } #[test] fn test_index_manual_policy_mmap() { let schema = throw_away_schema(); let field = schema.get_field("num_likes").unwrap(); - let index = Index::create_from_tempdir(schema).unwrap(); + let mut index = Index::create_from_tempdir(schema).unwrap(); let mut writer = index.writer_with_num_threads(1, 3_000_000).unwrap(); writer.commit().unwrap(); let reader = index @@ -514,8 +509,12 @@ mod tests { .unwrap(); assert_eq!(reader.searcher().num_docs(), 0); writer.add_document(doc!(field=>1u64)); + let (sender, receiver) = crossbeam::channel::unbounded(); + let _handle = index.directory_mut().watch(Box::new(move || { + let _ = sender.send(()); + })); writer.commit().unwrap(); - thread::sleep(Duration::from_millis(500)); + assert!(receiver.recv().is_ok()); assert_eq!(reader.searcher().num_docs(), 0); reader.reload().unwrap(); assert_eq!(reader.searcher().num_docs(), 1); @@ -535,39 +534,26 @@ mod tests { .try_into() .unwrap(); assert_eq!(reader.searcher().num_docs(), 0); - let mut writer = write_index.writer_with_num_threads(1, 3_000_000).unwrap(); - test_index_on_commit_reload_policy_aux(field, &mut writer, &reader); + test_index_on_commit_reload_policy_aux(field, &write_index, &reader); } } - fn test_index_on_commit_reload_policy_aux( - field: Field, - writer: &mut IndexWriter, - reader: &IndexReader, - ) { + fn test_index_on_commit_reload_policy_aux(field: Field, index: &Index, reader: &IndexReader) { + let mut reader_index = reader.index(); + let (sender, receiver) = crossbeam::channel::unbounded(); + let _watch_handle = reader_index.directory_mut().watch(Box::new(move || { + let _ = sender.send(()); + })); + let mut writer = index.writer_with_num_threads(1, 3_000_000).unwrap(); assert_eq!(reader.searcher().num_docs(), 0); writer.add_document(doc!(field=>1u64)); writer.commit().unwrap(); - let mut count = 0; - for _ in 0..100 { - count = reader.searcher().num_docs(); - if count > 0 { - break; - } - thread::sleep(Duration::from_millis(100)); - } - assert_eq!(count, 1); + assert!(receiver.recv().is_ok()); + assert_eq!(reader.searcher().num_docs(), 1); writer.add_document(doc!(field=>2u64)); writer.commit().unwrap(); - let mut count = 0; - for _ in 0..10 { - count = reader.searcher().num_docs(); - if count > 1 { - break; - } - thread::sleep(Duration::from_millis(100)); - } - assert_eq!(count, 2); + assert!(receiver.recv().is_ok()); + assert_eq!(reader.searcher().num_docs(), 2); } // This test will not pass on windows, because windows @@ -584,9 +570,13 @@ mod tests { for i in 0u64..8_000u64 { writer.add_document(doc!(field => i)); } + let (sender, receiver) = crossbeam::channel::unbounded(); + let _handle = directory.watch(Box::new(move || { + let _ = sender.send(()); + })); writer.commit().unwrap(); let mem_right_after_commit = directory.total_mem_usage(); - thread::sleep(Duration::from_millis(1_000)); + assert!(receiver.recv().is_ok()); let reader = index .reader_builder() .reload_policy(ReloadPolicy::Manual) @@ -600,6 +590,11 @@ mod tests { reader.reload().unwrap(); let searcher = reader.searcher(); assert_eq!(searcher.num_docs(), 8_000); - assert!(mem_right_after_merge_finished < mem_right_after_commit); + assert!( + mem_right_after_merge_finished < mem_right_after_commit, + "(mem after merge){} is expected < (mem before merge){}", + mem_right_after_merge_finished, + mem_right_after_commit + ); } } diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs index 2954b48de..8351136f7 100644 --- a/src/directory/managed_directory.rs +++ b/src/directory/managed_directory.rs @@ -2,6 +2,7 @@ use crate::core::MANAGED_FILEPATH; use crate::directory::error::{DeleteError, IOError, LockError, OpenReadError, OpenWriteError}; use crate::directory::footer::{Footer, FooterProxy}; use crate::directory::DirectoryLock; +use crate::directory::GarbageCollectionResult; use crate::directory::Lock; use crate::directory::META_LOCK; use crate::directory::{ReadOnlySource, WritePtr}; @@ -104,7 +105,10 @@ impl ManagedDirectory { /// If a file cannot be deleted (for permission reasons for instance) /// an error is simply logged, and the file remains in the list of managed /// files. - pub fn garbage_collect HashSet>(&mut self, get_living_files: L) { + pub fn garbage_collect HashSet>( + &mut self, + get_living_files: L, + ) -> crate::Result { info!("Garbage collect"); let mut files_to_delete = vec![]; @@ -130,19 +134,25 @@ impl ManagedDirectory { // 2) writer change meta.json (for instance after a merge or a commit) // 3) gc kicks in. // 4) gc removes a file that was useful for process B, before process B opened it. - if let Ok(_meta_lock) = self.acquire_lock(&META_LOCK) { - let living_files = get_living_files(); - for managed_path in &meta_informations_rlock.managed_paths { - if !living_files.contains(managed_path) { - files_to_delete.push(managed_path.clone()); + match self.acquire_lock(&META_LOCK) { + Ok(_meta_lock) => { + let living_files = get_living_files(); + for managed_path in &meta_informations_rlock.managed_paths { + if !living_files.contains(managed_path) { + files_to_delete.push(managed_path.clone()); + } } } - } else { - error!("Failed to acquire lock for GC"); + Err(err) => { + error!("Failed to acquire lock for GC"); + return Err(crate::Error::from(err)); + } } } + let mut failed_to_delete_files = vec![]; let mut deleted_files = vec![]; + for file_to_delete in files_to_delete { match self.delete(&file_to_delete) { Ok(_) => { @@ -152,9 +162,10 @@ impl ManagedDirectory { Err(file_error) => { match file_error { DeleteError::FileDoesNotExist(_) => { - deleted_files.push(file_to_delete); + deleted_files.push(file_to_delete.clone()); } DeleteError::IOError(_) => { + failed_to_delete_files.push(file_to_delete.clone()); if !cfg!(target_os = "windows") { // On windows, delete is expected to fail if the file // is mmapped. @@ -177,10 +188,13 @@ impl ManagedDirectory { for delete_file in &deleted_files { managed_paths_write.remove(delete_file); } - if save_managed_paths(self.directory.as_mut(), &meta_informations_wlock).is_err() { - error!("Failed to save the list of managed files."); - } + save_managed_paths(self.directory.as_mut(), &meta_informations_wlock)?; } + + Ok(GarbageCollectionResult { + deleted_files, + failed_to_delete_files, + }) } /// Registers a file as managed @@ -328,7 +342,7 @@ mod tests_mmap_specific { assert!(managed_directory.exists(test_path1)); assert!(managed_directory.exists(test_path2)); let living_files: HashSet = [test_path1.to_owned()].iter().cloned().collect(); - managed_directory.garbage_collect(|| living_files); + assert!(managed_directory.garbage_collect(|| living_files).is_ok()); assert!(managed_directory.exists(test_path1)); assert!(!managed_directory.exists(test_path2)); } @@ -338,7 +352,7 @@ mod tests_mmap_specific { assert!(managed_directory.exists(test_path1)); assert!(!managed_directory.exists(test_path2)); let living_files: HashSet = HashSet::new(); - managed_directory.garbage_collect(|| living_files); + assert!(managed_directory.garbage_collect(|| living_files).is_ok()); assert!(!managed_directory.exists(test_path1)); assert!(!managed_directory.exists(test_path2)); } @@ -360,7 +374,9 @@ mod tests_mmap_specific { assert!(managed_directory.exists(test_path1)); let _mmap_read = managed_directory.open_read(test_path1).unwrap(); - managed_directory.garbage_collect(|| living_files.clone()); + assert!(managed_directory + .garbage_collect(|| living_files.clone()) + .is_ok()); if cfg!(target_os = "windows") { // On Windows, gc should try and fail the file as it is mmapped. assert!(managed_directory.exists(test_path1)); @@ -368,7 +384,7 @@ mod tests_mmap_specific { drop(_mmap_read); // The file should still be in the list of managed file and // eventually be deleted once mmap is released. - managed_directory.garbage_collect(|| living_files); + assert!(managed_directory.garbage_collect(|| living_files).is_ok()); assert!(!managed_directory.exists(test_path1)); } else { assert!(!managed_directory.exists(test_path1)); diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs index 2f8ebc578..27909c540 100644 --- a/src/directory/mmap_directory.rs +++ b/src/directory/mmap_directory.rs @@ -174,7 +174,7 @@ impl WatcherWrapper { // We might want to be more accurate than this at one point. if let Some(filename) = changed_path.file_name() { if filename == *META_FILEPATH { - watcher_router_clone.broadcast(); + let _ = watcher_router_clone.broadcast(); } } } @@ -543,11 +543,9 @@ mod tests { use crate::ReloadPolicy; use std::fs; use std::sync::atomic::{AtomicUsize, Ordering}; - use std::thread; - use std::time::Duration; #[test] - fn test_open_non_existant_path() { + fn test_open_non_existent_path() { assert!(MmapDirectory::open(PathBuf::from("./nowhere")).is_err()); } @@ -640,13 +638,18 @@ mod tests { let tmp_dir = tempfile::TempDir::new().unwrap(); let tmp_dirpath = tmp_dir.path().to_owned(); let mut watch_wrapper = WatcherWrapper::new(&tmp_dirpath).unwrap(); - let tmp_file = tmp_dirpath.join("coucou"); + let tmp_file = tmp_dirpath.join(*META_FILEPATH); let _handle = watch_wrapper.watch(Box::new(move || { counter_clone.fetch_add(1, Ordering::SeqCst); })); + let (sender, receiver) = crossbeam::channel::unbounded(); + let _handle2 = watch_wrapper.watch(Box::new(move || { + let _ = sender.send(()); + })); assert_eq!(counter.load(Ordering::SeqCst), 0); fs::write(&tmp_file, b"whateverwilldo").unwrap(); - thread::sleep(Duration::new(0, 1_000u32)); + assert!(receiver.recv().is_ok()); + assert!(counter.load(Ordering::SeqCst) >= 1); } #[test] diff --git a/src/directory/mod.rs b/src/directory/mod.rs index 294beb9f0..5efd87be3 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -26,6 +26,21 @@ pub use self::read_only_source::ReadOnlySource; pub(crate) use self::watch_event_router::WatchCallbackList; pub use self::watch_event_router::{WatchCallback, WatchHandle}; use std::io::{self, BufWriter, Write}; +use std::path::PathBuf; + +/// Outcome of the Garbage collection +pub struct GarbageCollectionResult { + /// List of files that were deleted in this cycle + pub deleted_files: Vec, + /// List of files that were schedule to be deleted in this cycle, + /// but deletion did not work. This typically happens on windows, + /// as deleting a memory mapped file is forbidden. + /// + /// If a searcher is still held, a file cannot be deleted. + /// This is not considered a bug, the file will simply be deleted + /// in the next GC. + pub failed_to_delete_files: Vec, +} #[cfg(feature = "mmap")] pub use self::mmap_directory::MmapDirectory; diff --git a/src/directory/ram_directory.rs b/src/directory/ram_directory.rs index db19f9811..89fdc123f 100644 --- a/src/directory/ram_directory.rs +++ b/src/directory/ram_directory.rs @@ -195,7 +195,7 @@ impl Directory for RAMDirectory { vec_writer.write_all(data)?; vec_writer.flush()?; if path == Path::new(&*META_FILEPATH) { - self.fs.write().unwrap().watch_router.broadcast(); + let _ = self.fs.write().unwrap().watch_router.broadcast(); } Ok(()) } diff --git a/src/directory/tests.rs b/src/directory/tests.rs index 0aaa0ea1b..908da77f6 100644 --- a/src/directory/tests.rs +++ b/src/directory/tests.rs @@ -2,11 +2,9 @@ use super::*; use std::io::Write; use std::mem; use std::path::{Path, PathBuf}; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering; +use std::sync::atomic::Ordering::SeqCst; +use std::sync::atomic::{AtomicBool, AtomicUsize}; use std::sync::Arc; -use std::thread; -use std::time; use std::time::Duration; #[test] @@ -110,37 +108,38 @@ fn test_directory(directory: &mut dyn Directory) { } fn test_watch(directory: &mut dyn Directory) { + let num_progress: Arc = Default::default(); let counter: Arc = Default::default(); let counter_clone = counter.clone(); + let (sender, receiver) = crossbeam::channel::unbounded(); let watch_callback = Box::new(move || { - counter_clone.fetch_add(1, Ordering::SeqCst); + counter_clone.fetch_add(1, SeqCst); }); - assert!(directory - .atomic_write(Path::new("meta.json"), b"random_test_data") - .is_ok()); - thread::sleep(Duration::new(0, 10_000)); - assert_eq!(0, counter.load(Ordering::SeqCst)); - + // This callback is used to synchronize watching in our unit test. + // We bind it to a variable because the callback is removed when that + // handle is dropped. let watch_handle = directory.watch(watch_callback).unwrap(); + let _progress_listener = directory + .watch(Box::new(move || { + let val = num_progress.fetch_add(1, SeqCst); + let _ = sender.send(val); + })) + .unwrap(); + for i in 0..10 { - assert_eq!(i, counter.load(Ordering::SeqCst)); + assert_eq!(i, counter.load(SeqCst)); assert!(directory .atomic_write(Path::new("meta.json"), b"random_test_data_2") .is_ok()); - for _ in 0..1_000 { - if counter.load(Ordering::SeqCst) > i { - break; - } - thread::sleep(Duration::from_millis(10)); - } - assert_eq!(i + 1, counter.load(Ordering::SeqCst)); + assert_eq!(receiver.recv_timeout(Duration::from_millis(500)), Ok(i)); + assert_eq!(i + 1, counter.load(SeqCst)); } mem::drop(watch_handle); assert!(directory .atomic_write(Path::new("meta.json"), b"random_test_data") .is_ok()); - thread::sleep(Duration::from_millis(200)); - assert_eq!(10, counter.load(Ordering::SeqCst)); + assert!(receiver.recv_timeout(Duration::from_millis(500)).is_ok()); + assert_eq!(10, counter.load(SeqCst)); } fn test_lock_non_blocking(directory: &mut dyn Directory) { @@ -174,9 +173,11 @@ fn test_lock_blocking(directory: &mut dyn Directory) { is_blocking: true, }); assert!(lock_a_res.is_ok()); + let in_thread = Arc::new(AtomicBool::default()); + let in_thread_clone = in_thread.clone(); std::thread::spawn(move || { //< lock_a_res is sent to the thread. - std::thread::sleep(time::Duration::from_millis(10)); + in_thread_clone.store(true, SeqCst); // explicitely droping lock_a_res. It would have been sufficient to just force it // to be part of the move, but the intent seems clearer that way. drop(lock_a_res); @@ -190,13 +191,11 @@ fn test_lock_blocking(directory: &mut dyn Directory) { assert!(lock_a_res.is_err()); } { - // the blocking call should wait for at least 10ms. - let start = time::Instant::now(); let lock_a_res = directory.acquire_lock(&Lock { filepath: PathBuf::from("a.lock"), is_blocking: true, }); + assert!(in_thread.load(SeqCst)); assert!(lock_a_res.is_ok()); - assert!(start.elapsed().subsec_millis() >= 10); } } diff --git a/src/directory/watch_event_router.rs b/src/directory/watch_event_router.rs index 3c346db11..8b6acf1ce 100644 --- a/src/directory/watch_event_router.rs +++ b/src/directory/watch_event_router.rs @@ -1,3 +1,5 @@ +use futures::channel::oneshot; +use futures::{Future, TryFutureExt}; use std::sync::Arc; use std::sync::RwLock; use std::sync::Weak; @@ -47,14 +49,21 @@ impl WatchCallbackList { } /// Triggers all callbacks - pub fn broadcast(&self) { + pub fn broadcast(&self) -> impl Future { let callbacks = self.list_callback(); + let (sender, receiver) = oneshot::channel(); + let result = receiver.unwrap_or_else(|_| ()); + if callbacks.is_empty() { + let _ = sender.send(()); + return result; + } let spawn_res = std::thread::Builder::new() .name("watch-callbacks".to_string()) .spawn(move || { for callback in callbacks { callback(); } + let _ = sender.send(()); }); if let Err(err) = spawn_res { error!( @@ -62,19 +71,17 @@ impl WatchCallbackList { err ); } + result } } #[cfg(test)] mod tests { use crate::directory::WatchCallbackList; + use futures::executor::block_on; use std::mem; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; - use std::thread; - use std::time::Duration; - - const WAIT_TIME: u64 = 20; #[test] fn test_watch_event_router_simple() { @@ -84,22 +91,22 @@ mod tests { let inc_callback = Box::new(move || { counter_clone.fetch_add(1, Ordering::SeqCst); }); - watch_event_router.broadcast(); + block_on(watch_event_router.broadcast()); assert_eq!(0, counter.load(Ordering::SeqCst)); let handle_a = watch_event_router.subscribe(inc_callback); - thread::sleep(Duration::from_millis(WAIT_TIME)); assert_eq!(0, counter.load(Ordering::SeqCst)); - watch_event_router.broadcast(); - thread::sleep(Duration::from_millis(WAIT_TIME)); + block_on(watch_event_router.broadcast()); assert_eq!(1, counter.load(Ordering::SeqCst)); - watch_event_router.broadcast(); - watch_event_router.broadcast(); - watch_event_router.broadcast(); - thread::sleep(Duration::from_millis(WAIT_TIME)); + block_on(async { + ( + watch_event_router.broadcast().await, + watch_event_router.broadcast().await, + watch_event_router.broadcast().await, + ) + }); assert_eq!(4, counter.load(Ordering::SeqCst)); mem::drop(handle_a); - watch_event_router.broadcast(); - thread::sleep(Duration::from_millis(WAIT_TIME)); + block_on(watch_event_router.broadcast()); assert_eq!(4, counter.load(Ordering::SeqCst)); } @@ -115,20 +122,20 @@ mod tests { }; let handle_a = watch_event_router.subscribe(inc_callback(1)); let handle_a2 = watch_event_router.subscribe(inc_callback(10)); - thread::sleep(Duration::from_millis(WAIT_TIME)); assert_eq!(0, counter.load(Ordering::SeqCst)); - watch_event_router.broadcast(); - watch_event_router.broadcast(); - thread::sleep(Duration::from_millis(WAIT_TIME)); + block_on(async { + futures::join!( + watch_event_router.broadcast(), + watch_event_router.broadcast() + ) + }); assert_eq!(22, counter.load(Ordering::SeqCst)); mem::drop(handle_a); - watch_event_router.broadcast(); - thread::sleep(Duration::from_millis(WAIT_TIME)); + block_on(watch_event_router.broadcast()); assert_eq!(32, counter.load(Ordering::SeqCst)); mem::drop(handle_a2); - watch_event_router.broadcast(); - watch_event_router.broadcast(); - thread::sleep(Duration::from_millis(WAIT_TIME)); + block_on(watch_event_router.broadcast()); + block_on(watch_event_router.broadcast()); assert_eq!(32, counter.load(Ordering::SeqCst)); } @@ -142,14 +149,15 @@ mod tests { }); let handle_a = watch_event_router.subscribe(inc_callback); assert_eq!(0, counter.load(Ordering::SeqCst)); - watch_event_router.broadcast(); - watch_event_router.broadcast(); - thread::sleep(Duration::from_millis(WAIT_TIME)); + block_on(async { + let future1 = watch_event_router.broadcast(); + let future2 = watch_event_router.broadcast(); + futures::join!(future1, future2) + }); assert_eq!(2, counter.load(Ordering::SeqCst)); - thread::sleep(Duration::from_millis(WAIT_TIME)); mem::drop(handle_a); - watch_event_router.broadcast(); - thread::sleep(Duration::from_millis(WAIT_TIME)); + let _ = watch_event_router.broadcast(); + block_on(watch_event_router.broadcast()); assert_eq!(2, counter.load(Ordering::SeqCst)); } } diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 91f6f5d80..a851c0103 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -7,8 +7,8 @@ use crate::core::SegmentComponent; use crate::core::SegmentId; use crate::core::SegmentMeta; use crate::core::SegmentReader; -use crate::directory::DirectoryLock; use crate::directory::TerminatingWrite; +use crate::directory::{DirectoryLock, GarbageCollectionResult}; use crate::docset::DocSet; use crate::error::TantivyError; use crate::fastfield::write_delete_bitset; @@ -23,10 +23,9 @@ use crate::schema::Document; use crate::schema::IndexRecordOption; use crate::schema::Term; use crate::Opstamp; -use crate::Result; use bit_set::BitSet; use crossbeam::channel; -use futures::{Canceled, Future}; +use futures::future::Future; use smallvec::smallvec; use smallvec::SmallVec; use std::mem; @@ -72,7 +71,7 @@ pub struct IndexWriter { heap_size_in_bytes_per_thread: usize, - workers_join_handle: Vec>>, + workers_join_handle: Vec>>, operation_receiver: OperationReceiver, operation_sender: OperationSender, @@ -95,7 +94,7 @@ fn compute_deleted_bitset( delete_cursor: &mut DeleteCursor, doc_opstamps: &DocToOpstampMapping, target_opstamp: Opstamp, -) -> Result { +) -> crate::Result { let mut might_have_changed = false; while let Some(delete_op) = delete_cursor.get() { if delete_op.opstamp > target_opstamp { @@ -132,7 +131,7 @@ pub(crate) fn advance_deletes( mut segment: Segment, segment_entry: &mut SegmentEntry, target_opstamp: Opstamp, -) -> Result<()> { +) -> crate::Result<()> { { if segment_entry.meta().delete_opstamp() == Some(target_opstamp) { // We are already up-to-date here. @@ -181,7 +180,7 @@ fn index_documents( grouped_document_iterator: &mut dyn Iterator, segment_updater: &mut SegmentUpdater, mut delete_cursor: DeleteCursor, -) -> Result { +) -> crate::Result { let schema = segment.schema(); let mut segment_writer = SegmentWriter::for_segment(memory_budget, segment.clone(), &schema)?; @@ -236,7 +235,7 @@ fn apply_deletes( mut delete_cursor: &mut DeleteCursor, doc_opstamps: &[Opstamp], last_docstamp: Opstamp, -) -> Result>> { +) -> crate::Result>> { if delete_cursor.get().is_none() { // if there are no delete operation in the queue, no need // to even open the segment. @@ -281,7 +280,7 @@ impl IndexWriter { num_threads: usize, heap_size_in_bytes_per_thread: usize, directory_lock: DirectoryLock, - ) -> Result { + ) -> crate::Result { if heap_size_in_bytes_per_thread < HEAP_SIZE_MIN { let err_msg = format!( "The heap size per thread needs to be at least {}.", @@ -332,7 +331,7 @@ impl IndexWriter { /// If there are some merging threads, blocks until they all finish their work and /// then drop the `IndexWriter`. - pub fn wait_merging_threads(mut self) -> Result<()> { + pub fn wait_merging_threads(mut self) -> crate::Result<()> { // this will stop the indexing thread, // dropping the last reference to the segment_updater. drop(self.operation_sender); @@ -381,7 +380,7 @@ impl IndexWriter { /// Spawns a new worker thread for indexing. /// The thread consumes documents from the pipeline. - fn add_indexing_worker(&mut self) -> Result<()> { + fn add_indexing_worker(&mut self) -> crate::Result<()> { let document_receiver_clone = self.operation_receiver.clone(); let mut segment_updater = self.segment_updater.clone(); @@ -389,7 +388,7 @@ impl IndexWriter { let mem_budget = self.heap_size_in_bytes_per_thread; let index = self.index.clone(); - let join_handle: JoinHandle> = thread::Builder::new() + let join_handle: JoinHandle> = thread::Builder::new() .name(format!("thrd-tantivy-index{}", self.worker_id)) .spawn(move || { loop { @@ -440,17 +439,18 @@ impl IndexWriter { self.segment_updater.set_merge_policy(merge_policy); } - fn start_workers(&mut self) -> Result<()> { + fn start_workers(&mut self) -> crate::Result<()> { for _ in 0..self.num_threads { self.add_indexing_worker()?; } Ok(()) } - /// Detects and removes the files that - /// are not used by the index anymore. - pub fn garbage_collect_files(&self) -> Result<()> { - self.segment_updater.garbage_collect_files().wait() + /// Detects and removes the files that are not used by the index anymore. + pub fn garbage_collect_files( + &self, + ) -> impl Future> { + self.segment_updater.garbage_collect_files() } /// Deletes all documents from the index @@ -489,7 +489,7 @@ impl IndexWriter { /// Ok(()) /// } /// ``` - pub fn delete_all_documents(&self) -> Result { + pub fn delete_all_documents(&self) -> crate::Result { // Delete segments self.segment_updater.remove_all_segments(); // Return new stamp - reverted stamp @@ -503,7 +503,7 @@ impl IndexWriter { pub fn merge( &mut self, segment_ids: &[SegmentId], - ) -> Result> { + ) -> impl Future> { self.segment_updater.start_merge(segment_ids) } @@ -530,7 +530,7 @@ impl IndexWriter { /// state as it was after the last commit. /// /// The opstamp at the last commit is returned. - pub fn rollback(&mut self) -> Result { + pub fn rollback(&mut self) -> crate::Result { info!("Rolling back to opstamp {}", self.committed_opstamp); // marks the segment updater as killed. From now on, all // segment updates will be ignored. @@ -587,7 +587,7 @@ impl IndexWriter { /// It is also possible to add a payload to the `commit` /// using this API. /// See [`PreparedCommit::set_payload()`](PreparedCommit.html) - pub fn prepare_commit(&mut self) -> Result { + pub fn prepare_commit(&mut self) -> crate::Result { // Here, because we join all of the worker threads, // all of the segment update for this commit have been // sent. @@ -634,7 +634,7 @@ impl IndexWriter { /// Commit returns the `opstamp` of the last document /// that made it in the commit. /// - pub fn commit(&mut self) -> Result { + pub fn commit(&mut self) -> crate::Result { self.prepare_commit()?.commit() } diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 199780ffb..843eb2ca2 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -709,7 +709,7 @@ mod tests { use crate::IndexWriter; use crate::Searcher; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; - use futures::Future; + use futures::executor::block_on; use std::io::Cursor; #[test] @@ -792,11 +792,7 @@ mod tests { .searchable_segment_ids() .expect("Searchable segments failed."); let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap(); - index_writer - .merge(&segment_ids) - .expect("Failed to initiate merge") - .wait() - .expect("Merging failed"); + block_on(index_writer.merge(&segment_ids)).expect("Merging failed"); index_writer.wait_merging_threads().unwrap(); } { @@ -1040,11 +1036,7 @@ mod tests { let segment_ids = index .searchable_segment_ids() .expect("Searchable segments failed."); - index_writer - .merge(&segment_ids) - .expect("Failed to initiate merge") - .wait() - .expect("Merging failed"); + block_on(index_writer.merge(&segment_ids)).expect("Merging failed"); reader.reload().unwrap(); let searcher = reader.searcher(); assert_eq!(searcher.segment_readers().len(), 1); @@ -1139,11 +1131,7 @@ mod tests { let segment_ids = index .searchable_segment_ids() .expect("Searchable segments failed."); - index_writer - .merge(&segment_ids) - .expect("Failed to initiate merge") - .wait() - .expect("Merging failed"); + block_on(index_writer.merge(&segment_ids)).expect("Merging failed"); reader.reload().unwrap(); let searcher = reader.searcher(); @@ -1277,11 +1265,7 @@ mod tests { .searchable_segment_ids() .expect("Searchable segments failed."); let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap(); - index_writer - .merge(&segment_ids) - .expect("Failed to initiate merge") - .wait() - .expect("Merging failed"); + block_on(index_writer.merge(&segment_ids)).expect("Merging failed"); index_writer.wait_merging_threads().unwrap(); reader.reload().unwrap(); test_searcher( @@ -1336,11 +1320,7 @@ mod tests { let segment_ids = index .searchable_segment_ids() .expect("Searchable segments failed."); - index_writer - .merge(&segment_ids) - .expect("Failed to initiate merge") - .wait() - .expect("Merging failed"); + block_on(index_writer.merge(&segment_ids)).expect("Merging failed"); reader.reload().unwrap(); // commit has not been called yet. The document should still be // there. @@ -1361,22 +1341,18 @@ mod tests { let mut doc = Document::default(); doc.add_u64(int_field, 1); index_writer.add_document(doc.clone()); - index_writer.commit().expect("commit failed"); + assert!(index_writer.commit().is_ok()); index_writer.add_document(doc); - index_writer.commit().expect("commit failed"); + assert!(index_writer.commit().is_ok()); index_writer.delete_term(Term::from_field_u64(int_field, 1)); let segment_ids = index .searchable_segment_ids() .expect("Searchable segments failed."); - index_writer - .merge(&segment_ids) - .expect("Failed to initiate merge") - .wait() - .expect("Merging failed"); + assert!(block_on(index_writer.merge(&segment_ids)).is_ok()); // assert delete has not been committed - reader.reload().expect("failed to load searcher 1"); + assert!(reader.reload().is_ok()); let searcher = reader.searcher(); assert_eq!(searcher.num_docs(), 2); @@ -1415,12 +1391,12 @@ mod tests { index_doc(&mut index_writer, &[1, 5]); index_doc(&mut index_writer, &[3]); index_doc(&mut index_writer, &[17]); - index_writer.commit().expect("committed"); + assert!(index_writer.commit().is_ok()); index_doc(&mut index_writer, &[20]); - index_writer.commit().expect("committed"); + assert!(index_writer.commit().is_ok()); index_doc(&mut index_writer, &[28, 27]); index_doc(&mut index_writer, &[1_000]); - index_writer.commit().expect("committed"); + assert!(index_writer.commit().is_ok()); } let reader = index.reader().unwrap(); let searcher = reader.searcher(); @@ -1452,15 +1428,6 @@ mod tests { assert_eq!(&vals, &[17]); } - println!( - "{:?}", - searcher - .segment_readers() - .iter() - .map(|reader| reader.max_doc()) - .collect::>() - ); - { let segment = searcher.segment_reader(1u32); let ff_reader = segment.fast_fields().u64s(int_field).unwrap(); @@ -1484,27 +1451,13 @@ mod tests { .searchable_segment_ids() .expect("Searchable segments failed."); let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap(); - index_writer - .merge(&segment_ids) - .expect("Failed to initiate merge") - .wait() - .expect("Merging failed"); - index_writer - .wait_merging_threads() - .expect("Wait for merging threads"); + assert!(block_on(index_writer.merge(&segment_ids)).is_ok()); + assert!(index_writer.wait_merging_threads().is_ok()); } - reader.reload().expect("Load searcher"); + assert!(reader.reload().is_ok()); { let searcher = reader.searcher(); - println!( - "{:?}", - searcher - .segment_readers() - .iter() - .map(|reader| reader.max_doc()) - .collect::>() - ); let segment = searcher.segment_reader(0u32); let ff_reader = segment.fast_fields().u64s(int_field).unwrap(); diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index d3ba91af1..0669c64e7 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -6,7 +6,7 @@ use crate::core::SegmentId; use crate::core::SegmentMeta; use crate::core::SerializableSegment; use crate::core::META_FILEPATH; -use crate::directory::{Directory, DirectoryClone}; +use crate::directory::{Directory, DirectoryClone, GarbageCollectionResult}; use crate::error::TantivyError; use crate::indexer::delete_queue::DeleteCursor; use crate::indexer::index_writer::advance_deletes; @@ -19,13 +19,10 @@ use crate::indexer::SegmentSerializer; use crate::indexer::{DefaultMergePolicy, MergePolicy}; use crate::schema::Schema; use crate::Opstamp; -use crate::Result; -use futures::oneshot; -use futures::sync::oneshot::Receiver; -use futures::Future; -use futures_cpupool::Builder as CpuPoolBuilder; -use futures_cpupool::CpuFuture; -use futures_cpupool::CpuPool; +use futures::channel::oneshot; +use futures::executor::{block_on, ThreadPool, ThreadPoolBuilder}; +use futures::future::Future; +use futures::future::TryFutureExt; use serde_json; use std::borrow::BorrowMut; use std::collections::HashMap; @@ -49,7 +46,7 @@ use std::thread::JoinHandle; /// and flushed. /// /// This method is not part of tantivy's public API -pub fn save_new_metas(schema: Schema, directory: &mut dyn Directory) -> Result<()> { +pub fn save_new_metas(schema: Schema, directory: &mut dyn Directory) -> crate::Result<()> { save_metas( &IndexMeta { segments: Vec::new(), @@ -70,7 +67,7 @@ pub fn save_new_metas(schema: Schema, directory: &mut dyn Directory) -> Result<( /// and flushed. /// /// This method is not part of tantivy's public API -fn save_metas(metas: &IndexMeta, directory: &mut dyn Directory) -> Result<()> { +fn save_metas(metas: &IndexMeta, directory: &mut dyn Directory) -> crate::Result<()> { info!("save metas"); let mut buffer = serde_json::to_vec_pretty(metas)?; // Just adding a new line at the end of the buffer. @@ -95,7 +92,7 @@ fn perform_merge( merge_operation: &MergeOperation, index: &Index, mut segment_entries: Vec, -) -> Result { +) -> crate::Result { let target_opstamp = merge_operation.target_opstamp(); // first we need to apply deletes to our segment. @@ -132,6 +129,16 @@ fn perform_merge( Ok(after_merge_segment_entry) } +async fn garbage_collect_files( + segment_updater: SegmentUpdater, +) -> crate::Result { + info!("Running garbage collection"); + let mut index = segment_updater.0.index.clone(); + index + .directory_mut() + .garbage_collect(move || segment_updater.list_files()) +} + struct InnerSegmentUpdater { // we keep a copy of the current active IndexMeta to // avoid loading the file everytime we need it in the @@ -140,12 +147,12 @@ struct InnerSegmentUpdater { // This should be up to date as all update happen through // the unique active `SegmentUpdater`. active_metas: RwLock>, - pool: CpuPool, + pool: ThreadPool, index: Index, segment_manager: SegmentManager, merge_policy: RwLock>>, merging_thread_id: AtomicUsize, - merging_threads: RwLock>>>, + merging_threads: RwLock>>>, killed: AtomicBool, stamper: Stamper, merge_operations: MergeOperationInventory, @@ -156,13 +163,14 @@ impl SegmentUpdater { index: Index, stamper: Stamper, delete_cursor: &DeleteCursor, - ) -> Result { + ) -> crate::Result { let segments = index.searchable_segment_metas()?; let segment_manager = SegmentManager::from_segments(segments, delete_cursor); - let pool = CpuPoolBuilder::new() + let pool = ThreadPoolBuilder::new() .name_prefix("segment_updater") .pool_size(1) - .create(); + .create() + .unwrap(); // TODO fixme let index_meta = index.load_metas()?; Ok(SegmentUpdater(Arc::new(InnerSegmentUpdater { active_metas: RwLock::new(Arc::new(index_meta)), @@ -191,20 +199,28 @@ impl SegmentUpdater { self.0.merging_thread_id.fetch_add(1, Ordering::SeqCst) } - fn run_async T>( + fn schedule_future> + 'static + Send>( &self, f: F, - ) -> CpuFuture { - let me_clone = self.clone(); - self.0.pool.spawn_fn(move || Ok(f(me_clone))) + ) -> impl Future> { + let (sender, receiver) = oneshot::channel(); + self.0.pool.spawn_ok(async move { + let _ = sender.send(f.await); + }); + receiver.unwrap_or_else(|_| { + Err(crate::Error::SystemError( + "A segment_updater future did not success. This should never happen.".to_string(), + )) + }) } pub fn add_segment(&self, segment_entry: SegmentEntry) { - self.run_async(|segment_updater| { + let segment_updater = self.clone(); + let _ = self.schedule_future(async move { segment_updater.0.segment_manager.add_segment(segment_entry); - segment_updater.consider_merge_options(); - }) - .forget(); + segment_updater.consider_merge_options().await; + Ok(()) + }); } /// Orders `SegmentManager` to remove all segments @@ -224,7 +240,7 @@ impl SegmentUpdater { /// /// The method returns copies of the segment entries, /// updated with the delete information. - fn purge_deletes(&self, target_opstamp: Opstamp) -> Result> { + fn purge_deletes(&self, target_opstamp: Opstamp) -> crate::Result> { let mut segment_entries = self.0.segment_manager.segment_entries(); for segment_entry in &mut segment_entries { let segment = self.0.index.segment(segment_entry.meta().clone()); @@ -233,7 +249,11 @@ impl SegmentUpdater { Ok(segment_entries) } - pub fn save_metas(&self, opstamp: Opstamp, commit_message: Option) { + pub fn save_metas( + &self, + opstamp: Opstamp, + commit_message: Option, + ) -> crate::Result<()> { if self.is_alive() { let index = &self.0.index; let directory = index.directory(); @@ -259,16 +279,18 @@ impl SegmentUpdater { opstamp, payload: commit_message, }; - save_metas(&index_meta, directory.box_clone().borrow_mut()) - .expect("Could not save metas."); + // TODO add context to the error. + save_metas(&index_meta, directory.box_clone().borrow_mut())?; self.store_meta(&index_meta); } + Ok(()) } - pub fn garbage_collect_files(&self) -> CpuFuture<(), TantivyError> { - self.run_async(move |segment_updater| { - segment_updater.garbage_collect_files_exec(); - }) + pub fn garbage_collect_files( + &self, + ) -> impl Future> { + let garbage_collect_future = garbage_collect_files(self.clone()); + self.schedule_future(garbage_collect_future) } /// List the files that are useful to the index. @@ -284,92 +306,106 @@ impl SegmentUpdater { files } - fn garbage_collect_files_exec(&self) { - info!("Running garbage collection"); - let mut index = self.0.index.clone(); - index.directory_mut().garbage_collect(|| self.list_files()); - } - - pub fn commit(&self, opstamp: Opstamp, payload: Option) -> Result<()> { - self.run_async(move |segment_updater| { + pub fn schedule_commit( + &self, + opstamp: Opstamp, + payload: Option, + ) -> impl Future> { + let segment_updater: SegmentUpdater = self.clone(); + self.schedule_future(async move { if segment_updater.is_alive() { - let segment_entries = segment_updater - .purge_deletes(opstamp) - .expect("Failed purge deletes"); + let segment_entries = segment_updater.purge_deletes(opstamp)?; segment_updater.0.segment_manager.commit(segment_entries); - segment_updater.save_metas(opstamp, payload); - segment_updater.garbage_collect_files_exec(); - segment_updater.consider_merge_options(); + segment_updater.save_metas(opstamp, payload)?; + let _ = garbage_collect_files(segment_updater.clone()).await; + segment_updater.consider_merge_options().await; } + Ok(()) }) - .wait() } - pub fn start_merge(&self, segment_ids: &[SegmentId]) -> Result> { + pub fn commit(&self, opstamp: Opstamp, payload: Option) -> crate::Result<()> { + block_on(self.schedule_commit(opstamp, payload)) + } + + pub fn start_merge( + &self, + segment_ids: &[SegmentId], + ) -> impl Future> { let commit_opstamp = self.load_metas().opstamp; let merge_operation = MergeOperation::new( &self.0.merge_operations, commit_opstamp, segment_ids.to_vec(), ); - self.run_async(move |segment_updater| segment_updater.start_merge_impl(merge_operation)) - .wait()? + let segment_updater = self.clone(); + let start_merge_future = + self.schedule_future(async move { segment_updater.start_merge_impl(merge_operation) }); + async move { + let future = start_merge_future.await?; + future + .await + .map_err(|err| crate::Error::SystemError(format!("{:?}", err))) + } } fn store_meta(&self, index_meta: &IndexMeta) { *self.0.active_metas.write().unwrap() = Arc::new(index_meta.clone()); } + fn load_metas(&self) -> Arc { self.0.active_metas.read().unwrap().clone() } // `segment_ids` is required to be non-empty. - fn start_merge_impl(&self, merge_operation: MergeOperation) -> Result> { + fn start_merge_impl( + &self, + merge_operation: MergeOperation, + ) -> crate::Result>> { assert!( !merge_operation.segment_ids().is_empty(), "Segment_ids cannot be empty." ); - let segment_updater_clone = self.clone(); + let segment_updater = self.clone(); let segment_entries: Vec = self .0 .segment_manager .start_merge(merge_operation.segment_ids())?; - // let segment_ids_vec = merge_operation.segment_ids.to_vec(); - let merging_thread_id = self.get_merging_thread_id(); info!( "Starting merge thread #{} - {:?}", merging_thread_id, merge_operation.segment_ids() ); - let (merging_future_send, merging_future_recv) = oneshot(); + let (merging_future_send, merging_future_recv) = oneshot::channel(); + + // We acquire the lock preemptively as way to remove the following race condition : + // Thread starting and trying to remove the merging thread join handle before the + // the current thread has populated the join handle map. + let mut merging_threads_wlock = self.0.merging_threads.write().unwrap(); // first we need to apply deletes to our segment. let merging_join_handle = thread::Builder::new() .name(format!("mergingthread-{}", merging_thread_id)) .spawn(move || { // first we need to apply deletes to our segment. - let merge_result = perform_merge( - &merge_operation, - &segment_updater_clone.0.index, - segment_entries, - ); - + let merge_result: crate::Result = + perform_merge(&merge_operation, &segment_updater.0.index, segment_entries); match merge_result { Ok(after_merge_segment_entry) => { let merged_segment_meta = after_merge_segment_entry.meta().clone(); - segment_updater_clone + segment_updater .end_merge(merge_operation, after_merge_segment_entry) - .expect("Segment updater thread is corrupted."); + .expect("Segment updater thread is corrupted. Please report."); // the future may fail if the listener of the oneshot future // has been destroyed. // // This is not a problem here, so we just ignore any // possible error. - let _merging_future_res = merging_future_send.send(merged_segment_meta); + let _send_result = merging_future_send.send(merged_segment_meta); } Err(e) => { warn!( @@ -386,7 +422,7 @@ impl SegmentUpdater { // `merging_future_send` will be dropped, sending an error to the future. } } - segment_updater_clone + segment_updater .0 .merging_threads .write() @@ -394,16 +430,15 @@ impl SegmentUpdater { .remove(&merging_thread_id); Ok(()) }) - .expect("Failed to spawn a thread."); - self.0 - .merging_threads - .write() - .unwrap() - .insert(merging_thread_id, merging_join_handle); + .map_err(|err| { + crate::Error::ErrorInThread(format!("Failed to spawn thread. {:?}", err)) + })?; + + merging_threads_wlock.insert(merging_thread_id, merging_join_handle); Ok(merging_future_recv) } - fn consider_merge_options(&self) { + async fn consider_merge_options(&self) { let merge_segment_ids: HashSet = self.0.merge_operations.segment_in_merge(); let (committed_segments, uncommitted_segments) = get_mergeable_segments(&merge_segment_ids, &self.0.segment_manager); @@ -432,18 +467,11 @@ impl SegmentUpdater { merge_candidates.extend(committed_merge_candidates.into_iter()); for merge_operation in merge_candidates { - match self.start_merge_impl(merge_operation) { - Ok(merge_future) => { - if let Err(e) = merge_future.fuse().poll() { - error!("The merge task failed quickly after starting: {:?}", e); - } - } - Err(err) => { - warn!( - "Starting the merge failed for the following reason. This is not fatal. {}", - err - ); - } + if let Err(err) = self.start_merge_impl(merge_operation) { + warn!( + "Starting the merge failed for the following reason. This is not fatal. {}", + err + ); } } } @@ -452,8 +480,9 @@ impl SegmentUpdater { &self, merge_operation: MergeOperation, mut after_merge_segment_entry: SegmentEntry, - ) -> Result<()> { - self.run_async(move |segment_updater| { + ) -> crate::Result<()> { + let segment_updater = self.clone(); + let end_merge_future = self.schedule_future(async move { info!("End merge {:?}", after_merge_segment_entry.meta()); { let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone(); @@ -478,7 +507,7 @@ impl SegmentUpdater { // ... cancel merge // `merge_operations` are tracked. As it is dropped, the // the segment_ids will be available again for merge. - return; + return Err(e); } } } @@ -487,12 +516,15 @@ impl SegmentUpdater { .0 .segment_manager .end_merge(merge_operation.segment_ids(), after_merge_segment_entry); - segment_updater.consider_merge_options(); - segment_updater.save_metas(previous_metas.opstamp, previous_metas.payload.clone()); + // TODO no need to save meta if the merge was on uncommitted segments + segment_updater + .save_metas(previous_metas.opstamp, previous_metas.payload.clone())?; + segment_updater.consider_merge_options().await; } // we drop all possible handle to a now useless `SegmentMeta`. - segment_updater.garbage_collect_files_exec(); - }) - .wait() + let _ = garbage_collect_files(segment_updater).await; + Ok(()) + }); + block_on(end_merge_future) } /// Wait for current merging threads. @@ -510,9 +542,9 @@ impl SegmentUpdater { /// /// Obsolete files will eventually be cleaned up /// by the directory garbage collector. - pub fn wait_merging_thread(&self) -> Result<()> { + pub fn wait_merging_thread(&self) -> crate::Result<()> { loop { - let merging_threads: HashMap>> = { + let merging_threads: HashMap>> = { let mut merging_threads = self.0.merging_threads.write().unwrap(); mem::replace(merging_threads.deref_mut(), HashMap::new()) }; @@ -528,7 +560,7 @@ impl SegmentUpdater { } // Our merging thread may have queued their completed merged segment. // Let's wait for that too. - self.run_async(move |_| {}).wait()?; + block_on(self.schedule_future(async { Ok(()) }))?; } } } diff --git a/src/reader/mod.rs b/src/reader/mod.rs index f10aa9895..529d95c54 100644 --- a/src/reader/mod.rs +++ b/src/reader/mod.rs @@ -162,6 +162,11 @@ pub struct IndexReader { } impl IndexReader { + #[cfg(test)] + pub(crate) fn index(&self) -> Index { + self.inner.index.clone() + } + /// Update searchers so that they reflect the state of the last /// `.commit()`. /// diff --git a/src/reader/pool.rs b/src/reader/pool.rs index 0512e2886..38b6d92a8 100644 --- a/src/reader/pool.rs +++ b/src/reader/pool.rs @@ -167,7 +167,7 @@ mod tests { use super::Pool; use super::Queue; - use std::iter; + use std::{iter, mem}; #[test] fn test_pool() { @@ -197,33 +197,67 @@ mod tests { fn test_pool_dont_panic_on_empty_pop() { // When the object pool is exhausted, it shouldn't panic on pop() use std::sync::Arc; - use std::{thread, time}; + use std::thread; // Wrap the pool in an Arc, same way as its used in `core/index.rs` - let pool = Arc::new(Pool::new()); + let pool1 = Arc::new(Pool::new()); // clone pools outside the move scope of each new thread - let pool1 = Arc::clone(&pool); - let pool2 = Arc::clone(&pool); + let pool2 = Arc::clone(&pool1); + let pool3 = Arc::clone(&pool1); + let elements_for_pool = vec![1, 2]; - pool.publish_new_generation(elements_for_pool); + pool1.publish_new_generation(elements_for_pool); let mut threads = vec![]; - let sleep_dur = time::Duration::from_millis(10); // spawn one more thread than there are elements in the pool + + let (start_1_send, start_1_recv) = crossbeam::bounded(0); + let (start_2_send, start_2_recv) = crossbeam::bounded(0); + let (start_3_send, start_3_recv) = crossbeam::bounded(0); + + let (event_send1, event_recv) = crossbeam::unbounded(); + let event_send2 = event_send1.clone(); + let event_send3 = event_send1.clone(); + threads.push(thread::spawn(move || { - // leasing to make sure it's not dropped before sleep is called - let _leased_searcher = &pool.acquire(); - thread::sleep(sleep_dur); - })); - threads.push(thread::spawn(move || { - // leasing to make sure it's not dropped before sleep is called + assert_eq!(start_1_recv.recv(), Ok("start")); let _leased_searcher = &pool1.acquire(); - thread::sleep(sleep_dur); + assert!(event_send1.send("1 acquired").is_ok()); + assert_eq!(start_1_recv.recv(), Ok("stop")); + assert!(event_send1.send("1 stopped").is_ok()); + mem::drop(_leased_searcher); })); + threads.push(thread::spawn(move || { - // leasing to make sure it's not dropped before sleep is called + assert_eq!(start_2_recv.recv(), Ok("start")); let _leased_searcher = &pool2.acquire(); - thread::sleep(sleep_dur); + assert!(event_send2.send("2 acquired").is_ok()); + assert_eq!(start_2_recv.recv(), Ok("stop")); + mem::drop(_leased_searcher); + assert!(event_send2.send("2 stopped").is_ok()); })); + + threads.push(thread::spawn(move || { + assert_eq!(start_3_recv.recv(), Ok("start")); + let _leased_searcher = &pool3.acquire(); + assert!(event_send3.send("3 acquired").is_ok()); + assert_eq!(start_3_recv.recv(), Ok("stop")); + mem::drop(_leased_searcher); + assert!(event_send3.send("3 stopped").is_ok()); + })); + + assert!(start_1_send.send("start").is_ok()); + assert_eq!(event_recv.recv(), Ok("1 acquired")); + assert!(start_2_send.send("start").is_ok()); + assert_eq!(event_recv.recv(), Ok("2 acquired")); + assert!(start_3_send.send("start").is_ok()); + assert!(event_recv.try_recv().is_err()); + assert!(start_1_send.send("stop").is_ok()); + assert_eq!(event_recv.recv(), Ok("1 stopped")); + assert_eq!(event_recv.recv(), Ok("3 acquired")); + assert!(start_3_send.send("stop").is_ok()); + assert_eq!(event_recv.recv(), Ok("3 stopped")); + assert!(start_2_send.send("stop").is_ok()); + assert_eq!(event_recv.recv(), Ok("2 stopped")); } } diff --git a/tests/failpoints/mod.rs b/tests/failpoints/mod.rs index 311744972..658fadbc1 100644 --- a/tests/failpoints/mod.rs +++ b/tests/failpoints/mod.rs @@ -28,11 +28,11 @@ fn test_failpoints_managed_directory_gc_if_delete_fails() { // The initial 1*off is there to allow for the removal of the // lock file. fail::cfg("RAMDirectory::delete", "1*off->1*return").unwrap(); - managed_directory.garbage_collect(Default::default); + assert!(managed_directory.garbage_collect(Default::default).is_ok()); assert!(managed_directory.exists(test_path)); // running the gc a second time should remove the file. - managed_directory.garbage_collect(Default::default); + assert!(managed_directory.garbage_collect(Default::default).is_ok()); assert!( !managed_directory.exists(test_path), "The file should have been deleted"