From 311022c8694675d2f8f3fa86768c2e61811aeb36 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Wed, 30 Mar 2016 08:46:58 +0900 Subject: [PATCH] multithreading with wait --- src/core/directory.rs | 1 - src/core/index.rs | 4 ++-- src/core/writer.rs | 17 ++++++++--------- 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/src/core/directory.rs b/src/core/directory.rs index 2f4d02a4c..493d9e01f 100644 --- a/src/core/directory.rs +++ b/src/core/directory.rs @@ -15,7 +15,6 @@ use atomicwrites; use std::sync::Arc; use std::sync::RwLock; use tempdir::TempDir; -use std::cell::RefCell; use std::ops::Deref; use std::path::{Path, PathBuf}; diff --git a/src/core/index.rs b/src/core/index.rs index 2e0d0ebe2..d50338755 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -136,13 +136,13 @@ impl Index { // TODO find a rusty way to hide that, while keeping // it visible for IndexWriters. - pub fn publish_segment(&mut self, segment: Segment) -> io::Result<()> { + pub fn publish_segment(&mut self, segment: &Segment) -> io::Result<()> { self.metas.write().unwrap().segments.push(segment.segment_id.clone()); // TODO use logs self.save_metas() } - pub fn sync(&mut self, segment: Segment) -> io::Result<()> { + pub fn sync(&mut self, segment: &Segment) -> io::Result<()> { for component in [SegmentComponent::POSTINGS, SegmentComponent::TERMS].iter() { let path = segment.relative_path(component); let directory = try!(self.ro_directory()); diff --git a/src/core/writer.rs b/src/core/writer.rs index 48a228919..86118c529 100644 --- a/src/core/writer.rs +++ b/src/core/writer.rs @@ -1,7 +1,6 @@ use core::schema::*; use core::codec::*; use std::io; -use std::rc::Rc; use core::index::Index; use core::analyzer::SimpleTokenizer; use core::index::SerializableSegment; @@ -36,16 +35,17 @@ impl IndexWriter { let (queue_input, queue_output): (SyncSender, Receiver) = mpsc::sync_channel(10_000); let queue_output_sendable = Arc::new(Mutex::new(queue_output)); let threads = (0..num_threads).map(|thread_id| { + let queue_output_clone = queue_output_sendable.clone(); - let index_clone = index.clone(); + let mut index_clone = index.clone(); let schema_clone = schema.clone(); + thread::spawn(move || { let mut docs_remaining = true; while docs_remaining { let segment = index_clone.new_segment(); - let mut segment_writer = SegmentWriter::for_segment(segment, &schema_clone).unwrap(); - println!("thread_id {}", thread_id); - for i in 0..500 { + let mut segment_writer = SegmentWriter::for_segment(segment.clone(), &schema_clone).unwrap(); + for i in 0..225_000 { let doc: ArcDoc; { let queue = queue_output_clone.lock().unwrap(); @@ -55,7 +55,6 @@ impl IndexWriter { } Err(_) => { docs_remaining = false; - println!("err"); break; } } @@ -63,10 +62,9 @@ impl IndexWriter { // TODO stop unwrapping that one. segment_writer.add_document(&*doc, &schema_clone).unwrap(); } - println!("finalize {}", thread_id); segment_writer.finalize().unwrap(); - // try!(self.directory.sync(segment.clone())); - // try!(self.directory.publish_segment(segment.clone())); + index_clone.sync(&segment).unwrap(); + index_clone.publish_segment(&segment).unwrap(); // segment_writer.commit().unwrap(); } }) @@ -84,6 +82,7 @@ impl IndexWriter { } pub fn wait(self,) { + drop(self.queue_input); for thread in self.threads { thread.join(); }