diff --git a/src/core/index_meta.rs b/src/core/index_meta.rs index c5d33e370..8927c5403 100644 --- a/src/core/index_meta.rs +++ b/src/core/index_meta.rs @@ -2,6 +2,7 @@ use core::SegmentMeta; use schema::Schema; use serde_json; use std::fmt; +use Opstamp; /// Meta information about the `Index`. /// @@ -15,7 +16,7 @@ use std::fmt; pub struct IndexMeta { pub segments: Vec, pub schema: Schema, - pub opstamp: u64, + pub opstamp: Opstamp, #[serde(skip_serializing_if = "Option::is_none")] pub payload: Option, } diff --git a/src/core/segment.rs b/src/core/segment.rs index a747cfaec..b87208fcb 100644 --- a/src/core/segment.rs +++ b/src/core/segment.rs @@ -10,6 +10,7 @@ use schema::Schema; use std::fmt; use std::path::PathBuf; use std::result; +use Opstamp; use Result; /// A segment is a piece of the index. @@ -50,7 +51,7 @@ impl Segment { } #[doc(hidden)] - pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: u64) -> Segment { + pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> Segment { Segment { index: self.index, meta: self.meta.with_delete_meta(num_deleted_docs, opstamp), diff --git a/src/core/segment_meta.rs b/src/core/segment_meta.rs index 9478663ea..2f4b4177e 100644 --- a/src/core/segment_meta.rs +++ b/src/core/segment_meta.rs @@ -5,6 +5,7 @@ use serde; use std::collections::HashSet; use std::fmt; use std::path::PathBuf; +use Opstamp; lazy_static! { static ref INVENTORY: Inventory = { Inventory::new() }; @@ -13,7 +14,7 @@ lazy_static! { #[derive(Clone, Debug, Serialize, Deserialize)] struct DeleteMeta { num_deleted_docs: u32, - opstamp: u64, + opstamp: Opstamp, } /// `SegmentMeta` contains simple meta information about a segment. @@ -136,9 +137,9 @@ impl SegmentMeta { self.max_doc() - self.num_deleted_docs() } - /// Returns the opstamp of the last delete operation + /// Returns the `Opstamp` of the last delete operation /// taken in account in this segment. - pub fn delete_opstamp(&self) -> Option { + pub fn delete_opstamp(&self) -> Option { self.tracked .deletes .as_ref() @@ -152,7 +153,7 @@ impl SegmentMeta { } #[doc(hidden)] - pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: u64) -> SegmentMeta { + pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> SegmentMeta { let delete_meta = DeleteMeta { num_deleted_docs, opstamp, diff --git a/src/indexer/delete_queue.rs b/src/indexer/delete_queue.rs index be36bef7a..595691f98 100644 --- a/src/indexer/delete_queue.rs +++ b/src/indexer/delete_queue.rs @@ -2,6 +2,7 @@ use super::operation::DeleteOperation; use std::mem; use std::ops::DerefMut; use std::sync::{Arc, RwLock}; +use Opstamp; // The DeleteQueue is similar in conceptually to a multiple // consumer single producer broadcast channel. @@ -184,7 +185,7 @@ impl DeleteCursor { /// queue are consume and the next get will return None. /// - the next get will return the first operation with an /// `opstamp >= target_opstamp`. - pub fn skip_to(&mut self, target_opstamp: u64) { + pub fn skip_to(&mut self, target_opstamp: Opstamp) { // TODO Can be optimize as we work with block. while self.is_behind_opstamp(target_opstamp) { self.advance(); @@ -192,7 +193,7 @@ impl DeleteCursor { } #[cfg_attr(feature = "cargo-clippy", allow(clippy::wrong_self_convention))] - fn is_behind_opstamp(&mut self, target_opstamp: u64) -> bool { + fn is_behind_opstamp(&mut self, target_opstamp: Opstamp) -> bool { self.get() .map(|operation| operation.opstamp < target_opstamp) .unwrap_or(false) diff --git a/src/indexer/doc_opstamp_mapping.rs b/src/indexer/doc_opstamp_mapping.rs index 26bbe4c2f..d616800ab 100644 --- a/src/indexer/doc_opstamp_mapping.rs +++ b/src/indexer/doc_opstamp_mapping.rs @@ -1,5 +1,6 @@ use std::sync::Arc; use DocId; +use Opstamp; // Doc to opstamp is used to identify which // document should be deleted. @@ -23,7 +24,7 @@ pub enum DocToOpstampMapping { } impl From> for DocToOpstampMapping { - fn from(opstamps: Vec) -> DocToOpstampMapping { + fn from(opstamps: Vec) -> DocToOpstampMapping { DocToOpstampMapping::WithMap(Arc::new(opstamps)) } } @@ -35,7 +36,7 @@ impl DocToOpstampMapping { // // The edge case opstamp = some doc opstamp is in practise // never called. - pub fn compute_doc_limit(&self, target_opstamp: u64) -> DocId { + pub fn compute_doc_limit(&self, target_opstamp: Opstamp) -> DocId { match *self { DocToOpstampMapping::WithMap(ref doc_opstamps) => { match doc_opstamps.binary_search(&target_opstamp) { diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 94961f5fc..398e61358 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -30,6 +30,7 @@ use std::ops::Range; use std::sync::Arc; use std::thread; use std::thread::JoinHandle; +use Opstamp; use Result; // Size of the margin for the heap. A segment is closed when the remaining memory @@ -99,7 +100,7 @@ pub struct IndexWriter { delete_queue: DeleteQueue, stamper: Stamper, - committed_opstamp: u64, + committed_opstamp: Opstamp, } /// Open a new index writer. Attempts to acquire a lockfile. @@ -177,7 +178,7 @@ pub fn compute_deleted_bitset( segment_reader: &SegmentReader, delete_cursor: &mut DeleteCursor, doc_opstamps: &DocToOpstampMapping, - target_opstamp: u64, + target_opstamp: Opstamp, ) -> Result { let mut might_have_changed = false; @@ -219,7 +220,7 @@ pub fn compute_deleted_bitset( pub fn advance_deletes( mut segment: Segment, segment_entry: &mut SegmentEntry, - target_opstamp: u64, + target_opstamp: Opstamp, ) -> Result<()> { { if segment_entry.meta().delete_opstamp() == Some(target_opstamp) { @@ -299,11 +300,11 @@ fn index_documents( // the worker thread. assert!(num_docs > 0); - let doc_opstamps: Vec = segment_writer.finalize()?; + let doc_opstamps: Vec = segment_writer.finalize()?; let segment_meta = SegmentMeta::new(segment_id, num_docs); - let last_docstamp: u64 = *(doc_opstamps.last().unwrap()); + let last_docstamp: Opstamp = *(doc_opstamps.last().unwrap()); let delete_bitset_opt = if delete_cursor.get().is_some() { let doc_to_opstamps = DocToOpstampMapping::from(doc_opstamps); @@ -494,7 +495,7 @@ impl IndexWriter { /// state as it was after the last commit. /// /// The opstamp at the last commit is returned. - pub fn rollback(&mut self) -> Result<()> { + pub fn rollback(&mut self) -> Result { info!("Rolling back to opstamp {}", self.committed_opstamp); // marks the segment updater as killed. From now on, all @@ -529,7 +530,7 @@ impl IndexWriter { // was dropped with the index_writer. for _ in document_receiver.clone() {} - Ok(()) + Ok(self.committed_opstamp) } /// Prepares a commit. @@ -567,7 +568,7 @@ impl IndexWriter { info!("Preparing commit"); // this will drop the current document channel - // and recreate a new one channels. + // and recreate a new one. self.recreate_document_channel(); let former_workers_join_handle = mem::replace(&mut self.workers_join_handle, Vec::new()); @@ -601,7 +602,7 @@ impl IndexWriter { /// Commit returns the `opstamp` of the last document /// that made it in the commit. /// - pub fn commit(&mut self) -> Result { + pub fn commit(&mut self) -> Result { self.prepare_commit()?.commit() } @@ -617,7 +618,7 @@ impl IndexWriter { /// /// Like adds, the deletion itself will be visible /// only after calling `commit()`. - pub fn delete_term(&mut self, term: Term) -> u64 { + pub fn delete_term(&mut self, term: Term) -> Opstamp { let opstamp = self.stamper.stamp(); let delete_operation = DeleteOperation { opstamp, term }; self.delete_queue.push(delete_operation); @@ -631,7 +632,7 @@ impl IndexWriter { /// /// This is also the opstamp of the commit that is currently /// available for searchers. - pub fn commit_opstamp(&self) -> u64 { + pub fn commit_opstamp(&self) -> Opstamp { self.committed_opstamp } @@ -645,7 +646,7 @@ impl IndexWriter { /// /// Currently it represents the number of documents that /// have been added since the creation of the index. - pub fn add_document(&mut self, document: Document) -> u64 { + pub fn add_document(&mut self, document: Document) -> Opstamp { let opstamp = self.stamper.stamp(); let add_operation = AddOperation { opstamp, document }; let send_result = self.operation_sender.send(vec![add_operation]); @@ -662,7 +663,7 @@ impl IndexWriter { /// The total number of stamps generated by this method is `count + 1`; /// each operation gets a stamp from the `stamps` iterator and `last_opstamp` /// is for the batch itself. - fn get_batch_opstamps(&mut self, count: u64) -> (u64, Range) { + fn get_batch_opstamps(&mut self, count: Opstamp) -> (Opstamp, Range) { let Range { start, end } = self.stamper.stamps(count + 1u64); let last_opstamp = end - 1; let stamps = Range { @@ -688,7 +689,7 @@ impl IndexWriter { /// Like adds and deletes (see `IndexWriter.add_document` and /// `IndexWriter.delete_term`), the changes made by calling `run` will be /// visible to readers only after calling `commit()`. - pub fn run(&mut self, user_operations: Vec) -> u64 { + pub fn run(&mut self, user_operations: Vec) -> Opstamp { let count = user_operations.len() as u64; if count == 0 { return self.stamper.stamp(); diff --git a/src/indexer/merge_operation.rs b/src/indexer/merge_operation.rs index 9d7bcbca6..0474fefd8 100644 --- a/src/indexer/merge_operation.rs +++ b/src/indexer/merge_operation.rs @@ -1,5 +1,6 @@ use census::{Inventory, TrackedObject}; use std::collections::HashSet; +use Opstamp; use SegmentId; #[derive(Default)] @@ -17,8 +18,8 @@ impl MergeOperationInventory { } } -/// A `MergeOperation` has two role. -/// It carries all of the information required to describe a merge : +/// A `MergeOperation` has two roles. +/// It carries all of the information required to describe a merge: /// - `target_opstamp` is the opstamp up to which we want to consume the /// delete queue and reflect their deletes. /// - `segment_ids` is the list of segment to be merged. @@ -35,14 +36,14 @@ pub struct MergeOperation { } struct InnerMergeOperation { - target_opstamp: u64, + target_opstamp: Opstamp, segment_ids: Vec, } impl MergeOperation { pub fn new( inventory: &MergeOperationInventory, - target_opstamp: u64, + target_opstamp: Opstamp, segment_ids: Vec, ) -> MergeOperation { let inner_merge_operation = InnerMergeOperation { @@ -54,7 +55,7 @@ impl MergeOperation { } } - pub fn target_opstamp(&self) -> u64 { + pub fn target_opstamp(&self) -> Opstamp { self.inner.target_opstamp } diff --git a/src/indexer/operation.rs b/src/indexer/operation.rs index fe57a4a3a..03bfabb59 100644 --- a/src/indexer/operation.rs +++ b/src/indexer/operation.rs @@ -1,17 +1,18 @@ use schema::Document; use schema::Term; +use Opstamp; /// Timestamped Delete operation. #[derive(Clone, Eq, PartialEq, Debug)] pub struct DeleteOperation { - pub opstamp: u64, + pub opstamp: Opstamp, pub term: Term, } /// Timestamped Add operation. #[derive(Eq, PartialEq, Debug)] pub struct AddOperation { - pub opstamp: u64, + pub opstamp: Opstamp, pub document: Document, } diff --git a/src/indexer/prepared_commit.rs b/src/indexer/prepared_commit.rs index 4728af01a..92f47cdfd 100644 --- a/src/indexer/prepared_commit.rs +++ b/src/indexer/prepared_commit.rs @@ -1,15 +1,16 @@ use super::IndexWriter; +use Opstamp; use Result; /// A prepared commit pub struct PreparedCommit<'a> { index_writer: &'a mut IndexWriter, payload: Option, - opstamp: u64, + opstamp: Opstamp, } impl<'a> PreparedCommit<'a> { - pub(crate) fn new(index_writer: &'a mut IndexWriter, opstamp: u64) -> PreparedCommit { + pub(crate) fn new(index_writer: &'a mut IndexWriter, opstamp: Opstamp) -> PreparedCommit { PreparedCommit { index_writer, payload: None, @@ -17,7 +18,7 @@ impl<'a> PreparedCommit<'a> { } } - pub fn opstamp(&self) -> u64 { + pub fn opstamp(&self) -> Opstamp { self.opstamp } @@ -25,11 +26,11 @@ impl<'a> PreparedCommit<'a> { self.payload = Some(payload.to_string()) } - pub fn abort(self) -> Result<()> { + pub fn abort(self) -> Result { self.index_writer.rollback() } - pub fn commit(self) -> Result { + pub fn commit(self) -> Result { info!("committing {}", self.opstamp); self.index_writer .segment_updater() diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 316dbd68a..4dd7a6804 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -36,6 +36,7 @@ use std::sync::Arc; use std::sync::RwLock; use std::thread; use std::thread::JoinHandle; +use Opstamp; use Result; /// Save the index meta file. @@ -224,7 +225,7 @@ impl SegmentUpdater { /// /// Tne method returns copies of the segment entries, /// updated with the delete information. - fn purge_deletes(&self, target_opstamp: u64) -> Result> { + fn purge_deletes(&self, target_opstamp: Opstamp) -> Result> { let mut segment_entries = self.0.segment_manager.segment_entries(); for segment_entry in &mut segment_entries { let segment = self.0.index.segment(segment_entry.meta().clone()); @@ -233,7 +234,7 @@ impl SegmentUpdater { Ok(segment_entries) } - pub fn save_metas(&self, opstamp: u64, commit_message: Option) { + pub fn save_metas(&self, opstamp: Opstamp, commit_message: Option) { if self.is_alive() { let index = &self.0.index; let directory = index.directory(); @@ -280,7 +281,7 @@ impl SegmentUpdater { .garbage_collect(|| self.0.segment_manager.list_files()); } - pub fn commit(&self, opstamp: u64, payload: Option) -> Result<()> { + pub fn commit(&self, opstamp: Opstamp, payload: Option) -> Result<()> { self.run_async(move |segment_updater| { if segment_updater.is_alive() { let segment_entries = segment_updater diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index 41fab5a6e..a7da06fd1 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -16,6 +16,7 @@ use tokenizer::BoxedTokenizer; use tokenizer::FacetTokenizer; use tokenizer::{TokenStream, Tokenizer}; use DocId; +use Opstamp; use Result; /// A `SegmentWriter` is in charge of creating segment index from a @@ -29,7 +30,7 @@ pub struct SegmentWriter { segment_serializer: SegmentSerializer, fast_field_writers: FastFieldsWriter, fieldnorms_writer: FieldNormsWriter, - doc_opstamps: Vec, + doc_opstamps: Vec, tokenizers: Vec>>, } diff --git a/src/indexer/stamper.rs b/src/indexer/stamper.rs index 6cd1c494a..631cbb527 100644 --- a/src/indexer/stamper.rs +++ b/src/indexer/stamper.rs @@ -1,70 +1,27 @@ use std::ops::Range; -use std::sync::atomic::Ordering; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; +use Opstamp; -// AtomicU64 have not landed in stable. -// For the moment let's just use AtomicUsize on -// x86/64 bit platform, and a mutex on other platform. -#[cfg(target_arch = "x86_64")] -mod archicture_impl { - - use std::sync::atomic::{AtomicUsize, Ordering}; - - #[derive(Default)] - pub struct AtomicU64Ersatz(AtomicUsize); - - impl AtomicU64Ersatz { - pub fn new(first_opstamp: u64) -> AtomicU64Ersatz { - AtomicU64Ersatz(AtomicUsize::new(first_opstamp as usize)) - } - - pub fn fetch_add(&self, val: u64, order: Ordering) -> u64 { - self.0.fetch_add(val as usize, order) as u64 - } - } -} - -#[cfg(not(target_arch = "x86_64"))] -mod archicture_impl { - - use std::sync::atomic::Ordering; - /// Under other architecture, we rely on a mutex. - use std::sync::RwLock; - - #[derive(Default)] - pub struct AtomicU64Ersatz(RwLock); - - impl AtomicU64Ersatz { - pub fn new(first_opstamp: u64) -> AtomicU64Ersatz { - AtomicU64Ersatz(RwLock::new(first_opstamp)) - } - - pub fn fetch_add(&self, incr: u64, _order: Ordering) -> u64 { - let mut lock = self.0.write().unwrap(); - let previous_val = *lock; - *lock = previous_val + incr; - previous_val - } - } -} - -use self::archicture_impl::AtomicU64Ersatz; - +/// Stamper provides Opstamps, which is just an auto-increment id to label +/// an operation. +/// +/// Cloning does not "fork" the stamp generation. The stamper actually wraps an `Arc`. #[derive(Clone, Default)] -pub struct Stamper(Arc); +pub struct Stamper(Arc); impl Stamper { - pub fn new(first_opstamp: u64) -> Stamper { - Stamper(Arc::new(AtomicU64Ersatz::new(first_opstamp))) + pub fn new(first_opstamp: Opstamp) -> Stamper { + Stamper(Arc::new(AtomicU64::new(first_opstamp))) } - pub fn stamp(&self) -> u64 { + pub fn stamp(&self) -> Opstamp { self.0.fetch_add(1u64, Ordering::SeqCst) as u64 } /// Given a desired count `n`, `stamps` returns an iterator that /// will supply `n` number of u64 stamps. - pub fn stamps(&self, n: u64) -> Range { + pub fn stamps(&self, n: u64) -> Range { let start = self.0.fetch_add(n, Ordering::SeqCst); Range { start, @@ -92,4 +49,5 @@ mod test { assert_eq!(stamper.stamps(3u64), (12..15)); assert_eq!(stamper.stamp(), 15u64); } + } diff --git a/src/lib.rs b/src/lib.rs index 4f0e50a0d..63c9f6929 100755 --- a/src/lib.rs +++ b/src/lib.rs @@ -254,6 +254,16 @@ pub mod merge_policy { /// as they are added in the segment. pub type DocId = u32; +/// A u64 assigned to every operation incrementally +/// +/// All operations modifying the index receives an monotonic Opstamp. +/// The resulting state of the index is consistent with the opstamp ordering. +/// +/// For instance, a commit with opstamp `32_423` will reflect all Add and Delete operations +/// with an opstamp `<= 32_423`. A delete operation with opstamp n will no affect a document added +/// with opstamp `n+1`. +pub type Opstamp = u64; + /// A f32 that represents the relevance of the document to the query /// /// This is modelled internally as a `f32`. The