diff --git a/Cargo.toml b/Cargo.toml index b7c7468b5..9b03bacde 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,7 +41,6 @@ owning_ref = "0.4" stable_deref_trait = "1.0.0" rust-stemmers = "1.1" downcast-rs = { version="1.0" } -matches = "0.1" bitpacking = "0.6" census = "0.2" fnv = "1.0.6" @@ -58,6 +57,7 @@ winapi = "0.2" [dev-dependencies] rand = "0.6" maplit = "1" +matches = "0.1.8" [profile.release] opt-level = 3 diff --git a/src/core/executor.rs b/src/core/executor.rs index ce5fd18b7..281e96b91 100644 --- a/src/core/executor.rs +++ b/src/core/executor.rs @@ -123,15 +123,14 @@ mod tests { } } -} - -#[test] -fn test_map_multithread() { - let result: Vec = Executor::multi_thread(3, "search-test") - .map(|i| Ok(i * 2), 0..10) - .unwrap(); - assert_eq!(result.len(), 10); - for i in 0..10 { - assert_eq!(result[i], i * 2); + #[test] + fn test_map_multithread() { + let result: Vec = Executor::multi_thread(3, "search-test") + .map(|i| Ok(i * 2), 0..10) + .unwrap(); + assert_eq!(result.len(), 10); + for i in 0..10 { + assert_eq!(result[i], i * 2); + } } } diff --git a/src/core/pool.rs b/src/core/pool.rs index ccac81321..b24d0b3ed 100644 --- a/src/core/pool.rs +++ b/src/core/pool.rs @@ -1,4 +1,5 @@ -use crossbeam::queue::MsQueue; +use crossbeam::crossbeam_channel::unbounded; +use crossbeam::{Receiver, RecvError, Sender}; use std::ops::{Deref, DerefMut}; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; @@ -9,6 +10,37 @@ pub struct GenerationItem { item: T, } +/// Queue implementation for the Object Pool below +/// Uses the unbounded Linked-List type queue from crossbeam-channel +/// Splits the Queue into sender and receiver +struct Queue { + sender: Sender, + receiver: Receiver, +} + +impl Queue { + fn new() -> Self { + let (s, r) = unbounded(); + Queue { + sender: s, + receiver: r, + } + } + + /// Sender trait returns a Result type, which is ignored. + /// The Result is not handled at the moment + fn push(&self, elem: T) { + self.sender + .send(elem) + .expect("Sending an item to crossbeam-queue shouldn't fail"); + } + + /// Relies on the underlying crossbeam-channel Receiver + /// to block on empty queue + fn pop(&self) -> Result { + self.receiver.recv() + } +} /// An object pool /// @@ -16,14 +48,14 @@ pub struct GenerationItem { /// Object are wrapped in a `LeasedItem` wrapper and are /// released automatically back into the pool on `Drop`. pub struct Pool { - queue: Arc>>, + queue: Arc>>, freshest_generation: AtomicUsize, next_generation: AtomicUsize, } impl Pool { pub fn new() -> Pool { - let queue = Arc::new(MsQueue::new()); + let queue = Arc::new(Queue::new()); Pool { queue, freshest_generation: AtomicUsize::default(), @@ -77,7 +109,7 @@ impl Pool { pub fn acquire(&self) -> LeasedItem { let generation = self.generation(); loop { - let gen_item = self.queue.pop(); + let gen_item = self.queue.pop().unwrap(); if gen_item.generation >= generation { return LeasedItem { gen_item: Some(gen_item), @@ -93,7 +125,7 @@ impl Pool { pub struct LeasedItem { gen_item: Option>, - recycle_queue: Arc>>, + recycle_queue: Arc>>, } impl Deref for LeasedItem { @@ -130,6 +162,7 @@ impl Drop for LeasedItem { mod tests { use super::Pool; + use super::Queue; use std::iter; #[test] @@ -146,4 +179,47 @@ mod tests { assert_eq!(*pool.acquire(), 11); } } + + #[test] + fn test_queue() { + let q = Queue::new(); + let elem = 5; + q.push(elem); + let res = q.pop(); + assert_eq!(res.unwrap(), elem); + } + + #[test] + fn test_pool_dont_panic_on_empty_pop() { + // When the object pool is exhausted, it shouldn't panic on pop() + use std::sync::Arc; + use std::{thread, time}; + + // Wrap the pool in an Arc, same way as its used in `core/index.rs` + let pool = Arc::new(Pool::new()); + // clone pools outside the move scope of each new thread + let pool1 = Arc::clone(&pool); + let pool2 = Arc::clone(&pool); + let elements_for_pool = vec![1, 2]; + pool.publish_new_generation(elements_for_pool); + + let mut threads = vec![]; + let sleep_dur = time::Duration::from_millis(10); + // spawn one more thread than there are elements in the pool + threads.push(thread::spawn(move || { + // leasing to make sure it's not dropped before sleep is called + let _leased_searcher = &pool.acquire(); + thread::sleep(sleep_dur); + })); + threads.push(thread::spawn(move || { + // leasing to make sure it's not dropped before sleep is called + let _leased_searcher = &pool1.acquire(); + thread::sleep(sleep_dur); + })); + threads.push(thread::spawn(move || { + // leasing to make sure it's not dropped before sleep is called + let _leased_searcher = &pool2.acquire(); + thread::sleep(sleep_dur); + })); + } } diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 9f8e9577f..2580bedda 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -62,7 +62,7 @@ pub fn save_new_metas(schema: Schema, directory: &mut Directory) -> Result<()> { /// Save the index meta file. /// This operation is atomic: /// Either -// - it fails, in which case an error is returned, +/// - it fails, in which case an error is returned, /// and the `meta.json` remains untouched, /// - it success, and `meta.json` is written /// and flushed.