diff --git a/src/directory/file_slice.rs b/src/directory/file_slice.rs index d3be3988e..cc2b97aa6 100644 --- a/src/directory/file_slice.rs +++ b/src/directory/file_slice.rs @@ -2,10 +2,11 @@ use stable_deref_trait::StableDeref; use crate::common::HasLen; use crate::directory::OwnedBytes; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use std::{io, ops::Deref}; -pub type BoxedData = Box + Send + Sync + 'static>; +pub type ArcBytes = Arc + Send + Sync + 'static>; +pub type WeakArcBytes = Weak + Send + Sync + 'static>; /// Objects that represents files sections in tantivy. /// diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs index 3b270009f..122a366d8 100644 --- a/src/directory/mmap_directory.rs +++ b/src/directory/mmap_directory.rs @@ -2,13 +2,13 @@ use crate::core::META_FILEPATH; use crate::directory::error::LockError; use crate::directory::error::{DeleteError, OpenDirectoryError, OpenReadError, OpenWriteError}; use crate::directory::file_watcher::FileWatcher; -use crate::directory::BoxedData; use crate::directory::Directory; use crate::directory::DirectoryLock; use crate::directory::Lock; use crate::directory::WatchCallback; use crate::directory::WatchHandle; use crate::directory::{AntiCallToken, FileHandle, OwnedBytes}; +use crate::directory::{ArcBytes, WeakArcBytes}; use crate::directory::{TerminatingWrite, WritePtr}; use fs2::FileExt; use memmap::Mmap; @@ -24,7 +24,6 @@ use std::path::{Path, PathBuf}; use std::result; use std::sync::Arc; use std::sync::RwLock; -use std::sync::Weak; use std::{collections::HashMap, ops::Deref}; use tempfile::TempDir; @@ -77,7 +76,7 @@ pub struct CacheInfo { struct MmapCache { counters: CacheCounters, - cache: HashMap>, + cache: HashMap, } impl Default for MmapCache { @@ -111,7 +110,7 @@ impl MmapCache { } // Returns None if the file exists but as a len of 0 (and hence is not mmappable). - fn get_mmap(&mut self, full_path: &Path) -> Result>, OpenReadError> { + fn get_mmap(&mut self, full_path: &Path) -> Result, OpenReadError> { if let Some(mmap_weak) = self.cache.get(full_path) { if let Some(mmap_arc) = mmap_weak.upgrade() { self.counters.hit += 1; @@ -122,7 +121,7 @@ impl MmapCache { self.counters.miss += 1; let mmap_opt = open_mmap(full_path)?; Ok(mmap_opt.map(|mmap| { - let mmap_arc: Arc = Arc::new(Box::new(mmap)); + let mmap_arc: ArcBytes = Arc::new(mmap); let mmap_weak = Arc::downgrade(&mmap_arc); self.cache.insert(full_path.to_owned(), mmap_weak); mmap_arc @@ -315,7 +314,7 @@ impl TerminatingWrite for SafeFileWriter { } #[derive(Clone)] -struct MmapArc(Arc + Send + Sync>>); +struct MmapArc(Arc + Send + Sync>); impl Deref for MmapArc { type Target = [u8]; diff --git a/src/directory/mod.rs b/src/directory/mod.rs index ae65833d0..8bd2c3185 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -23,7 +23,7 @@ pub mod error; pub use self::directory::DirectoryLock; pub use self::directory::{Directory, DirectoryClone}; pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK}; -pub(crate) use self::file_slice::BoxedData; +pub(crate) use self::file_slice::{ArcBytes, WeakArcBytes}; pub use self::file_slice::{FileHandle, FileSlice}; pub use self::owned_bytes::OwnedBytes; pub use self::ram_directory::RAMDirectory; diff --git a/src/directory/owned_bytes.rs b/src/directory/owned_bytes.rs index 8a2f4add4..73303f50c 100644 --- a/src/directory/owned_bytes.rs +++ b/src/directory/owned_bytes.rs @@ -99,7 +99,7 @@ impl OwnedBytes { /// Reads an `u8` from the `OwnedBytes` and advance by one byte. pub fn read_u8(&mut self) -> u8 { - assert!(self.len() > 0); + assert!(!self.is_empty()); let byte = self.as_slice()[0]; self.advance(1); diff --git a/src/directory/watch_event_router.rs b/src/directory/watch_event_router.rs index d0523d66c..c42d03be3 100644 --- a/src/directory/watch_event_router.rs +++ b/src/directory/watch_event_router.rs @@ -6,12 +6,12 @@ use std::sync::Weak; /// Cloneable wrapper for callbacks registered when watching files of a `Directory`. #[derive(Clone)] -pub struct WatchCallback(Arc>); +pub struct WatchCallback(Arc); impl WatchCallback { /// Wraps a `Fn()` to create a WatchCallback. pub fn new(op: F) -> Self { - WatchCallback(Arc::new(Box::new(op))) + WatchCallback(Arc::new(op)) } fn call(&self) { diff --git a/src/indexer/delete_queue.rs b/src/indexer/delete_queue.rs index c2b3e21c9..ba445dd3b 100644 --- a/src/indexer/delete_queue.rs +++ b/src/indexer/delete_queue.rs @@ -53,7 +53,7 @@ impl DeleteQueue { return block; } let block = Arc::new(Block { - operations: Arc::default(), + operations: Arc::new([]), next: NextBlock::from(self.clone()), }); wlock.last_block = Arc::downgrade(&block); @@ -108,7 +108,7 @@ impl DeleteQueue { let delete_operations = mem::replace(&mut self_wlock.writer, vec![]); let new_block = Arc::new(Block { - operations: Arc::new(delete_operations.into_boxed_slice()), + operations: Arc::from(delete_operations.into_boxed_slice()), next: NextBlock::from(self.clone()), }); @@ -167,7 +167,7 @@ impl NextBlock { } struct Block { - operations: Arc>, + operations: Arc<[DeleteOperation]>, next: NextBlock, } diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 0ad6a9034..2021c4882 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -449,7 +449,7 @@ impl IndexWriter { } /// Accessor to the merge policy. - pub fn get_merge_policy(&self) -> Arc> { + pub fn get_merge_policy(&self) -> Arc { self.segment_updater.get_merge_policy() } diff --git a/src/indexer/operation.rs b/src/indexer/operation.rs index d59d7781e..e6dc33f10 100644 --- a/src/indexer/operation.rs +++ b/src/indexer/operation.rs @@ -9,6 +9,15 @@ pub struct DeleteOperation { pub term: Term, } +impl Default for DeleteOperation { + fn default() -> Self { + DeleteOperation { + opstamp: 0u64, + term: Term::new(), + } + } +} + /// Timestamped Add operation. #[derive(Eq, PartialEq, Debug)] pub struct AddOperation { diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index a346e8fc4..d0cb240bc 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -154,7 +154,7 @@ pub(crate) struct InnerSegmentUpdater { index: Index, segment_manager: SegmentManager, - merge_policy: RwLock>>, + merge_policy: RwLock>, killed: AtomicBool, stamper: Stamper, merge_operations: MergeOperationInventory, @@ -193,19 +193,19 @@ impl SegmentUpdater { merge_thread_pool, index, segment_manager, - merge_policy: RwLock::new(Arc::new(Box::new(DefaultMergePolicy::default()))), + merge_policy: RwLock::new(Arc::new(DefaultMergePolicy::default())), killed: AtomicBool::new(false), stamper, merge_operations: Default::default(), }))) } - pub fn get_merge_policy(&self) -> Arc> { + pub fn get_merge_policy(&self) -> Arc { self.merge_policy.read().unwrap().clone() } pub fn set_merge_policy(&self, merge_policy: Box) { - let arc_merge_policy = Arc::new(merge_policy); + let arc_merge_policy = Arc::from(merge_policy); *self.merge_policy.write().unwrap() = arc_merge_policy; }