issue/43 Segment have a commit opstamp

This commit is contained in:
Paul Masurel
2017-02-01 10:06:32 +09:00
parent ca977fb17b
commit 09782858da
10 changed files with 157 additions and 106 deletions

View File

@@ -21,6 +21,7 @@ use core::IndexMeta;
use core::META_FILEPATH;
use super::segment::create_segment;
use indexer::segment_updater::save_new_metas;
use directory::error::{FileError, OpenWriteError};
const NUM_SEARCHERS: usize = 12;
@@ -41,6 +42,45 @@ pub struct Index {
docstamp: u64,
}
/// Deletes all of the document of the segment.
/// This is called when there is a merge or a rollback.
///
/// # Disclaimer
/// If deletion of a file fails (e.g. a file
/// was read-only.), the method does not
/// fail and just logs an error when it fails.
pub fn delete_segment(directory: &Directory, segment_id: SegmentId) {
info!("Deleting segment {:?}", segment_id);
let segment_filepaths_res = directory.ls_starting_with(
&*segment_id.uuid_string()
);
match segment_filepaths_res {
Ok(segment_filepaths) => {
for segment_filepath in &segment_filepaths {
if let Err(err) = directory.delete(&segment_filepath) {
match err {
FileError::FileDoesNotExist(_) => {
// this is normal behavior.
// the position file for instance may not exists.
}
FileError::IOError(err) => {
error!("Failed to remove {:?} : {:?}", segment_id, err);
}
}
}
}
}
Err(_) => {
error!("Failed to list files of segment {:?} for deletion.", segment_id.uuid_string());
}
}
}
impl Index {
/// Creates a new index using the `RAMDirectory`.
///
@@ -76,7 +116,7 @@ impl Index {
/// Creates a new index given a directory and an `IndexMeta`.
fn create_from_metas(directory: Box<Directory>, metas: IndexMeta) -> Result<Index> {
let schema = metas.schema.clone();
let docstamp = metas.docstamp;
let docstamp = metas.opstamp;
// TODO log somethings is uncommitted is not empty.
let index = Index {
directory: directory,
@@ -143,27 +183,33 @@ impl Index {
/// Returns the list of segments that are searchable
pub fn searchable_segments(&self) -> Result<Vec<Segment>> {
let searchable_segment_ids = self.searchable_segment_ids()?;
let metas = load_metas(self.directory())?;
let searchable_segment_ids = metas
.committed_segments
.iter()
.map(|segment_meta| segment_meta.segment_id)
.collect::<Vec<_>>();
let commit_opstamp = metas.opstamp;
Ok(searchable_segment_ids
.into_iter()
.map(|segment_id| self.segment(segment_id))
.map(|segment_id| self.segment(segment_id, commit_opstamp))
.collect())
}
/// Remove all of the file associated with the segment.
///
/// This method cannot fail. If a problem occurs,
/// some files may end up never being removed.
/// The error will only be logged.
pub fn delete_segment(&self, segment_id: SegmentId) {
self.segment(segment_id).delete();
delete_segment(self.directory(), segment_id);
}
/// Return a segment object given a `segment_id`
///
/// The segment may or may not exist.
pub fn segment(&self, segment_id: SegmentId) -> Segment {
create_segment(self.clone(), segment_id)
pub fn segment(&self, segment_id: SegmentId, commit_opstamp: u64) -> Segment {
create_segment(self.clone(), segment_id, commit_opstamp)
}
/// Return a reference to the index directory.
@@ -179,24 +225,22 @@ impl Index {
/// Reads the meta.json and returns the list of
/// committed segments.
pub fn committed_segments(&self) -> Result<Vec<SegmentMeta>> {
Ok(load_metas(self.directory())?.committed_segments)
}
/// Returns the list of segment ids that are searchable.
pub fn searchable_segment_ids(&self) -> Result<Vec<SegmentId>> {
self.committed_segments()
.map(|commited_segments| {
commited_segments
.iter()
.map(|segment_meta| segment_meta.segment_id)
.collect()
})
Ok(load_metas(self.directory())?
.committed_segments
.iter()
.map(|segment_meta| segment_meta.segment_id)
.collect())
}
/// Creates a new segment.
pub fn new_segment(&self) -> Segment {
self.segment(SegmentId::generate_random())
pub fn new_segment(&self, opstamp: u64) -> Segment {
self.segment(SegmentId::generate_random(), opstamp)
}
/// Creates a new generation of searchers after

View File

@@ -14,7 +14,7 @@ pub struct IndexMeta {
pub committed_segments: Vec<SegmentMeta>,
pub uncommitted_segments: Vec<SegmentMeta>,
pub schema: Schema,
pub docstamp: u64,
pub opstamp: u64,
}
impl IndexMeta {
@@ -23,7 +23,7 @@ impl IndexMeta {
committed_segments: Vec::new(),
uncommitted_segments: Vec::new(),
schema: schema,
docstamp: 0u64,
opstamp: 0u64,
}
}
}

View File

@@ -11,13 +11,12 @@ use core::Index;
use std::result;
use directory::error::{FileError, OpenWriteError};
/// A segment is a piece of the index.
#[derive(Clone)]
pub struct Segment {
index: Index,
segment_id: SegmentId,
commit_opstamp: u64,
}
impl fmt::Debug for Segment {
@@ -30,10 +29,11 @@ impl fmt::Debug for Segment {
/// Creates a new segment given an `Index` and a `SegmentId`
///
/// The function is here to make it private outside `tantivy`.
pub fn create_segment(index: Index, segment_id: SegmentId) -> Segment {
pub fn create_segment(index: Index, segment_id: SegmentId, commit_opstamp: u64) -> Segment {
Segment {
index: index,
segment_id: segment_id,
commit_opstamp: commit_opstamp,
}
}
@@ -59,42 +59,6 @@ impl Segment {
self.segment_id.relative_path(component)
}
/// Deletes all of the document of the segment.
/// This is called when there is a merge or a rollback.
///
/// # Disclaimer
/// If deletion of a file fails (e.g. a file
/// was read-only.), the method does not
/// fail and just logs an error when it fails.
pub fn delete(&self) {
info!("Deleting segment {:?}", self.segment_id);
let segment_filepaths_res = self.index.directory().ls_starting_with(
&*self.segment_id.uuid_string()
);
match segment_filepaths_res {
Ok(segment_filepaths) => {
for segment_filepath in &segment_filepaths {
if let Err(err) = self.index.directory().delete(&segment_filepath) {
match err {
FileError::FileDoesNotExist(_) => {
// this is normal behavior.
// the position file for instance may not exists.
}
FileError::IOError(err) => {
error!("Failed to remove {:?} : {:?}", self.segment_id, err);
}
}
}
}
}
Err(_) => {
error!("Failed to list files of segment {:?} for deletion.", self.segment_id.uuid_string());
}
}
}
/// Open one of the component file for read.
pub fn open_read(&self, component: SegmentComponent) -> result::Result<ReadOnlySource, FileError> {
let path = self.relative_path(component);

View File

@@ -5,6 +5,7 @@ pub struct SegmentMeta {
pub segment_id: SegmentId,
pub num_docs: u32,
pub num_deleted_docs: u32,
pub opstamp: u64,
}
#[cfg(test)]
@@ -14,6 +15,7 @@ impl SegmentMeta {
segment_id: segment_id,
num_docs: num_docs,
num_deleted_docs: 0,
opstamp: 0u64,
}
}
}

View File

@@ -32,7 +32,7 @@ pub enum Error {
/// The data within is corrupted.
///
/// For instance, it contains invalid JSON.
CorruptedFile(PathBuf, Box<error::Error + Send>),
CorruptedFile(PathBuf, Box<error::Error + Send + Sync>),
/// Invalid argument was passed by the user.
InvalidArgument(String),
/// An Error happened in one of the thread

View File

@@ -125,6 +125,7 @@ fn index_documents(heap: &mut Heap,
segment_id: segment_id,
num_docs: num_docs,
num_deleted_docs: num_deleted_docs as u32,
opstamp: last_opstamp,
};
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor.clone());
@@ -185,14 +186,13 @@ impl IndexWriter {
let generation = self.generation;
let join_handle: JoinHandle<Result<()>> = try!(thread::Builder::new()
let join_handle: JoinHandle<Result<()>> =
thread::Builder::new()
.name(format!("indexing thread {} for gen {}", self.worker_id, generation))
.spawn(move || {
let mut delete_cursor_clone = delete_cursor.clone();
loop {
let segment = index.new_segment();
let mut document_iterator = document_receiver_clone.clone()
.into_iter()
.peekable();
@@ -203,25 +203,28 @@ impl IndexWriter {
// this is a valid guarantee as the
// peeked document now belongs to
// our local iterator.
if document_iterator.peek().is_some() {
let valid_generation = try!(index_documents(&mut heap,
segment,
&schema,
generation,
&mut document_iterator,
&mut segment_updater,
&mut delete_cursor_clone));
if valid_generation {
return Ok(());
}
} else {
let opstamp: u64;
if let Some(operation) = document_iterator.peek() {
opstamp = operation.opstamp;
}
else {
// No more documents.
// Happens when there is a commit, or if the `IndexWriter`
// was dropped.
return Ok(());
opstamp = 0u64;
return Ok(())
}
let segment = index.new_segment(opstamp);
let valid_generation = index_documents(&mut heap,
segment,
&schema,
generation,
&mut document_iterator,
&mut segment_updater,
&mut delete_cursor_clone)?;
}
}));
})?;
self.worker_id += 1;
self.workers_join_handle.push(join_handle);
Ok(())
@@ -308,8 +311,8 @@ impl IndexWriter {
}
/// Merges a given list of segments
pub fn merge(&mut self, segments: &[SegmentId]) -> impl Future<Item=SegmentEntry, Error=Canceled> {
self.segment_updater.start_merge(segments.to_vec())
pub fn merge(&mut self, segment_ids: &[SegmentId]) -> impl Future<Item=SegmentEntry, Error=Canceled> {
self.segment_updater.start_merge(segment_ids)
}
/// Closes the current document channel send.

View File

@@ -63,6 +63,14 @@ impl SegmentManager {
}),
}
}
pub fn segment_entry(&self, segment_id: &SegmentId) -> Option<SegmentEntry> {
let registers = self.read();
registers
.committed
.segment_entry(segment_id)
.or_else(|| registers.uncommitted.segment_entry(segment_id))
}
// Lock poisoning should never happen :
// The lock is acquired and released within this class,
@@ -113,16 +121,17 @@ impl SegmentManager {
registers_lock.uncommitted.add_segment_entry(segment_entry);
}
pub fn end_merge(&self, merged_segment_ids: &[SegmentId], merged_segment_entry: SegmentEntry) {
pub fn end_merge(&self, merged_segment_metas: &[SegmentMeta], merged_segment_entry: SegmentEntry) {
let mut registers_lock = self.write();
if registers_lock.uncommitted.contains_all(merged_segment_ids) {
for segment_id in merged_segment_ids {
let merged_segment_ids: Vec<SegmentId> = merged_segment_metas.iter().map(|meta| meta.segment_id).collect();
if registers_lock.uncommitted.contains_all(&merged_segment_ids) {
for segment_id in &merged_segment_ids {
registers_lock.uncommitted.remove_segment(segment_id);
}
registers_lock.uncommitted.add_segment_entry(merged_segment_entry);
}
else if registers_lock.committed.contains_all(merged_segment_ids) {
for segment_id in merged_segment_ids {
else if registers_lock.committed.contains_all(&merged_segment_ids) {
for segment_id in &merged_segment_ids {
registers_lock.committed.remove_segment(segment_id);
}
registers_lock.committed.add_segment_entry(merged_segment_entry);

View File

@@ -32,6 +32,10 @@ impl SegmentEntry {
pub fn segment_id(&self) -> SegmentId {
self.meta.segment_id
}
pub fn meta(&self) -> &SegmentMeta {
&self.meta
}
fn start_merge(&mut self,) {
self.state = SegmentState::InMerge;
@@ -120,7 +124,6 @@ impl SegmentRegister {
.collect()
}
#[cfg(test)]
pub fn segment_entry(&self, segment_id: &SegmentId) -> Option<SegmentEntry> {
self.segment_states
.get(&segment_id)

View File

@@ -1,6 +1,7 @@
#![allow(for_kv_map)]
use core::Index;
use Error;
use core::Segment;
use indexer::{MergePolicy, DefaultMergePolicy};
use core::SegmentId;
@@ -35,13 +36,13 @@ use std::io::Write;
use super::segment_manager::{SegmentManager, get_segments};
fn create_metas(segment_manager: &SegmentManager, schema: Schema, docstamp: u64) -> IndexMeta {
fn create_metas(segment_manager: &SegmentManager, schema: Schema, opstamp: u64) -> IndexMeta {
let (committed_segments, uncommitted_segments) = segment_manager.segment_metas();
IndexMeta {
committed_segments: committed_segments,
uncommitted_segments: uncommitted_segments,
schema: schema,
docstamp: docstamp,
opstamp: opstamp,
}
}
@@ -104,7 +105,7 @@ struct InnerSegmentUpdater {
segment_manager: SegmentManager,
merge_policy: RwLock<Box<MergePolicy>>,
merging_thread_id: AtomicUsize,
merging_threads: RwLock<HashMap<usize, JoinHandle<SegmentEntry>>>,
merging_threads: RwLock<HashMap<usize, JoinHandle<Result<SegmentEntry>>>>,
generation: AtomicUsize,
}
@@ -184,28 +185,52 @@ impl SegmentUpdater {
}
pub fn start_merge(&self, segment_ids: Vec<SegmentId>) -> impl Future<Item=SegmentEntry, Error=Canceled> {
pub fn start_merge(&self, segment_ids: &[SegmentId]) -> impl Future<Item=SegmentEntry, Error=Canceled> {
self.0.segment_manager.start_merge(&segment_ids);
self.0.segment_manager.start_merge(segment_ids);
let segment_updater_clone = self.clone();
let segment_ids_vec = segment_ids.to_vec();
let merging_thread_id = self.get_merging_thread_id();
let (merging_future_send, merging_future_recv) = oneshot();
if segment_ids.is_empty() {
return merging_future_recv;
}
let merging_join_handle = thread::spawn(move || {
info!("Start merge: {:?}", segment_ids);
info!("Start merge: {:?}", segment_ids_vec);
let ref index = segment_updater_clone.0.index;
let schema = index.schema();
let segments: Vec<Segment> = segment_ids
let segment_metas: Vec<SegmentMeta> = segment_ids_vec
.iter()
.map(|&segment_id| index.segment(segment_id))
.map(|segment_id|
segment_updater_clone.0.segment_manager
.segment_entry(segment_id)
.map(|segment_entry| segment_entry.meta().clone())
.ok_or(Error::InvalidArgument(format!("Segment({:?}) does not exist anymore", segment_id)))
)
.collect::<Result<_>>()?;
let segments: Vec<Segment> = segment_metas
.iter()
.map(|ref segment_metas| index.segment(segment_metas.segment_id, segment_metas.opstamp))
.collect();
// An IndexMerger is like a "view" of our merged segments.
// TODO unwrap
let merger: IndexMerger = IndexMerger::open(schema, &segments[..]).expect("Creating index merger failed");
let mut merged_segment = index.new_segment();
let opstamp = segment_metas
.iter()
.map(|meta| meta.opstamp)
.max()
.unwrap();
let mut merged_segment = index.new_segment(opstamp);
// ... we just serialize this index merger in our new segment
// to merge the two segments.
@@ -215,19 +240,20 @@ impl SegmentUpdater {
segment_id: merged_segment.id(),
num_docs: num_docs,
num_deleted_docs: 0u32,
opstamp: opstamp,
};
// TODO fix delete cursor
let delete_queue = DeleteQueue::default();
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor());
segment_updater_clone
.end_merge(segment_ids.clone(), segment_entry.clone())
.end_merge(segment_metas.clone(), segment_entry.clone())
.wait()
.unwrap();
merging_future_send.complete(segment_entry.clone());
segment_updater_clone.0.merging_threads.write().unwrap().remove(&merging_thread_id);
segment_entry
Ok(segment_entry)
});
self.0.merging_threads.write().unwrap().insert(merging_thread_id, merging_join_handle);
merging_future_recv
@@ -242,26 +268,26 @@ impl SegmentUpdater {
let mut merge_candidates = merge_policy.compute_merge_candidates(&uncommitted_segments);
let committed_merge_candidates = merge_policy.compute_merge_candidates(&committed_segments);
merge_candidates.extend_from_slice(&committed_merge_candidates[..]);
for MergeCandidate(segment_ids) in merge_candidates {
self.start_merge(segment_ids);
for MergeCandidate(segment_metas) in merge_candidates {
self.start_merge(&segment_metas);
}
}
fn end_merge(&self,
merged_segment_ids: Vec<SegmentId>,
fn end_merge(&self,
merged_segment_metas: Vec<SegmentMeta>,
resulting_segment_entry: SegmentEntry) -> impl Future<Item=(), Error=&'static str> {
self.run_async(move |segment_updater| {
segment_updater.0.segment_manager.end_merge(&merged_segment_ids, resulting_segment_entry);
segment_updater.0.segment_manager.end_merge(&merged_segment_metas, resulting_segment_entry);
let mut directory = segment_updater.0.index.directory().box_clone();
save_metas(
&segment_updater.0.segment_manager,
segment_updater.0.index.schema(),
segment_updater.0.index.docstamp(),
directory.borrow_mut()).expect("Could not save metas.");
for segment_id in merged_segment_ids {
segment_updater.0.index.delete_segment(segment_id);
for segment_meta in merged_segment_metas {
segment_updater.0.index.delete_segment(segment_meta.segment_id);
}
})

View File

@@ -61,7 +61,7 @@ mod tests {
let text_field = schema_builder.add_text_field("text", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut segment = index.new_segment();
let mut segment = index.new_segment(0u64);
let mut posting_serializer = PostingsSerializer::open(&mut segment).unwrap();
let term = Term::from_field_text(text_field, "abc");
posting_serializer.new_term(&term, 3).unwrap();
@@ -81,7 +81,7 @@ mod tests {
let text_field = schema_builder.add_text_field("text", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema.clone());
let segment = index.new_segment();
let segment = index.new_segment(0u64);
let heap = Heap::with_capacity(10_000_000);
{
let mut segment_writer = SegmentWriter::for_segment(&heap, segment.clone(), &schema).unwrap();