MsQueue to channel (#495)

* Format

Made the docstring consistent
remove empty line

* Move matches to dev deps

* Replace MsQueue with an unbounded crossbeam-channel

Questions:
queue.push ignores Result return

How to test pop() calls, if they block

* Format

Made the docstring consistent
remove empty line

* Unwrap the Result of queue.pop

* Addressed Paul's review

wrap the Result-returning send call with expect()

implemented the test not to fail after popping from empty queue

removed references to the Michael-Scott Queue

formatted
This commit is contained in:
petr-tik
2019-02-23 00:06:50 +00:00
committed by Paul Masurel
parent 788b3803d9
commit 9451fd5b09
4 changed files with 92 additions and 17 deletions

View File

@@ -41,7 +41,6 @@ owning_ref = "0.4"
stable_deref_trait = "1.0.0"
rust-stemmers = "1.1"
downcast-rs = { version="1.0" }
matches = "0.1"
bitpacking = "0.6"
census = "0.2"
fnv = "1.0.6"
@@ -58,6 +57,7 @@ winapi = "0.2"
[dev-dependencies]
rand = "0.6"
maplit = "1"
matches = "0.1.8"
[profile.release]
opt-level = 3

View File

@@ -123,15 +123,14 @@ mod tests {
}
}
}
#[test]
fn test_map_multithread() {
let result: Vec<usize> = Executor::multi_thread(3, "search-test")
.map(|i| Ok(i * 2), 0..10)
.unwrap();
assert_eq!(result.len(), 10);
for i in 0..10 {
assert_eq!(result[i], i * 2);
#[test]
fn test_map_multithread() {
let result: Vec<usize> = Executor::multi_thread(3, "search-test")
.map(|i| Ok(i * 2), 0..10)
.unwrap();
assert_eq!(result.len(), 10);
for i in 0..10 {
assert_eq!(result[i], i * 2);
}
}
}

View File

@@ -1,4 +1,5 @@
use crossbeam::queue::MsQueue;
use crossbeam::crossbeam_channel::unbounded;
use crossbeam::{Receiver, RecvError, Sender};
use std::ops::{Deref, DerefMut};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
@@ -9,6 +10,37 @@ pub struct GenerationItem<T> {
item: T,
}
/// Queue implementation for the Object Pool below
/// Uses the unbounded Linked-List type queue from crossbeam-channel
/// Splits the Queue into sender and receiver
struct Queue<T> {
sender: Sender<T>,
receiver: Receiver<T>,
}
impl<T> Queue<T> {
fn new() -> Self {
let (s, r) = unbounded();
Queue {
sender: s,
receiver: r,
}
}
/// Sender trait returns a Result type, which is ignored.
/// The Result is not handled at the moment
fn push(&self, elem: T) {
self.sender
.send(elem)
.expect("Sending an item to crossbeam-queue shouldn't fail");
}
/// Relies on the underlying crossbeam-channel Receiver
/// to block on empty queue
fn pop(&self) -> Result<T, RecvError> {
self.receiver.recv()
}
}
/// An object pool
///
@@ -16,14 +48,14 @@ pub struct GenerationItem<T> {
/// Object are wrapped in a `LeasedItem` wrapper and are
/// released automatically back into the pool on `Drop`.
pub struct Pool<T> {
queue: Arc<MsQueue<GenerationItem<T>>>,
queue: Arc<Queue<GenerationItem<T>>>,
freshest_generation: AtomicUsize,
next_generation: AtomicUsize,
}
impl<T> Pool<T> {
pub fn new() -> Pool<T> {
let queue = Arc::new(MsQueue::new());
let queue = Arc::new(Queue::new());
Pool {
queue,
freshest_generation: AtomicUsize::default(),
@@ -77,7 +109,7 @@ impl<T> Pool<T> {
pub fn acquire(&self) -> LeasedItem<T> {
let generation = self.generation();
loop {
let gen_item = self.queue.pop();
let gen_item = self.queue.pop().unwrap();
if gen_item.generation >= generation {
return LeasedItem {
gen_item: Some(gen_item),
@@ -93,7 +125,7 @@ impl<T> Pool<T> {
pub struct LeasedItem<T> {
gen_item: Option<GenerationItem<T>>,
recycle_queue: Arc<MsQueue<GenerationItem<T>>>,
recycle_queue: Arc<Queue<GenerationItem<T>>>,
}
impl<T> Deref for LeasedItem<T> {
@@ -130,6 +162,7 @@ impl<T> Drop for LeasedItem<T> {
mod tests {
use super::Pool;
use super::Queue;
use std::iter;
#[test]
@@ -146,4 +179,47 @@ mod tests {
assert_eq!(*pool.acquire(), 11);
}
}
#[test]
fn test_queue() {
let q = Queue::new();
let elem = 5;
q.push(elem);
let res = q.pop();
assert_eq!(res.unwrap(), elem);
}
#[test]
fn test_pool_dont_panic_on_empty_pop() {
// When the object pool is exhausted, it shouldn't panic on pop()
use std::sync::Arc;
use std::{thread, time};
// Wrap the pool in an Arc, same way as its used in `core/index.rs`
let pool = Arc::new(Pool::new());
// clone pools outside the move scope of each new thread
let pool1 = Arc::clone(&pool);
let pool2 = Arc::clone(&pool);
let elements_for_pool = vec![1, 2];
pool.publish_new_generation(elements_for_pool);
let mut threads = vec![];
let sleep_dur = time::Duration::from_millis(10);
// spawn one more thread than there are elements in the pool
threads.push(thread::spawn(move || {
// leasing to make sure it's not dropped before sleep is called
let _leased_searcher = &pool.acquire();
thread::sleep(sleep_dur);
}));
threads.push(thread::spawn(move || {
// leasing to make sure it's not dropped before sleep is called
let _leased_searcher = &pool1.acquire();
thread::sleep(sleep_dur);
}));
threads.push(thread::spawn(move || {
// leasing to make sure it's not dropped before sleep is called
let _leased_searcher = &pool2.acquire();
thread::sleep(sleep_dur);
}));
}
}

View File

@@ -62,7 +62,7 @@ pub fn save_new_metas(schema: Schema, directory: &mut Directory) -> Result<()> {
/// Save the index meta file.
/// This operation is atomic:
/// Either
// - it fails, in which case an error is returned,
/// - it fails, in which case an error is returned,
/// and the `meta.json` remains untouched,
/// - it success, and `meta.json` is written
/// and flushed.