multithreading with wait

This commit is contained in:
Paul Masurel
2016-03-30 08:46:58 +09:00
parent 06d255b9c9
commit 311022c869
3 changed files with 10 additions and 12 deletions

View File

@@ -15,7 +15,6 @@ use atomicwrites;
use std::sync::Arc;
use std::sync::RwLock;
use tempdir::TempDir;
use std::cell::RefCell;
use std::ops::Deref;
use std::path::{Path, PathBuf};

View File

@@ -136,13 +136,13 @@ impl Index {
// TODO find a rusty way to hide that, while keeping
// it visible for IndexWriters.
pub fn publish_segment(&mut self, segment: Segment) -> io::Result<()> {
pub fn publish_segment(&mut self, segment: &Segment) -> io::Result<()> {
self.metas.write().unwrap().segments.push(segment.segment_id.clone());
// TODO use logs
self.save_metas()
}
pub fn sync(&mut self, segment: Segment) -> io::Result<()> {
pub fn sync(&mut self, segment: &Segment) -> io::Result<()> {
for component in [SegmentComponent::POSTINGS, SegmentComponent::TERMS].iter() {
let path = segment.relative_path(component);
let directory = try!(self.ro_directory());

View File

@@ -1,7 +1,6 @@
use core::schema::*;
use core::codec::*;
use std::io;
use std::rc::Rc;
use core::index::Index;
use core::analyzer::SimpleTokenizer;
use core::index::SerializableSegment;
@@ -36,16 +35,17 @@ impl IndexWriter {
let (queue_input, queue_output): (SyncSender<ArcDoc>, Receiver<ArcDoc>) = mpsc::sync_channel(10_000);
let queue_output_sendable = Arc::new(Mutex::new(queue_output));
let threads = (0..num_threads).map(|thread_id| {
let queue_output_clone = queue_output_sendable.clone();
let index_clone = index.clone();
let mut index_clone = index.clone();
let schema_clone = schema.clone();
thread::spawn(move || {
let mut docs_remaining = true;
while docs_remaining {
let segment = index_clone.new_segment();
let mut segment_writer = SegmentWriter::for_segment(segment, &schema_clone).unwrap();
println!("thread_id {}", thread_id);
for i in 0..500 {
let mut segment_writer = SegmentWriter::for_segment(segment.clone(), &schema_clone).unwrap();
for i in 0..225_000 {
let doc: ArcDoc;
{
let queue = queue_output_clone.lock().unwrap();
@@ -55,7 +55,6 @@ impl IndexWriter {
}
Err(_) => {
docs_remaining = false;
println!("err");
break;
}
}
@@ -63,10 +62,9 @@ impl IndexWriter {
// TODO stop unwrapping that one.
segment_writer.add_document(&*doc, &schema_clone).unwrap();
}
println!("finalize {}", thread_id);
segment_writer.finalize().unwrap();
// try!(self.directory.sync(segment.clone()));
// try!(self.directory.publish_segment(segment.clone()));
index_clone.sync(&segment).unwrap();
index_clone.publish_segment(&segment).unwrap();
// segment_writer.commit().unwrap();
}
})
@@ -84,6 +82,7 @@ impl IndexWriter {
}
pub fn wait(self,) {
drop(self.queue_input);
for thread in self.threads {
thread.join();
}