mirror of https://github.com/quickwit-oss/tantivy.git, synced 2026-01-06 09:12:55 +00:00

Compare commits

3 Commits
| Author | SHA1 | Date |
|---|---|---|
| | 176f67a266 | |
| | 19babff849 | |
| | bf2576adf9 | |
@@ -150,7 +150,7 @@ impl Index {
     ///
     /// This will overwrite existing meta.json
     fn from_directory(mut directory: ManagedDirectory, schema: Schema) -> Result<Index> {
-        save_new_metas(schema.clone(), 0, directory.borrow_mut())?;
+        save_new_metas(schema.clone(), directory.borrow_mut())?;
         let metas = IndexMeta::with_schema(schema);
         Index::create_from_metas(directory, &metas)
     }

@@ -558,11 +558,8 @@ impl IndexWriter {
         // and recreate a new one.
         self.recreate_document_channel();

-        let mut former_workers_join_handle = Vec::new();
-        swap(
-            &mut former_workers_join_handle,
-            &mut self.workers_join_handle,
-        );
+        let former_workers_join_handle =
+            mem::replace(&mut self.workers_join_handle, Vec::new());

         for worker_handle in former_workers_join_handle {
             let indexing_worker_result = worker_handle
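
The `mem::replace` change is the standard idiom for moving a collection out of a field behind `&mut self`, replacing the two-step dance with a temporary plus `swap`. A minimal sketch of the idiom (the `WorkerPool` type and its field are illustrative, not tantivy's):

```rust
use std::mem;

struct WorkerPool {
    workers_join_handle: Vec<String>, // stand-in for Vec<JoinHandle<...>>
}

impl WorkerPool {
    /// Takes ownership of the current handles and leaves an empty Vec behind.
    fn take_handles(&mut self) -> Vec<String> {
        mem::replace(&mut self.workers_join_handle, Vec::new())
    }
}

fn main() {
    let mut pool = WorkerPool {
        workers_join_handle: vec!["worker-0".into(), "worker-1".into()],
    };
    let former = pool.take_handles();
    assert_eq!(former.len(), 2);
    assert!(pool.workers_join_handle.is_empty());
}
```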
@@ -739,7 +736,7 @@ mod tests {
             index_writer.add_document(doc!(text_field=>"b"));
             index_writer.add_document(doc!(text_field=>"c"));
         }
-        assert_eq!(index_writer.commit().unwrap(), 2u64);
+        assert_eq!(index_writer.commit().unwrap(), 3u64);
         index.load_searchers().unwrap();
         assert_eq!(num_docs_containing("a"), 0);
         assert_eq!(num_docs_containing("b"), 1);
@@ -802,7 +799,6 @@ mod tests {
         {
            let mut prepared_commit = index_writer.prepare_commit().expect("commit failed");
            prepared_commit.set_payload("first commit");
-           assert_eq!(prepared_commit.opstamp(), 100);
            prepared_commit.commit().expect("commit failed");
         }
         {
@@ -836,7 +832,6 @@ mod tests {
         {
            let mut prepared_commit = index_writer.prepare_commit().expect("commit failed");
            prepared_commit.set_payload("first commit");
-           assert_eq!(prepared_commit.opstamp(), 100);
            prepared_commit.abort().expect("commit failed");
         }
         {

@@ -654,6 +654,7 @@ mod tests {
     use schema::IntOptions;
     use schema::Term;
     use schema::TextFieldIndexing;
+    use schema::INT_INDEXED;
     use std::io::Cursor;
     use DocAddress;
     use IndexWriter;
@@ -983,7 +984,7 @@ mod tests {
             .wait()
             .expect("Merging failed");
         index.load_searchers().unwrap();
-        let ref searcher = *index.searcher();
+        let searcher = index.searcher();
         assert_eq!(searcher.segment_readers().len(), 1);
         assert_eq!(searcher.num_docs(), 3);
         assert_eq!(searcher.segment_readers()[0].num_docs(), 3);
@@ -1029,7 +1030,7 @@ mod tests {
         index_writer.commit().unwrap();

         index.load_searchers().unwrap();
-        let ref searcher = *index.searcher();
+        let searcher = index.searcher();
         assert_eq!(searcher.segment_readers().len(), 1);
         assert_eq!(searcher.num_docs(), 2);
         assert_eq!(searcher.segment_readers()[0].num_docs(), 2);
@@ -1125,6 +1126,7 @@ mod tests {
         {
             // Test removing all docs
             index_writer.delete_term(Term::from_field_text(text_field, "g"));
+            index_writer.commit().unwrap();
             let segment_ids = index
                 .searchable_segment_ids()
                 .expect("Searchable segments failed.");
@@ -1255,6 +1257,34 @@ mod tests {
         }
     }

+    #[test]
+    fn test_bug_merge() {
+        let mut schema_builder = schema::Schema::builder();
+        let int_field = schema_builder.add_u64_field("intvals", INT_INDEXED);
+        let index = Index::create_in_ram(schema_builder.build());
+        let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
+        index_writer.add_document(doc!(int_field => 1u64));
+        index_writer.commit().expect("commit failed");
+        index_writer.add_document(doc!(int_field => 1u64));
+        index_writer.commit().expect("commit failed");
+        index.load_searchers().unwrap();
+        let searcher = index.searcher();
+        assert_eq!(searcher.num_docs(), 2);
+        index_writer.delete_term(Term::from_field_u64(int_field, 1));
+        let segment_ids = index
+            .searchable_segment_ids()
+            .expect("Searchable segments failed.");
+        index_writer
+            .merge(&segment_ids)
+            .expect("Failed to initiate merge")
+            .wait()
+            .expect("Merging failed");
+        index.load_searchers().unwrap();
+        // commit has not been called yet. The document should still be
+        // there.
+        assert_eq!(index.searcher().num_docs(), 2);
+    }
+
     #[test]
     fn test_merge_multivalued_int_fields_all_deleted() {
         let mut schema_builder = schema::Schema::builder();

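The new `test_bug_merge` test pins down the behavior the opstamp changes below are after: a `delete_term` that has not yet been committed must not be applied when segments are merged, so the merged index still reports both documents.
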
@@ -18,7 +18,6 @@ use indexer::delete_queue::DeleteCursor;
 use indexer::index_writer::advance_deletes;
 use indexer::merger::IndexMerger;
 use indexer::stamper::Stamper;
-use indexer::MergeCandidate;
 use indexer::SegmentEntry;
 use indexer::SegmentSerializer;
 use indexer::{DefaultMergePolicy, MergePolicy};
@@ -45,8 +44,15 @@ use Result;
 /// and flushed.
 ///
 /// This method is not part of tantivy's public API
-pub fn save_new_metas(schema: Schema, opstamp: u64, directory: &mut Directory) -> Result<()> {
-    save_metas(vec![], schema, opstamp, None, directory)
+pub fn save_new_metas(schema: Schema, directory: &mut Directory) -> Result<()> {
+    save_metas(
+        &IndexMeta {
+            segments: Vec::new(),
+            schema,
+            opstamp: 0u64,
+            payload: None
+        },
+        directory)
 }

 /// Save the index meta file.
@@ -58,20 +64,17 @@ pub fn save_new_metas(schema: Schema, opstamp: u64, directory: &mut Directory) -
 /// and flushed.
 ///
 /// This method is not part of tantivy's public API
-pub fn save_metas(
-    segment_metas: Vec<SegmentMeta>,
-    schema: Schema,
-    opstamp: u64,
-    payload: Option<String>,
+fn save_metas(
+    metas: &IndexMeta,
     directory: &mut Directory,
 ) -> Result<()> {
-    let metas = IndexMeta {
-        segments: segment_metas,
-        schema,
-        opstamp,
-        payload,
-    };
+    // let metas = IndexMeta {
+    //     segments: segment_metas,
+    //     schema,
+    //     opstamp,
+    //     payload,
+    // };
-    let mut buffer = serde_json::to_vec_pretty(&metas)?;
+    let mut buffer = serde_json::to_vec_pretty(metas)?;
     writeln!(&mut buffer)?;
     directory.atomic_write(&META_FILEPATH, &buffer[..])?;
     debug!("Saved metas {:?}", serde_json::to_string_pretty(&metas));
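
`save_metas` now serializes one `IndexMeta` value instead of assembling it from loose arguments. A minimal, self-contained sketch of that serialization step, assuming `serde` (with the derive feature) and `serde_json` as dependencies; `SimpleMeta` is a hypothetical stand-in, since the real `IndexMeta` carries `Vec<SegmentMeta>` and a `Schema`:

```rust
use serde::Serialize;
use std::io::Write;

#[derive(Serialize)]
struct SimpleMeta {
    segments: Vec<String>, // stand-in for Vec<SegmentMeta>
    opstamp: u64,
    payload: Option<String>,
}

fn serialize_metas(metas: &SimpleMeta) -> Vec<u8> {
    let mut buffer = serde_json::to_vec_pretty(metas).expect("serialization failed");
    // Trailing newline, as in save_metas above.
    writeln!(&mut buffer).expect("writing to a Vec cannot fail");
    buffer
}

fn main() {
    let metas = SimpleMeta {
        segments: Vec::new(),
        opstamp: 0,
        payload: None,
    };
    println!("{}", String::from_utf8(serialize_metas(&metas)).unwrap());
}
```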
@@ -86,6 +89,11 @@ pub fn save_metas(
 #[derive(Clone)]
 pub struct SegmentUpdater(Arc<InnerSegmentUpdater>);

+struct MergeOperation {
+    pub target_opstamp: u64,
+    pub segment_ids: Vec<SegmentId>,
+}
+
 fn perform_merge(
     index: &Index,
     mut segment_entries: Vec<SegmentEntry>,
@@ -126,6 +134,13 @@ fn perform_merge(
 }

 struct InnerSegmentUpdater {
+    // we keep a copy of the current active IndexMeta to
+    // avoid loading the file every time we need it in the
+    // `SegmentUpdater`.
+    //
+    // This should be up to date as all updates happen through
+    // the unique active `SegmentUpdater`.
+    active_metas: RwLock<Arc<IndexMeta>>,
     pool: CpuPool,
     index: Index,
     segment_manager: SegmentManager,
@@ -149,7 +164,9 @@ impl SegmentUpdater {
             .name_prefix("segment_updater")
             .pool_size(1)
             .create();
+        let index_meta = index.load_metas()?;
         Ok(SegmentUpdater(Arc::new(InnerSegmentUpdater {
+            active_metas: RwLock::new(Arc::new(index_meta)),
             pool,
             index,
             segment_manager,
@@ -244,14 +261,18 @@ impl SegmentUpdater {
             //
             // Segment 1 from disk 1, Segment 1 from disk 2, etc.
             commited_segment_metas.sort_by_key(|segment_meta| -(segment_meta.max_doc() as i32));
-            save_metas(
-                commited_segment_metas,
-                index.schema(),
-                opstamp,
-                commit_message,
+            let index_meta = IndexMeta {
+                segments: commited_segment_metas,
+                schema: index.schema(),
+                opstamp,
+                payload: commit_message
+            };
+            save_metas(
+                &index_meta,
                 directory.box_clone().borrow_mut(),
             )
             .expect("Could not save metas.");
+            self.store_meta(&index_meta);
         }
     }

@@ -286,16 +307,27 @@ impl SegmentUpdater {
     }

     pub fn start_merge(&self, segment_ids: &[SegmentId]) -> Result<Receiver<SegmentMeta>> {
-        //let future_merged_segment = */
         let segment_ids_vec = segment_ids.to_vec();
+        let commit_opstamp = self.load_metas().opstamp;
         self.run_async(move |segment_updater| {
-            segment_updater.start_merge_impl(&segment_ids_vec[..])
+            segment_updater.start_merge_impl(&segment_ids_vec[..], commit_opstamp)
         })
         .wait()?
     }

+    fn store_meta(&self, index_meta: &IndexMeta) {
+        *self.0.active_metas.write().unwrap() = Arc::new(index_meta.clone());
+    }
+
+    fn load_metas(&self) -> Arc<IndexMeta> {
+        self.0.active_metas.read().unwrap().clone()
+    }
+
     // `segment_ids` is required to be non-empty.
-    fn start_merge_impl(&self, segment_ids: &[SegmentId]) -> Result<Receiver<SegmentMeta>> {
+    fn start_merge_impl(
+        &self,
+        segment_ids: &[SegmentId],
+        target_opstamp: u64,
+    ) -> Result<Receiver<SegmentMeta>> {
         assert!(!segment_ids.is_empty(), "Segment_ids cannot be empty.");

         let segment_updater_clone = self.clone();
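
`store_meta` and `load_metas` implement a read-mostly snapshot cache over `RwLock<Arc<IndexMeta>>`: readers clone the `Arc` (cheap, no copy of the metas), writers swap in a freshly built `Arc`, and previously handed-out snapshots stay valid. A self-contained sketch of the pattern (a simplified `Meta` stands in for `IndexMeta`):

```rust
use std::sync::{Arc, RwLock};

struct Meta {
    opstamp: u64,
}

struct MetaCache {
    active: RwLock<Arc<Meta>>,
}

impl MetaCache {
    /// Readers get a shared snapshot; only the Arc is cloned.
    fn load(&self) -> Arc<Meta> {
        self.active.read().unwrap().clone()
    }

    /// Writers replace the snapshot wholesale.
    fn store(&self, meta: Meta) {
        *self.active.write().unwrap() = Arc::new(meta);
    }
}

fn main() {
    let cache = MetaCache {
        active: RwLock::new(Arc::new(Meta { opstamp: 0 })),
    };
    let snapshot = cache.load();
    cache.store(Meta { opstamp: 42 });
    assert_eq!(snapshot.opstamp, 0); // the old snapshot is unaffected
    assert_eq!(cache.load().opstamp, 42);
}
```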
@@ -310,8 +342,6 @@ impl SegmentUpdater {
         );
         let (merging_future_send, merging_future_recv) = oneshot();

-        let target_opstamp = self.0.stamper.stamp();
-
         // first we need to apply deletes to our segment.
         let merging_join_handle = thread::Builder::new()
             .name(format!("mergingthread-{}", merging_thread_id))
@@ -373,11 +403,32 @@ impl SegmentUpdater {
         // Committed segments cannot be merged with uncommitted_segments.
         // We therefore consider merges using these two sets of segments independently.
         let merge_policy = self.get_merge_policy();
-        let mut merge_candidates = merge_policy.compute_merge_candidates(&uncommitted_segments);
-        let committed_merge_candidates = merge_policy.compute_merge_candidates(&committed_segments);
-        merge_candidates.extend_from_slice(&committed_merge_candidates[..]);
-        for MergeCandidate(segment_metas) in merge_candidates {
-            match self.start_merge_impl(&segment_metas) {
+        let current_opstamp = self.0.stamper.stamp();
+        let mut merge_candidates = merge_policy
+            .compute_merge_candidates(&uncommitted_segments)
+            .into_iter()
+            .map(|merge_candidate| MergeOperation {
+                target_opstamp: current_opstamp,
+                segment_ids: merge_candidate.0,
+            })
+            .collect::<Vec<_>>();
+        let commit_opstamp = self.load_metas().opstamp;
+        let committed_merge_candidates = merge_policy
+            .compute_merge_candidates(&committed_segments)
+            .into_iter()
+            .map(|merge_candidate| MergeOperation {
+                target_opstamp: commit_opstamp,
+                segment_ids: merge_candidate.0,
+            })
+            .collect::<Vec<_>>();
+        merge_candidates.extend(committed_merge_candidates.into_iter());
+        for MergeOperation {
+            target_opstamp,
+            segment_ids,
+        } in merge_candidates
+        {
+            match self.start_merge_impl(&segment_ids, target_opstamp) {
                 Ok(merge_future) => {
                     if let Err(e) = merge_future.fuse().poll() {
                         error!("The merge task failed quickly after starting: {:?}", e);
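
In the rewritten loop, each `MergeOperation` freezes the opstamp up to which deletes may be applied: merges of uncommitted segments take a fresh stamp from the `Stamper`, while merges of committed segments reuse the opstamp of the last commit from the cached metas, so deletes issued after that commit stay pending instead of being folded into the merge.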
@@ -412,12 +463,7 @@ impl SegmentUpdater {
                 info!("End merge {:?}", after_merge_segment_entry.meta());
                 let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
                 if let Some(delete_operation) = delete_cursor.get() {
-                    let committed_opstamp = segment_updater
-                        .0
-                        .index
-                        .load_metas()
-                        .expect("Failed to read opstamp")
-                        .opstamp;
+                    let committed_opstamp = segment_updater.load_metas().opstamp;
                     if delete_operation.opstamp < committed_opstamp {
                         let index = &segment_updater.0.index;
                         let segment = index.segment(after_merge_segment_entry.meta().clone());
@@ -446,8 +492,8 @@ impl SegmentUpdater {
                     .end_merge(&before_merge_segment_ids, after_merge_segment_entry);
                 segment_updater.consider_merge_options();
                 info!("save metas");
-                let previous_metas = segment_updater.0.index.load_metas().unwrap();
-                segment_updater.save_metas(previous_metas.opstamp, previous_metas.payload);
+                let previous_metas = segment_updater.load_metas();
+                segment_updater.save_metas(previous_metas.opstamp, previous_metas.payload.clone());
                 segment_updater.garbage_collect_files_exec();
             })
             .wait()

@@ -1,50 +1,68 @@
+use std::sync::Arc;
+use std::sync::atomic::Ordering;

 // AtomicU64 has not landed in stable.
 // For the moment let's just use AtomicUsize on
 // x86/64 bit platform, and a mutex on other platform.
-#[cfg(target = "x86_64")]
+#[cfg(target_arch = "x86_64")]
 mod archicture_impl {

     use std::sync::atomic::{AtomicUsize, Ordering};
-    use std::sync::Arc;

-    #[derive(Clone, Default)]
-    pub struct Stamper(Arc<AtomicU64>);
+    #[derive(Default)]
+    pub struct AtomicU64Ersatz(AtomicUsize);

-    impl Stamper {
-        pub fn new(first_opstamp: u64) -> Stamper {
-            Stamper(Arc::new(AtomicU64::new(first_opstamp)))
+    impl AtomicU64Ersatz {
+        pub fn new(first_opstamp: u64) -> AtomicU64Ersatz {
+            AtomicU64Ersatz(AtomicUsize::new(first_opstamp as usize))
         }

-        pub fn stamp(&self) -> u64 {
-            self.0.fetch_add(1u64, Ordering::SeqCst) as u64
+        pub fn fetch_add(&self, val: u64, order: Ordering) -> u64 {
+            self.0.fetch_add(val as usize, order) as u64
         }
     }
 }

-#[cfg(not(target = "x86_64"))]
+#[cfg(not(target_arch = "x86_64"))]
 mod archicture_impl {

-    use std::sync::{Arc, Mutex};
+    /// On other architectures, we rely on a mutex.
+    use std::sync::Mutex;
+    use std::sync::atomic::Ordering;

-    #[derive(Clone, Default)]
-    pub struct Stamper(Arc<Mutex<u64>>);
+    #[derive(Default)]
+    pub struct AtomicU64Ersatz(Mutex<u64>);

-    impl Stamper {
-        pub fn new(first_opstamp: u64) -> Stamper {
-            Stamper(Arc::new(Mutex::new(first_opstamp)))
+    impl AtomicU64Ersatz {
+        pub fn new(first_opstamp: u64) -> AtomicU64Ersatz {
+            AtomicU64Ersatz(Mutex::new(first_opstamp))
         }

-        pub fn stamp(&self) -> u64 {
-            let mut guard = self.0.lock().expect("Failed to lock the stamper");
-            let previous_val = *guard;
-            *guard = previous_val + 1;
+        pub fn fetch_add(&self, val: u64, _order: Ordering) -> u64 {
+            let mut lock = self.0.lock().unwrap();
+            let previous_val = *lock;
+            *lock = previous_val + val;
             previous_val
         }
     }
 }

-pub use self::archicture_impl::Stamper;
+use self::archicture_impl::AtomicU64Ersatz;
+
+#[derive(Clone, Default)]
+pub struct Stamper(Arc<AtomicU64Ersatz>);
+
+impl Stamper {
+    pub fn new(first_opstamp: u64) -> Stamper {
+        Stamper(Arc::new(AtomicU64Ersatz::new(first_opstamp)))
+    }
+
+    pub fn stamp(&self) -> u64 {
+        self.0.fetch_add(1u64, Ordering::SeqCst) as u64
+    }
+}

 #[cfg(test)]
 mod test {
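
This last diff keeps `Stamper`'s `Clone` + shared-counter semantics while pushing the platform differences into `AtomicU64Ersatz`. A condensed, runnable sketch of the resulting shape (showing only the atomic branch; `Counter` is a stand-in for `AtomicU64Ersatz`):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

#[derive(Default)]
struct Counter(AtomicUsize);

impl Counter {
    fn fetch_add(&self, val: u64, order: Ordering) -> u64 {
        self.0.fetch_add(val as usize, order) as u64
    }
}

#[derive(Clone, Default)]
struct Stamper(Arc<Counter>);

impl Stamper {
    /// Returns the previous value; successive calls yield 0, 1, 2, ...
    fn stamp(&self) -> u64 {
        self.0.fetch_add(1, Ordering::SeqCst)
    }
}

fn main() {
    let stamper = Stamper::default();
    let clone = stamper.clone();
    assert_eq!(stamper.stamp(), 0);
    assert_eq!(clone.stamp(), 1); // clones share the same counter
    assert_eq!(stamper.stamp(), 2);
}
```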