From 3bc9978d95bd4971e76cb8bae8686c2fb4a05d27 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Wed, 12 Oct 2016 10:41:41 +0900 Subject: [PATCH] bug/4 Pretty bad race condition remaining 1 --- src/core/index.rs | 83 +++++++----------------------------- src/core/index_meta.rs | 47 +++++++++++++++++++++ src/core/mod.rs | 20 ++++----- src/core/pool.rs | 7 ++-- src/indexer/index_writer.rs | 84 +++++++++++++++++++++++++++++++------ 5 files changed, 143 insertions(+), 98 deletions(-) create mode 100644 src/core/index_meta.rs diff --git a/src/core/index.rs b/src/core/index.rs index 123ec14c1..64c9d89be 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -1,8 +1,7 @@ use Result; use Error; -use std::path::{PathBuf, Path}; +use std::path::Path; use schema::Schema; -use std::io::Write; use std::sync::Arc; use std::fmt; use rustc_serialize::json; @@ -16,49 +15,21 @@ use super::segment::Segment; use core::SegmentReader; use super::pool::Pool; use super::pool::LeasedItem; -use core::SegmentMeta; use indexer::SegmentManager; use indexer::{MergePolicy, SimpleMergePolicy}; +use core::IndexMeta; +use core::META_FILEPATH; use super::segment::create_segment; const NUM_SEARCHERS: usize = 12; - +/// Accessor to the index segment manager +/// +/// This method is not part of tantivy's public API pub fn get_segment_manager(index: &Index) -> Arc { index.segment_manager.clone() } - -/// MetaInformation about the `Index`. -/// -/// This object is serialized on disk in the `meta.json` file. -/// It keeps information about -/// * the searchable segments, -/// * the index docstamp -/// * the schema -/// -#[derive(Clone,Debug,RustcDecodable,RustcEncodable)] -pub struct IndexMeta { - committed_segments: Vec, - uncommitted_segments: Vec, - schema: Schema, - docstamp: u64, -} - -impl IndexMeta { - fn with_schema(schema: Schema) -> IndexMeta { - IndexMeta { - committed_segments: Vec::new(), - uncommitted_segments: Vec::new(), - schema: schema, - docstamp: 0u64, - } - } -} - -lazy_static! { - static ref META_FILEPATH: PathBuf = PathBuf::from("meta.json"); -} - + fn load_metas(directory: &Directory) -> Result { let meta_file = try!(directory.open_read(&META_FILEPATH)); @@ -70,10 +41,9 @@ fn load_metas(directory: &Directory) -> Result { Ok(loaded_meta) } -// pub fn commit(index: &mut Index, docstamp: u64) { -// index.docstamp = docstamp; -// index.segment_manager.commit(); -// } +pub fn set_metas(index: &mut Index, docstamp: u64) { + index.docstamp = docstamp; +} /// Tantivy's Search Index pub struct Index { @@ -136,9 +106,10 @@ impl Index { /// Opens a new directory from a directory. pub fn from_directory(directory: Box, schema: Schema) -> Result { - let mut index = try!(Index::create_from_metas(directory, IndexMeta::with_schema(schema))); - try!(index.save_metas()); - Ok(index) + Index::create_from_metas( + directory, + IndexMeta::with_schema(schema) + ) } /// Opens a new directory from an index path. @@ -220,32 +191,6 @@ impl Index { self.segment(SegmentId::generate_random()) } - fn create_metas(&self,) -> IndexMeta { - let (committed_segments, uncommitted_segments) = self.segment_manager.segment_metas(); - IndexMeta { - committed_segments: committed_segments, - uncommitted_segments: uncommitted_segments, - schema: self.schema.clone(), - docstamp: self.docstamp, - } - } - - /// Save the index meta file. - /// This operation is atomic : - /// Either - // - it fails, in which case an error is returned, - /// and the `meta.json` remains untouched, - /// - it success, and `meta.json` is written - /// and flushed. - pub fn save_metas(&mut self,) -> Result<()> { - let metas = self.create_metas(); - let mut w = Vec::new(); - try!(write!(&mut w, "{}\n", json::as_pretty_json(&metas))); - self.directory - .atomic_write(&META_FILEPATH, &w[..]) - .map_err(From::from) - } - /// Creates a new generation of searchers after /// a change of the set of searchable indexes. /// diff --git a/src/core/index_meta.rs b/src/core/index_meta.rs new file mode 100644 index 000000000..ff035778a --- /dev/null +++ b/src/core/index_meta.rs @@ -0,0 +1,47 @@ + +use schema::Schema; +use core::SegmentId; + + +/// MetaInformation about the `Index`. +/// +/// This object is serialized on disk in the `meta.json` file. +/// It keeps information about +/// * the searchable segments, +/// * the index docstamp +/// * the schema +/// +#[derive(Clone,Debug,RustcDecodable,RustcEncodable)] +pub struct IndexMeta { + pub committed_segments: Vec, + pub uncommitted_segments: Vec, + pub schema: Schema, + pub docstamp: u64, +} + +impl IndexMeta { + pub fn with_schema(schema: Schema) -> IndexMeta { + IndexMeta { + committed_segments: Vec::new(), + uncommitted_segments: Vec::new(), + schema: schema, + docstamp: 0u64, + } + } +} + +#[derive(Clone, Debug, RustcDecodable,RustcEncodable)] +pub struct SegmentMeta { + pub segment_id: SegmentId, + pub num_docs: usize, +} + +#[cfg(test)] +impl SegmentMeta { + pub fn new(segment_id: SegmentId, num_docs: usize) -> SegmentMeta { + SegmentMeta { + segment_id: segment_id, + num_docs: num_docs, + } + } +} \ No newline at end of file diff --git a/src/core/mod.rs b/src/core/mod.rs index bd1fa38ed..f16911302 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -5,8 +5,12 @@ mod segment_reader; mod segment_id; mod segment_component; mod segment; +mod index_meta; mod pool; + +use std::path::PathBuf; + pub use self::segment_component::SegmentComponent; pub use self::segment_id::SegmentId; pub use self::segment_reader::SegmentReader; @@ -14,19 +18,9 @@ pub use self::segment::Segment; pub use self::segment::SegmentInfo; pub use self::segment::SerializableSegment; pub use self::index::Index; +pub use self::index_meta::{IndexMeta, SegmentMeta}; -#[derive(Clone, Debug, RustcDecodable,RustcEncodable)] -pub struct SegmentMeta { - pub segment_id: SegmentId, - pub num_docs: usize, -} -#[cfg(test)] -impl SegmentMeta { - pub fn new(segment_id: SegmentId, num_docs: usize) -> SegmentMeta { - SegmentMeta { - segment_id: segment_id, - num_docs: num_docs, - } - } +lazy_static! { + pub static ref META_FILEPATH: PathBuf = PathBuf::from("meta.json"); } \ No newline at end of file diff --git a/src/core/pool.rs b/src/core/pool.rs index e42375bb5..b06b6110a 100644 --- a/src/core/pool.rs +++ b/src/core/pool.rs @@ -37,15 +37,16 @@ impl Pool { } fn generation(&self,) -> usize { - self.generation.load(Ordering::SeqCst) + self.generation.load(Ordering::Acquire) } pub fn inc_generation(&self,) { - self.generation.fetch_add(1, Ordering::SeqCst); + self.generation.fetch_add(1, Ordering::Release); } pub fn acquire(&self,) -> LeasedItem { - let generation = self.generation.load(Ordering::SeqCst); + let generation = self.generation.load(Ordering::Acquire); + println("generation {}"); loop { let gen_item = self.queue.pop(); if gen_item.generation >= generation { diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 12695717d..37c9072b1 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -2,13 +2,16 @@ use schema::Schema; use schema::Document; use indexer::SegmentSerializer; use core::Index; +use Directory; use core::SerializableSegment; use core::Segment; use std::thread::JoinHandle; +use rustc_serialize::json; use indexer::SegmentWriter; use indexer::MergeCandidate; use std::clone::Clone; use std::io; +use std::io::Write; use indexer::MergePolicy; use std::thread; use std::mem; @@ -18,6 +21,8 @@ use datastruct::stacker::Heap; use std::mem::swap; use chan; use core::SegmentMeta; +use core::IndexMeta; +use core::META_FILEPATH; use super::super::core::index::get_segment_manager; use super::segment_manager::{CommitState, SegmentManager, get_segment_ready_for_commit}; use Result; @@ -40,6 +45,44 @@ type DocumentReceiver = chan::Receiver; type SegmentUpdateSender = chan::Sender; type SegmentUpdateReceiver = chan::Receiver; + + +fn create_metas(segment_manager: &SegmentManager, + schema: Schema, + docstamp: u64) -> IndexMeta { + let (committed_segments, uncommitted_segments) = segment_manager.segment_metas(); + IndexMeta { + committed_segments: committed_segments, + uncommitted_segments: uncommitted_segments, + schema: schema, + docstamp: docstamp, + } +} + + +/// Save the index meta file. +/// This operation is atomic : +/// Either +// - it fails, in which case an error is returned, +/// and the `meta.json` remains untouched, +/// - it success, and `meta.json` is written +/// and flushed. +/// +/// This method is not part of tantivy's public API +pub fn save_metas( + segment_manager: &SegmentManager, + schema: Schema, + docstamp: u64, + directory: &mut Directory) -> Result<()> { + let metas = create_metas(segment_manager, schema, docstamp); + let mut w = Vec::new(); + try!(write!(&mut w, "{}\n", json::as_pretty_json(&metas))); + directory + .atomic_write(&META_FILEPATH, &w[..]) + .map_err(From::from) +} + + /// `IndexWriter` is the user entry-point to add document to an index. /// /// It manages a small number of indexing thread, as well as a shared @@ -61,7 +104,9 @@ pub struct IndexWriter { worker_id: usize, num_threads: usize, - docstamp: u64, + + uncommitted_docstamp: u64, + committed_docstamp: u64, } // IndexWriter cannot be sent to another thread. @@ -207,8 +252,12 @@ fn process_segment_updates(mut index: Index, if generation != new_generation { generation = new_generation; - // saving the meta file. - index.save_metas().expect("Could not save metas."); + // saving the meta file. + save_metas( + segment_manager, + index.schema(), + index.docstamp(), + index.directory_mut()).expect("Could not save metas."); // update the searchers so that they eventually will // use the new segments. @@ -326,7 +375,15 @@ impl IndexWriter { } fn on_change(&mut self,) -> Result<()> { - try!(self.index.save_metas()); + let segment_manager = get_segment_manager(&self.index); + // saving the meta file. + try!( + save_metas( + &*segment_manager, + self.index.schema(), + self.committed_docstamp, + self.index.directory_mut()) + ); try!(self.index.load_searchers()); Ok(()) } @@ -365,7 +422,8 @@ impl IndexWriter { workers_join_handle: Vec::new(), num_threads: num_threads, - docstamp: index.docstamp(), + committed_docstamp: index.docstamp(), + uncommitted_docstamp: index.docstamp(), worker_id: 0, }; try!(index_writer.start_workers()); @@ -498,9 +556,9 @@ impl IndexWriter { } try!(self.on_change()); - // reset the docstamp to what it was before - self.docstamp = self.index.docstamp(); - Ok(self.docstamp) + // reset the docstamp + self.uncommitted_docstamp = self.committed_docstamp; + Ok(self.committed_docstamp) } @@ -525,7 +583,7 @@ impl IndexWriter { self.recreate_document_channel(); // Docstamp of the last document in this commit. - let commit_docstamp = self.docstamp; + self.committed_docstamp = self.uncommitted_docstamp; let mut former_workers_join_handle = Vec::new(); swap(&mut former_workers_join_handle, &mut self.workers_join_handle); @@ -541,10 +599,10 @@ impl IndexWriter { } self.segment_update_sender.send(SegmentUpdate::Commit); - + // super::super::core::index::commit(&mut self.index, commit_docstamp); try!(self.on_change()); - Ok(commit_docstamp) + Ok(self.committed_docstamp) } @@ -560,8 +618,8 @@ impl IndexWriter { /// have been added since the creation of the index. pub fn add_document(&mut self, doc: Document) -> io::Result { self.document_sender.send(doc); - self.docstamp += 1; - Ok(self.docstamp) + self.uncommitted_docstamp += 1; + Ok(self.uncommitted_docstamp) }