bug/4 Pretty bad race condition remaining

1
This commit is contained in:
Paul Masurel
2016-10-12 10:41:41 +09:00
parent 97099e9911
commit 3bc9978d95
5 changed files with 143 additions and 98 deletions

View File

@@ -1,8 +1,7 @@
use Result;
use Error;
use std::path::{PathBuf, Path};
use std::path::Path;
use schema::Schema;
use std::io::Write;
use std::sync::Arc;
use std::fmt;
use rustc_serialize::json;
@@ -16,49 +15,21 @@ use super::segment::Segment;
use core::SegmentReader;
use super::pool::Pool;
use super::pool::LeasedItem;
use core::SegmentMeta;
use indexer::SegmentManager;
use indexer::{MergePolicy, SimpleMergePolicy};
use core::IndexMeta;
use core::META_FILEPATH;
use super::segment::create_segment;
const NUM_SEARCHERS: usize = 12;
/// Accessor to the index segment manager
///
/// This method is not part of tantivy's public API
pub fn get_segment_manager(index: &Index) -> Arc<SegmentManager> {
index.segment_manager.clone()
}
/// MetaInformation about the `Index`.
///
/// This object is serialized on disk in the `meta.json` file.
/// It keeps information about
/// * the searchable segments,
/// * the index docstamp
/// * the schema
///
#[derive(Clone,Debug,RustcDecodable,RustcEncodable)]
pub struct IndexMeta {
committed_segments: Vec<SegmentMeta>,
uncommitted_segments: Vec<SegmentMeta>,
schema: Schema,
docstamp: u64,
}
impl IndexMeta {
fn with_schema(schema: Schema) -> IndexMeta {
IndexMeta {
committed_segments: Vec::new(),
uncommitted_segments: Vec::new(),
schema: schema,
docstamp: 0u64,
}
}
}
lazy_static! {
static ref META_FILEPATH: PathBuf = PathBuf::from("meta.json");
}
fn load_metas(directory: &Directory) -> Result<IndexMeta> {
let meta_file = try!(directory.open_read(&META_FILEPATH));
@@ -70,10 +41,9 @@ fn load_metas(directory: &Directory) -> Result<IndexMeta> {
Ok(loaded_meta)
}
// pub fn commit(index: &mut Index, docstamp: u64) {
// index.docstamp = docstamp;
// index.segment_manager.commit();
// }
pub fn set_metas(index: &mut Index, docstamp: u64) {
index.docstamp = docstamp;
}
/// Tantivy's Search Index
pub struct Index {
@@ -136,9 +106,10 @@ impl Index {
/// Opens a new directory from a directory.
pub fn from_directory(directory: Box<Directory>, schema: Schema) -> Result<Index> {
let mut index = try!(Index::create_from_metas(directory, IndexMeta::with_schema(schema)));
try!(index.save_metas());
Ok(index)
Index::create_from_metas(
directory,
IndexMeta::with_schema(schema)
)
}
/// Opens a new directory from an index path.
@@ -220,32 +191,6 @@ impl Index {
self.segment(SegmentId::generate_random())
}
fn create_metas(&self,) -> IndexMeta {
let (committed_segments, uncommitted_segments) = self.segment_manager.segment_metas();
IndexMeta {
committed_segments: committed_segments,
uncommitted_segments: uncommitted_segments,
schema: self.schema.clone(),
docstamp: self.docstamp,
}
}
/// Save the index meta file.
/// This operation is atomic :
/// Either
// - it fails, in which case an error is returned,
/// and the `meta.json` remains untouched,
/// - it success, and `meta.json` is written
/// and flushed.
pub fn save_metas(&mut self,) -> Result<()> {
let metas = self.create_metas();
let mut w = Vec::new();
try!(write!(&mut w, "{}\n", json::as_pretty_json(&metas)));
self.directory
.atomic_write(&META_FILEPATH, &w[..])
.map_err(From::from)
}
/// Creates a new generation of searchers after
/// a change of the set of searchable indexes.
///

47
src/core/index_meta.rs Normal file
View File

@@ -0,0 +1,47 @@
use schema::Schema;
use core::SegmentId;
/// MetaInformation about the `Index`.
///
/// This object is serialized on disk in the `meta.json` file.
/// It keeps information about
/// * the searchable segments,
/// * the index docstamp
/// * the schema
///
#[derive(Clone,Debug,RustcDecodable,RustcEncodable)]
pub struct IndexMeta {
pub committed_segments: Vec<SegmentMeta>,
pub uncommitted_segments: Vec<SegmentMeta>,
pub schema: Schema,
pub docstamp: u64,
}
impl IndexMeta {
pub fn with_schema(schema: Schema) -> IndexMeta {
IndexMeta {
committed_segments: Vec::new(),
uncommitted_segments: Vec::new(),
schema: schema,
docstamp: 0u64,
}
}
}
#[derive(Clone, Debug, RustcDecodable,RustcEncodable)]
pub struct SegmentMeta {
pub segment_id: SegmentId,
pub num_docs: usize,
}
#[cfg(test)]
impl SegmentMeta {
pub fn new(segment_id: SegmentId, num_docs: usize) -> SegmentMeta {
SegmentMeta {
segment_id: segment_id,
num_docs: num_docs,
}
}
}

View File

@@ -5,8 +5,12 @@ mod segment_reader;
mod segment_id;
mod segment_component;
mod segment;
mod index_meta;
mod pool;
use std::path::PathBuf;
pub use self::segment_component::SegmentComponent;
pub use self::segment_id::SegmentId;
pub use self::segment_reader::SegmentReader;
@@ -14,19 +18,9 @@ pub use self::segment::Segment;
pub use self::segment::SegmentInfo;
pub use self::segment::SerializableSegment;
pub use self::index::Index;
pub use self::index_meta::{IndexMeta, SegmentMeta};
#[derive(Clone, Debug, RustcDecodable,RustcEncodable)]
pub struct SegmentMeta {
pub segment_id: SegmentId,
pub num_docs: usize,
}
#[cfg(test)]
impl SegmentMeta {
pub fn new(segment_id: SegmentId, num_docs: usize) -> SegmentMeta {
SegmentMeta {
segment_id: segment_id,
num_docs: num_docs,
}
}
lazy_static! {
pub static ref META_FILEPATH: PathBuf = PathBuf::from("meta.json");
}

View File

@@ -37,15 +37,16 @@ impl<T> Pool<T> {
}
fn generation(&self,) -> usize {
self.generation.load(Ordering::SeqCst)
self.generation.load(Ordering::Acquire)
}
pub fn inc_generation(&self,) {
self.generation.fetch_add(1, Ordering::SeqCst);
self.generation.fetch_add(1, Ordering::Release);
}
pub fn acquire(&self,) -> LeasedItem<T> {
let generation = self.generation.load(Ordering::SeqCst);
let generation = self.generation.load(Ordering::Acquire);
println("generation {}");
loop {
let gen_item = self.queue.pop();
if gen_item.generation >= generation {

View File

@@ -2,13 +2,16 @@ use schema::Schema;
use schema::Document;
use indexer::SegmentSerializer;
use core::Index;
use Directory;
use core::SerializableSegment;
use core::Segment;
use std::thread::JoinHandle;
use rustc_serialize::json;
use indexer::SegmentWriter;
use indexer::MergeCandidate;
use std::clone::Clone;
use std::io;
use std::io::Write;
use indexer::MergePolicy;
use std::thread;
use std::mem;
@@ -18,6 +21,8 @@ use datastruct::stacker::Heap;
use std::mem::swap;
use chan;
use core::SegmentMeta;
use core::IndexMeta;
use core::META_FILEPATH;
use super::super::core::index::get_segment_manager;
use super::segment_manager::{CommitState, SegmentManager, get_segment_ready_for_commit};
use Result;
@@ -40,6 +45,44 @@ type DocumentReceiver = chan::Receiver<Document>;
type SegmentUpdateSender = chan::Sender<SegmentUpdate>;
type SegmentUpdateReceiver = chan::Receiver<SegmentUpdate>;
fn create_metas(segment_manager: &SegmentManager,
schema: Schema,
docstamp: u64) -> IndexMeta {
let (committed_segments, uncommitted_segments) = segment_manager.segment_metas();
IndexMeta {
committed_segments: committed_segments,
uncommitted_segments: uncommitted_segments,
schema: schema,
docstamp: docstamp,
}
}
/// Save the index meta file.
/// This operation is atomic :
/// Either
// - it fails, in which case an error is returned,
/// and the `meta.json` remains untouched,
/// - it success, and `meta.json` is written
/// and flushed.
///
/// This method is not part of tantivy's public API
pub fn save_metas(
segment_manager: &SegmentManager,
schema: Schema,
docstamp: u64,
directory: &mut Directory) -> Result<()> {
let metas = create_metas(segment_manager, schema, docstamp);
let mut w = Vec::new();
try!(write!(&mut w, "{}\n", json::as_pretty_json(&metas)));
directory
.atomic_write(&META_FILEPATH, &w[..])
.map_err(From::from)
}
/// `IndexWriter` is the user entry-point to add document to an index.
///
/// It manages a small number of indexing thread, as well as a shared
@@ -61,7 +104,9 @@ pub struct IndexWriter {
worker_id: usize,
num_threads: usize,
docstamp: u64,
uncommitted_docstamp: u64,
committed_docstamp: u64,
}
// IndexWriter cannot be sent to another thread.
@@ -207,8 +252,12 @@ fn process_segment_updates(mut index: Index,
if generation != new_generation {
generation = new_generation;
// saving the meta file.
index.save_metas().expect("Could not save metas.");
// saving the meta file.
save_metas(
segment_manager,
index.schema(),
index.docstamp(),
index.directory_mut()).expect("Could not save metas.");
// update the searchers so that they eventually will
// use the new segments.
@@ -326,7 +375,15 @@ impl IndexWriter {
}
fn on_change(&mut self,) -> Result<()> {
try!(self.index.save_metas());
let segment_manager = get_segment_manager(&self.index);
// saving the meta file.
try!(
save_metas(
&*segment_manager,
self.index.schema(),
self.committed_docstamp,
self.index.directory_mut())
);
try!(self.index.load_searchers());
Ok(())
}
@@ -365,7 +422,8 @@ impl IndexWriter {
workers_join_handle: Vec::new(),
num_threads: num_threads,
docstamp: index.docstamp(),
committed_docstamp: index.docstamp(),
uncommitted_docstamp: index.docstamp(),
worker_id: 0,
};
try!(index_writer.start_workers());
@@ -498,9 +556,9 @@ impl IndexWriter {
}
try!(self.on_change());
// reset the docstamp to what it was before
self.docstamp = self.index.docstamp();
Ok(self.docstamp)
// reset the docstamp
self.uncommitted_docstamp = self.committed_docstamp;
Ok(self.committed_docstamp)
}
@@ -525,7 +583,7 @@ impl IndexWriter {
self.recreate_document_channel();
// Docstamp of the last document in this commit.
let commit_docstamp = self.docstamp;
self.committed_docstamp = self.uncommitted_docstamp;
let mut former_workers_join_handle = Vec::new();
swap(&mut former_workers_join_handle, &mut self.workers_join_handle);
@@ -541,10 +599,10 @@ impl IndexWriter {
}
self.segment_update_sender.send(SegmentUpdate::Commit);
// super::super::core::index::commit(&mut self.index, commit_docstamp);
try!(self.on_change());
Ok(commit_docstamp)
Ok(self.committed_docstamp)
}
@@ -560,8 +618,8 @@ impl IndexWriter {
/// have been added since the creation of the index.
pub fn add_document(&mut self, doc: Document) -> io::Result<u64> {
self.document_sender.send(doc);
self.docstamp += 1;
Ok(self.docstamp)
self.uncommitted_docstamp += 1;
Ok(self.uncommitted_docstamp)
}