From 7b97dde3356dc1ac61887ef64b0d53e296d94e57 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Mon, 28 Jan 2019 12:37:55 +0100 Subject: [PATCH] Clippy + cargo fmt --- src/core/index.rs | 25 +++++++++++------ src/directory/directory.rs | 24 ++++++++-------- src/directory/directory_lock.rs | 8 ++---- src/directory/error.rs | 6 ++-- src/directory/managed_directory.rs | 6 ++-- src/directory/mmap_directory.rs | 20 ++++++------- src/directory/mod.rs | 6 ++-- src/directory/tests.rs | 45 ++++++++++++++++++++++-------- src/error.rs | 4 +-- src/indexer/index_writer.rs | 11 +++++--- src/indexer/segment_register.rs | 4 +-- src/lib.rs | 2 +- 12 files changed, 93 insertions(+), 68 deletions(-) diff --git a/src/core/index.rs b/src/core/index.rs index 9ff9b50f3..da3e9f22f 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -12,6 +12,8 @@ use core::META_FILEPATH; use directory::ManagedDirectory; #[cfg(feature = "mmap")] use directory::MmapDirectory; +use directory::INDEX_WRITER_LOCK; +use directory::META_LOCK; use directory::{Directory, RAMDirectory}; use error::DataCorruption; use error::TantivyError; @@ -32,8 +34,6 @@ use tokenizer::BoxedTokenizer; use tokenizer::TokenizerManager; use IndexWriter; use Result; -use directory::INDEX_WRITER_LOCK; -use directory::META_LOCK; fn load_metas(directory: &Directory) -> Result { let meta_data = directory.atomic_read(&META_FILEPATH)?; @@ -242,14 +242,21 @@ impl Index { num_threads: usize, overall_heap_size_in_bytes: usize, ) -> Result { - let directory_lock = self.directory.acquire_lock(&INDEX_WRITER_LOCK) + let directory_lock = self + .directory + .acquire_lock(&INDEX_WRITER_LOCK) .map_err(|err| { - TantivyError::LockFailure(err, - Some("Failed to acquire index lock. If you are using\ - a regular directory, this means there is already an \ - `IndexWriter` working on this `Directory`, in this process \ - or in a different process.".to_string()))} - )?; + TantivyError::LockFailure( + err, + Some( + "Failed to acquire index lock. If you are using\ + a regular directory, this means there is already an \ + `IndexWriter` working on this `Directory`, in this process \ + or in a different process." + .to_string(), + ), + ) + })?; let heap_size_in_bytes_per_thread = overall_heap_size_in_bytes / num_threads; open_index_writer( self, diff --git a/src/directory/directory.rs b/src/directory/directory.rs index 3dda08123..c5f975c9d 100644 --- a/src/directory/directory.rs +++ b/src/directory/directory.rs @@ -1,3 +1,5 @@ +use directory::directory_lock::Lock; +use directory::error::LockError; use directory::error::{DeleteError, OpenReadError, OpenWriteError}; use directory::{ReadOnlySource, WritePtr}; use std::fmt; @@ -6,13 +8,10 @@ use std::io::Write; use std::marker::Send; use std::marker::Sync; use std::path::Path; -use std::result; -use directory::error::LockError; -use std::time::Duration; -use std::thread; use std::path::PathBuf; -use directory::directory_lock::Lock; - +use std::result; +use std::thread; +use std::time::Duration; /// Retry the logic of acquiring locks is pretty simple. /// We just retry `n` times after a given `duratio`, both @@ -51,7 +50,7 @@ pub struct DirectoryLock(Box); struct DirectoryLockGuard { directory: Box, - path: PathBuf + path: PathBuf, } impl From> for DirectoryLock { @@ -73,21 +72,21 @@ enum TryAcquireLockError { IOError(io::Error), } -fn try_acquire_lock(filepath: &Path, directory: &mut Directory) -> Result { +fn try_acquire_lock( + filepath: &Path, + directory: &mut Directory, +) -> Result { let mut write = directory.open_write(filepath).map_err(|e| match e { OpenWriteError::FileAlreadyExists(_) => TryAcquireLockError::FileExists, OpenWriteError::IOError(io_error) => TryAcquireLockError::IOError(io_error.into()), })?; - write - .flush() - .map_err(TryAcquireLockError::IOError)?; + write.flush().map_err(TryAcquireLockError::IOError)?; Ok(DirectoryLock::from(Box::new(DirectoryLockGuard { directory: directory.box_clone(), path: filepath.to_owned(), }))) } - fn retry_policy(is_blocking: bool) -> RetryPolicy { if is_blocking { RetryPolicy { @@ -166,7 +165,6 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static { /// The file may or may not previously exist. fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()>; - /// Acquire a lock in the given directory. /// /// The method is blocking or not depending on the `Lock` object. diff --git a/src/directory/directory_lock.rs b/src/directory/directory_lock.rs index afb79199a..f341ea6f9 100644 --- a/src/directory/directory_lock.rs +++ b/src/directory/directory_lock.rs @@ -13,9 +13,9 @@ use std::path::PathBuf; /// #[derive(Debug)] pub struct Lock { - /// The lock needs to be associated with its own file `path`. - /// Depending on the platform, the lock might rely on the creation - /// and deletion of this filepath. + /// The lock needs to be associated with its own file `path`. + /// Depending on the platform, the lock might rely on the creation + /// and deletion of this filepath. pub filepath: PathBuf, /// `lock_params` describes whether acquiring the lock is meant /// to be a blocking operation or a non-blocking. @@ -28,7 +28,6 @@ pub struct Lock { pub is_blocking: bool, } - lazy_static! { /// Only one process should be able to write tantivy's index at a time. /// This lock file, when present, is in charge of preventing other processes to open an IndexWriter. @@ -55,4 +54,3 @@ lazy_static! { is_blocking: true }; } - diff --git a/src/directory/error.rs b/src/directory/error.rs index 6997c6eee..1179ad28d 100644 --- a/src/directory/error.rs +++ b/src/directory/error.rs @@ -10,11 +10,13 @@ pub enum LockError { /// client. /// - In the context of a blocking lock, this means the lock was not released within some `timeout` period. /// - In the context of a non-blocking lock, this means the lock was busy at the moment of the call. - #[fail(display = "Could not acquire lock as it is already held, possibly by a different process.")] + #[fail( + display = "Could not acquire lock as it is already held, possibly by a different process." + )] LockBusy, /// Trying to acquire a lock failed with an `IOError` #[fail(display = "Failed to acquire the lock due to an io:Error.")] - IOError(io::Error) + IOError(io::Error), } /// General IO error with an optional path to the offending file. diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs index 471b0cd1b..5367494f0 100644 --- a/src/directory/managed_directory.rs +++ b/src/directory/managed_directory.rs @@ -1,5 +1,8 @@ use core::MANAGED_FILEPATH; use directory::error::{DeleteError, IOError, LockError, OpenReadError, OpenWriteError}; +use directory::DirectoryLock; +use directory::Lock; +use directory::META_LOCK; use directory::{ReadOnlySource, WritePtr}; use error::DataCorruption; use serde_json; @@ -12,9 +15,6 @@ use std::sync::RwLockWriteGuard; use std::sync::{Arc, RwLock}; use Directory; use Result; -use directory::META_LOCK; -use directory::Lock; -use directory::DirectoryLock; /// Returns true iff the file is "managed". /// Non-managed file are not subject to garbage collection. diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs index 134004f9c..ed4a3563a 100644 --- a/src/directory/mmap_directory.rs +++ b/src/directory/mmap_directory.rs @@ -3,9 +3,12 @@ extern crate fs2; use self::fs2::FileExt; use atomicwrites; use common::make_io_err; +use directory::error::LockError; use directory::error::{DeleteError, IOError, OpenDirectoryError, OpenReadError, OpenWriteError}; use directory::shared_vec_slice::SharedVecSlice; use directory::Directory; +use directory::DirectoryLock; +use directory::Lock; use directory::ReadOnlySource; use directory::WritePtr; use fst::raw::MmapReadOnly; @@ -22,9 +25,6 @@ use std::result; use std::sync::Arc; use std::sync::RwLock; use tempdir::TempDir; -use directory::Lock; -use directory::DirectoryLock; -use directory::error::LockError; /// Returns None iff the file exists, can be read, but is empty (and hence /// cannot be mmapped). @@ -233,7 +233,7 @@ impl MmapDirectory { /// is closed. struct ReleaseLockFile { _file: File, - path: PathBuf + path: PathBuf, } impl Drop for ReleaseLockFile { @@ -386,23 +386,21 @@ impl Directory for MmapDirectory { fn acquire_lock(&self, lock: &Lock) -> Result { let full_path = self.resolve_path(&lock.filepath); - // We make sure that the file exists. + // We make sure that the file exists. let file: File = OpenOptions::new() .write(true) .create(true) //< if the file does not exist yet, create it. .open(&full_path) - .map_err(|err| LockError::IOError(err))?; + .map_err(LockError::IOError)?; if lock.is_blocking { - file.lock_exclusive() - .map_err(LockError::IOError)?; + file.lock_exclusive().map_err(LockError::IOError)?; } else { - file.try_lock_exclusive() - .map_err(|_| LockError::LockBusy)? + file.try_lock_exclusive().map_err(|_| LockError::LockBusy)? } // dropping the file handle will release the lock. Ok(DirectoryLock::from(Box::new(ReleaseLockFile { path: lock.filepath.clone(), - _file: file + _file: file, }))) } } diff --git a/src/directory/mod.rs b/src/directory/mod.rs index 84de77660..29fce4fb4 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -8,21 +8,21 @@ WORM directory abstraction. mod mmap_directory; mod directory; +mod directory_lock; mod managed_directory; mod ram_directory; mod read_only_source; mod shared_vec_slice; -mod directory_lock; /// Errors specific to the directory module. pub mod error; -use std::io::{BufWriter, Seek, Write}; pub use self::directory::DirectoryLock; pub use self::directory::{Directory, DirectoryClone}; +pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK}; pub use self::ram_directory::RAMDirectory; pub use self::read_only_source::ReadOnlySource; -pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK}; +use std::io::{BufWriter, Seek, Write}; #[cfg(feature = "mmap")] pub use self::mmap_directory::MmapDirectory; diff --git a/src/directory/tests.rs b/src/directory/tests.rs index 0b018f898..11bedfeb5 100644 --- a/src/directory/tests.rs +++ b/src/directory/tests.rs @@ -125,23 +125,37 @@ fn test_directory(directory: &mut Directory) { fn test_lock_non_blocking(directory: &mut Directory) { { - let lock_a_res = directory.acquire_lock(&Lock { filepath: PathBuf::from("a.lock"), is_blocking: false }); + let lock_a_res = directory.acquire_lock(&Lock { + filepath: PathBuf::from("a.lock"), + is_blocking: false, + }); assert!(lock_a_res.is_ok()); - let lock_b_res = directory.acquire_lock(&Lock { filepath: PathBuf::from("b.lock"), is_blocking: false }); + let lock_b_res = directory.acquire_lock(&Lock { + filepath: PathBuf::from("b.lock"), + is_blocking: false, + }); assert!(lock_b_res.is_ok()); - let lock_a_res2 = directory.acquire_lock(&Lock { filepath: PathBuf::from("a.lock"), is_blocking: false }); + let lock_a_res2 = directory.acquire_lock(&Lock { + filepath: PathBuf::from("a.lock"), + is_blocking: false, + }); assert!(lock_a_res2.is_err()); } - let lock_a_res = directory.acquire_lock(&Lock { filepath: PathBuf::from("a.lock"), is_blocking: false }); + let lock_a_res = directory.acquire_lock(&Lock { + filepath: PathBuf::from("a.lock"), + is_blocking: false, + }); assert!(lock_a_res.is_ok()); } - - fn test_lock_blocking(directory: &mut Directory) { - let lock_a_res = directory.acquire_lock(&Lock { filepath: PathBuf::from("a.lock"), is_blocking: true }); + let lock_a_res = directory.acquire_lock(&Lock { + filepath: PathBuf::from("a.lock"), + is_blocking: true, + }); assert!(lock_a_res.is_ok()); - std::thread::spawn(move || { //< lock_a_res is sent to the thread. + std::thread::spawn(move || { + //< lock_a_res is sent to the thread. std::thread::sleep(time::Duration::from_millis(10)); // explicitely droping lock_a_res. It would have been sufficient to just force it // to be part of the move, but the intent seems clearer that way. @@ -149,13 +163,20 @@ fn test_lock_blocking(directory: &mut Directory) { }); { // A non-blocking call should fail, as the thread is running and holding the lock. - let lock_a_res = directory.acquire_lock(&Lock { filepath: PathBuf::from("a.lock"), is_blocking: false }); + let lock_a_res = directory.acquire_lock(&Lock { + filepath: PathBuf::from("a.lock"), + is_blocking: false, + }); assert!(lock_a_res.is_err()); } - { // the blocking call should wait for at least 10ms. + { + // the blocking call should wait for at least 10ms. let start = time::Instant::now(); - let lock_a_res = directory.acquire_lock(&Lock { filepath: PathBuf::from("a.lock"), is_blocking: true }); + let lock_a_res = directory.acquire_lock(&Lock { + filepath: PathBuf::from("a.lock"), + is_blocking: true, + }); assert!(lock_a_res.is_ok()); assert!(start.elapsed().subsec_millis() >= 10); } -} \ No newline at end of file +} diff --git a/src/error.rs b/src/error.rs index 9094128e7..7c8bb25e7 100644 --- a/src/error.rs +++ b/src/error.rs @@ -2,6 +2,7 @@ use std::io; +use directory::error::LockError; use directory::error::{IOError, OpenDirectoryError, OpenReadError, OpenWriteError}; use fastfield::FastFieldNotAvailableError; use query; @@ -10,7 +11,6 @@ use serde_json; use std::fmt; use std::path::PathBuf; use std::sync::PoisonError; -use directory::error::LockError; pub struct DataCorruption { filepath: Option, @@ -57,7 +57,7 @@ pub enum TantivyError { #[fail(display = "Index already exists")] IndexAlreadyExists, /// Failed to acquire file lock - #[fail(display = "Failed to acquire Lockfile: {:?}. {:?}", _0, _1)] + #[fail(display = "Failed to acquire Lockfile: {:?}. {:?}", _0, _1)] LockFailure(LockError, Option), /// IO Error. #[fail(display = "An IO error occurred: '{}'", _0)] diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 13daade40..d2a2f02c5 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -9,9 +9,11 @@ use core::SegmentId; use core::SegmentMeta; use core::SegmentReader; use crossbeam::channel; +use directory::DirectoryLock; use docset::DocSet; use error::TantivyError; use fastfield::write_delete_bitset; +use futures::{Canceled, Future}; use indexer::delete_queue::{DeleteCursor, DeleteQueue}; use indexer::doc_opstamp_mapping::DocToOpstampMapping; use indexer::operation::DeleteOperation; @@ -28,8 +30,6 @@ use std::sync::Arc; use std::thread; use std::thread::JoinHandle; use Result; -use directory::DirectoryLock; -use futures::{Future, Canceled}; // Size of the margin for the heap. A segment is closed when the remaining memory // in the heap goes below MARGIN_IN_BYTES. @@ -459,7 +459,10 @@ impl IndexWriter { /// Merges a given list of segments /// /// `segment_ids` is required to be non-empty. - pub fn merge(&mut self, segment_ids: &[SegmentId]) -> Result> { + pub fn merge( + &mut self, + segment_ids: &[SegmentId], + ) -> Result> { self.segment_updater.start_merge(segment_ids) } @@ -652,12 +655,12 @@ impl IndexWriter { mod tests { use super::initial_table_size; + use directory::error::LockError; use error::*; use indexer::NoMergePolicy; use schema::{self, Document}; use Index; use Term; - use directory::error::LockError; #[test] fn test_lockfile_stops_duplicates() { diff --git a/src/indexer/segment_register.rs b/src/indexer/segment_register.rs index 2133989ce..74234f2de 100644 --- a/src/indexer/segment_register.rs +++ b/src/indexer/segment_register.rs @@ -76,9 +76,7 @@ impl SegmentRegister { } pub fn get(&self, segment_id: &SegmentId) -> Option { - self.segment_states - .get(segment_id) - .map(|segment_entry| segment_entry.clone()) + self.segment_states.get(segment_id).cloned() } pub fn new(segment_metas: Vec, delete_cursor: &DeleteCursor) -> SegmentRegister { diff --git a/src/lib.rs b/src/lib.rs index 226d725c5..c5442ae56 100755 --- a/src/lib.rs +++ b/src/lib.rs @@ -129,7 +129,6 @@ extern crate base64; extern crate bit_set; extern crate bitpacking; extern crate byteorder; -extern crate scoped_pool; extern crate combine; extern crate crossbeam; extern crate fnv; @@ -144,6 +143,7 @@ extern crate num_cpus; extern crate owning_ref; extern crate regex; extern crate rust_stemmers; +extern crate scoped_pool; extern crate serde; extern crate stable_deref_trait; extern crate tempdir;