From 644b4bd0a1dc98be434fe268cea1bccd81ec0f22 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Sun, 27 Jan 2019 12:32:21 +0100 Subject: [PATCH] Issue/468b (#482) * Moving lock to directory/ * added fs2 * doc * Using fs2 for locking * Added unit test * Fixed error message related unit test * Fixing location of import --- Cargo.toml | 3 +- src/core/index.rs | 17 ++- src/directory/directory.rs | 116 +++++++++++++++++++++ src/directory/directory_lock.rs | 58 +++++++++++ src/directory/error.rs | 20 ++++ src/directory/managed_directory.rs | 12 ++- src/directory/mmap_directory.rs | 51 +++++++++ src/directory/mod.rs | 130 +---------------------- src/directory/tests.rs | 161 +++++++++++++++++++++++++++++ src/error.rs | 15 +-- src/indexer/directory_lock.rs | 131 ----------------------- src/indexer/index_writer.rs | 10 +- src/indexer/mod.rs | 4 +- src/lib.rs | 2 - 14 files changed, 449 insertions(+), 281 deletions(-) create mode 100644 src/directory/directory_lock.rs create mode 100644 src/directory/tests.rs delete mode 100644 src/indexer/directory_lock.rs diff --git a/Cargo.toml b/Cargo.toml index 98a53b462..5158747e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ serde = "1.0" serde_derive = "1.0" serde_json = "1.0" num_cpus = "1.2" +fs2={version="0.4", optional=true} itertools = "0.8" levenshtein_automata = {version="0.1", features=["fst_automaton"]} bit-set = "0.5" @@ -70,7 +71,7 @@ overflow-checks = true [features] # by default no-fail is disabled. We manually enable it when running test. default = ["mmap", "no_fail"] -mmap = ["fst/mmap", "atomicwrites"] +mmap = ["fst/mmap", "atomicwrites", "fs2"] lz4-compression = ["lz4"] no_fail = ["fail/no_fail"] unstable = [] # useful for benches. 
diff --git a/src/core/index.rs b/src/core/index.rs index 005746cf2..9ff9b50f3 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -18,7 +18,6 @@ use error::TantivyError; use indexer::index_writer::open_index_writer; use indexer::index_writer::HEAP_SIZE_MIN; use indexer::segment_updater::save_new_metas; -use indexer::LockType; use num_cpus; use schema::Field; use schema::FieldType; @@ -33,6 +32,8 @@ use tokenizer::BoxedTokenizer; use tokenizer::TokenizerManager; use IndexWriter; use Result; +use directory::INDEX_WRITER_LOCK; +use directory::META_LOCK; fn load_metas(directory: &Directory) -> Result { let meta_data = directory.atomic_read(&META_FILEPATH)?; @@ -232,7 +233,8 @@ impl Index { /// Each thread will receive a budget of `overall_heap_size_in_bytes / num_threads`. /// /// # Errors - /// If the lockfile already exists, returns `Error::FileAlreadyExists`. + /// If the lockfile already exists, returns `Error::DirectoryLockBusy` or an `Error::IOError`. + /// /// # Panics /// If the heap size per thread is too small, panics. pub fn writer_with_num_threads( @@ -240,7 +242,14 @@ impl Index { num_threads: usize, overall_heap_size_in_bytes: usize, ) -> Result { - let directory_lock = LockType::IndexWriterLock.acquire_lock(&self.directory)?; + let directory_lock = self.directory.acquire_lock(&INDEX_WRITER_LOCK) + .map_err(|err| { + TantivyError::LockFailure(err, + Some("Failed to acquire index lock. If you are using\ + a regular directory, this means there is already an \ + `IndexWriter` working on this `Directory`, in this process \ + or in a different process.".to_string()))} + )?; let heap_size_in_bytes_per_thread = overall_heap_size_in_bytes / num_threads; open_index_writer( self, @@ -339,7 +348,7 @@ impl Index { /// get the freshest `index` at all time, is to watch `meta.json` and /// call `load_searchers` whenever a changes happen. 
pub fn load_searchers(&self) -> Result<()> { - let _meta_lock = LockType::MetaLock.acquire_lock(self.directory())?; + let _meta_lock = self.directory().acquire_lock(&META_LOCK)?; let searchable_segments = self.searchable_segments()?; let segment_readers: Vec = searchable_segments .iter() diff --git a/src/directory/directory.rs b/src/directory/directory.rs index 0f99be74b..3dda08123 100644 --- a/src/directory/directory.rs +++ b/src/directory/directory.rs @@ -2,10 +2,102 @@ use directory::error::{DeleteError, OpenReadError, OpenWriteError}; use directory::{ReadOnlySource, WritePtr}; use std::fmt; use std::io; +use std::io::Write; use std::marker::Send; use std::marker::Sync; use std::path::Path; use std::result; +use directory::error::LockError; +use std::time::Duration; +use std::thread; +use std::path::PathBuf; +use directory::directory_lock::Lock; + + +/// The retry logic for acquiring locks is pretty simple. +/// We just retry `n` times after a given `duration`, both +/// depending on the type of lock. +struct RetryPolicy { + num_retries: usize, + wait_in_ms: u64, +} + +impl RetryPolicy { + fn no_retry() -> RetryPolicy { + RetryPolicy { + num_retries: 0, + wait_in_ms: 0, + } + } + + fn wait_and_retry(&mut self) -> bool { + if self.num_retries == 0 { + false + } else { + self.num_retries -= 1; + let wait_duration = Duration::from_millis(self.wait_in_ms); + thread::sleep(wait_duration); + true + } + } +} + +/// The `DirectoryLock` is an object that represents a file lock. +/// See [`LockType`](struct.LockType.html) +/// +/// It is transparently associated to a lock file, that gets deleted +/// on `Drop.` The lock is released automatically on `Drop`. 
+pub struct DirectoryLock(Box); + +struct DirectoryLockGuard { + directory: Box, + path: PathBuf +} + +impl From> for DirectoryLock { + fn from(underlying: Box) -> Self { + DirectoryLock(underlying) + } +} + +impl Drop for DirectoryLockGuard { + fn drop(&mut self) { + if let Err(e) = self.directory.delete(&*self.path) { + error!("Failed to remove the lock file. {:?}", e); + } + } +} + +enum TryAcquireLockError { + FileExists, + IOError(io::Error), +} + +fn try_acquire_lock(filepath: &Path, directory: &mut Directory) -> Result { + let mut write = directory.open_write(filepath).map_err(|e| match e { + OpenWriteError::FileAlreadyExists(_) => TryAcquireLockError::FileExists, + OpenWriteError::IOError(io_error) => TryAcquireLockError::IOError(io_error.into()), + })?; + write + .flush() + .map_err(TryAcquireLockError::IOError)?; + Ok(DirectoryLock::from(Box::new(DirectoryLockGuard { + directory: directory.box_clone(), + path: filepath.to_owned(), + }))) +} + + +fn retry_policy(is_blocking: bool) -> RetryPolicy { + if is_blocking { + RetryPolicy { + num_retries: 100, + wait_in_ms: 100, + } + } else { + RetryPolicy::no_retry() + } +} /// Write-once read many (WORM) abstraction for where /// tantivy's data should be stored. @@ -73,6 +165,30 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static { /// /// The file may or may not previously exist. fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()>; + + + /// Acquire a lock in the given directory. + /// + /// The method is blocking or not depending on the `Lock` object. 
+ fn acquire_lock(&self, lock: &Lock) -> Result { + let mut box_directory = self.box_clone(); + let mut retry_policy = retry_policy(lock.is_blocking); + loop { + match try_acquire_lock(&lock.filepath, &mut *box_directory) { + Ok(result) => { + return Ok(result); + } + Err(TryAcquireLockError::FileExists) => { + if !retry_policy.wait_and_retry() { + return Err(LockError::LockBusy); + } + } + Err(TryAcquireLockError::IOError(io_error)) => { + return Err(LockError::IOError(io_error)); + } + } + } + } } /// DirectoryClone diff --git a/src/directory/directory_lock.rs b/src/directory/directory_lock.rs new file mode 100644 index 000000000..afb79199a --- /dev/null +++ b/src/directory/directory_lock.rs @@ -0,0 +1,58 @@ +use std::path::PathBuf; + +/// A directory lock. +/// +/// A lock is associated to a specific path and some +/// [`LockParams`](./enum.LockParams.html). +/// Tantivy itself uses only two locks but client applications +/// can use the directory facility to define their own locks. +/// - [INDEX_WRITER_LOCK](./struct.INDEX_WRITER_LOCK.html) +/// - [META_LOCK](./struct.META_LOCK.html) +/// +/// Check out these locks' documentation for more information. +/// +#[derive(Debug)] +pub struct Lock { + /// The lock needs to be associated with its own file `path`. + /// Depending on the platform, the lock might rely on the creation + /// and deletion of this filepath. + pub filepath: PathBuf, + /// `is_blocking` describes whether acquiring the lock is meant + /// to be a blocking operation or a non-blocking one. + /// + /// Acquiring a blocking lock blocks until the lock is + /// available. + /// Acquiring a non-blocking lock returns rapidly, either successfully + /// or with an error signifying that someone is already holding + /// the lock. + pub is_blocking: bool, +} + + +lazy_static! { + /// Only one process should be able to write tantivy's index at a time. + /// This lock file, when present, is in charge of preventing other processes from opening an IndexWriter. 
+ /// + /// If the process is killed and this file remains, it is safe to remove it manually. + /// + /// Failing to acquire this lock usually means a misuse of tantivy's API, + /// (creating more than one instance of the `IndexWriter`), or a spurious + /// lock file remaining after a crash. In the latter case, removing the file after + /// checking that no process using tantivy is running is safe. + pub static ref INDEX_WRITER_LOCK: Lock = Lock { + filepath: PathBuf::from(".tantivy-writer.lock"), + is_blocking: false + }; + /// The meta lock file is here to protect the segment files being opened by + /// `.load_searchers()` from being garbage collected. + /// It makes it possible for another process to safely consume + /// our index in-writing. Ideally, we may have preferred `RWLock` semantics + /// here, but it is difficult to achieve on Windows. + /// + /// Opening segment readers is a very fast process. + pub static ref META_LOCK: Lock = Lock { + filepath: PathBuf::from(".tantivy-meta.lock"), + is_blocking: true + }; +} + diff --git a/src/directory/error.rs b/src/directory/error.rs index 8d4b5e572..6997c6eee 100644 --- a/src/directory/error.rs +++ b/src/directory/error.rs @@ -3,6 +3,20 @@ use std::fmt; use std::io; use std::path::PathBuf; +/// Error while trying to acquire a directory lock. +#[derive(Debug, Fail)] +pub enum LockError { + /// Failed to acquire a lock as it is already held by another + /// client. + /// - In the context of a blocking lock, this means the lock was not released within some `timeout` period. + /// - In the context of a non-blocking lock, this means the lock was busy at the moment of the call. + #[fail(display = "Could not acquire lock as it is already held, possibly by a different process.")] + LockBusy, + /// Trying to acquire a lock failed with an `IOError` + #[fail(display = "Failed to acquire the lock due to an io:Error.")] + IOError(io::Error) +} + /// General IO error with an optional path to the offending file. 
#[derive(Debug)] pub struct IOError { @@ -10,6 +24,12 @@ pub struct IOError { err: io::Error, } +impl Into for IOError { + fn into(self) -> io::Error { + self.err + } +} + impl fmt::Display for IOError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self.path { diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs index 19e76556e..471b0cd1b 100644 --- a/src/directory/managed_directory.rs +++ b/src/directory/managed_directory.rs @@ -1,8 +1,7 @@ use core::MANAGED_FILEPATH; -use directory::error::{DeleteError, IOError, OpenReadError, OpenWriteError}; +use directory::error::{DeleteError, IOError, LockError, OpenReadError, OpenWriteError}; use directory::{ReadOnlySource, WritePtr}; use error::DataCorruption; -use indexer::LockType; use serde_json; use std::collections::HashSet; use std::io; @@ -13,6 +12,9 @@ use std::sync::RwLockWriteGuard; use std::sync::{Arc, RwLock}; use Directory; use Result; +use directory::META_LOCK; +use directory::Lock; +use directory::DirectoryLock; /// Returns true iff the file is "managed". /// Non-managed file are not subject to garbage collection. @@ -125,7 +127,7 @@ impl ManagedDirectory { // 2) writer change meta.json (for instance after a merge or a commit) // 3) gc kicks in. // 4) gc removes a file that was useful for process B, before process B opened it. 
- if let Ok(_meta_lock) = LockType::MetaLock.acquire_lock(self) { + if let Ok(_meta_lock) = self.acquire_lock(&META_LOCK) { let living_files = get_living_files(); for managed_path in &meta_informations_rlock.managed_paths { if !living_files.contains(managed_path) { @@ -235,6 +237,10 @@ impl Directory for ManagedDirectory { fn exists(&self, path: &Path) -> bool { self.directory.exists(path) } + + fn acquire_lock(&self, lock: &Lock) -> result::Result { + self.directory.acquire_lock(lock) + } } impl Clone for ManagedDirectory { diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs index 03031562f..134004f9c 100644 --- a/src/directory/mmap_directory.rs +++ b/src/directory/mmap_directory.rs @@ -1,3 +1,6 @@ +extern crate fs2; + +use self::fs2::FileExt; use atomicwrites; use common::make_io_err; use directory::error::{DeleteError, IOError, OpenDirectoryError, OpenReadError, OpenWriteError}; @@ -19,6 +22,9 @@ use std::result; use std::sync::Arc; use std::sync::RwLock; use tempdir::TempDir; +use directory::Lock; +use directory::DirectoryLock; +use directory::error::LockError; /// Returns None iff the file exists, can be read, but is empty (and hence /// cannot be mmapped). @@ -115,6 +121,14 @@ impl MmapCache { /// /// The Mmap object are cached to limit the /// system calls. +/// +/// In the `MmapDirectory`, locks are implemented using the `fs2` crate definition of locks. +/// +/// On MacOS & linux, it relies on `flock` (aka `BSD Lock`). These locks solve most of the +/// problems related to POSIX Locks, but their contract may not be respected on `NFS` +/// depending on the implementation. +/// +/// On Windows the semantics are again different. #[derive(Clone)] pub struct MmapDirectory { root_path: PathBuf, @@ -213,6 +227,21 @@ impl MmapDirectory { } } +/// We rely on fs2 for file locking. On Linux & MacOS this +/// uses BSD locks (`flock`). 
The lock is actually released when +/// the `File` object is dropped and its associated file descriptor +/// is closed. +struct ReleaseLockFile { + _file: File, + path: PathBuf +} + +impl Drop for ReleaseLockFile { + fn drop(&mut self) { + debug!("Releasing lock {:?}", self.path); + } +} + /// This Write wraps a File, but has the specificity of /// call `sync_all` on flush. struct SafeFileWriter(File); @@ -354,6 +383,28 @@ impl Directory for MmapDirectory { meta_file.write(|f| f.write_all(data))?; Ok(()) } + + fn acquire_lock(&self, lock: &Lock) -> Result { + let full_path = self.resolve_path(&lock.filepath); + // We make sure that the file exists. + let file: File = OpenOptions::new() + .write(true) + .create(true) //< if the file does not exist yet, create it. + .open(&full_path) + .map_err(|err| LockError::IOError(err))?; + if lock.is_blocking { + file.lock_exclusive() + .map_err(LockError::IOError)?; + } else { + file.try_lock_exclusive() + .map_err(|_| LockError::LockBusy)? + } + // dropping the file handle will release the lock. + Ok(DirectoryLock::from(Box::new(ReleaseLockFile { + path: lock.filepath.clone(), + _file: file + }))) + } } #[cfg(test)] diff --git a/src/directory/mod.rs b/src/directory/mod.rs index c4627b88b..84de77660 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -12,15 +12,17 @@ mod managed_directory; mod ram_directory; mod read_only_source; mod shared_vec_slice; +mod directory_lock; /// Errors specific to the directory module. 
pub mod error; use std::io::{BufWriter, Seek, Write}; - +pub use self::directory::DirectoryLock; pub use self::directory::{Directory, DirectoryClone}; pub use self::ram_directory::RAMDirectory; pub use self::read_only_source::ReadOnlySource; +pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK}; #[cfg(feature = "mmap")] pub use self::mmap_directory::MmapDirectory; @@ -38,128 +40,4 @@ impl SeekableWrite for T {} pub type WritePtr = BufWriter>; #[cfg(test)] -mod tests { - - use super::*; - use std::io::{Seek, SeekFrom, Write}; - use std::path::Path; - - lazy_static! { - static ref TEST_PATH: &'static Path = Path::new("some_path_for_test"); - } - - #[test] - fn test_ram_directory() { - let mut ram_directory = RAMDirectory::create(); - test_directory(&mut ram_directory); - } - - #[test] - #[cfg(feature = "mmap")] - fn test_mmap_directory() { - let mut mmap_directory = MmapDirectory::create_from_tempdir().unwrap(); - test_directory(&mut mmap_directory); - } - - #[test] - #[should_panic] - fn ram_directory_panics_if_flush_forgotten() { - let mut ram_directory = RAMDirectory::create(); - let mut write_file = ram_directory.open_write(*TEST_PATH).unwrap(); - assert!(write_file.write_all(&[4]).is_ok()); - } - - fn test_simple(directory: &mut Directory) { - { - { - let mut write_file = directory.open_write(*TEST_PATH).unwrap(); - assert!(directory.exists(*TEST_PATH)); - write_file.write_all(&[4]).unwrap(); - write_file.write_all(&[3]).unwrap(); - write_file.write_all(&[7, 3, 5]).unwrap(); - write_file.flush().unwrap(); - } - let read_file = directory.open_read(*TEST_PATH).unwrap(); - let data: &[u8] = &*read_file; - assert_eq!(data, &[4u8, 3u8, 7u8, 3u8, 5u8]); - } - - assert!(directory.delete(*TEST_PATH).is_ok()); - assert!(!directory.exists(*TEST_PATH)); - } - - fn test_seek(directory: &mut Directory) { - { - { - let mut write_file = directory.open_write(*TEST_PATH).unwrap(); - write_file.write_all(&[4, 3, 7, 3, 5]).unwrap(); - 
write_file.seek(SeekFrom::Start(0)).unwrap(); - write_file.write_all(&[3, 1]).unwrap(); - write_file.flush().unwrap(); - } - let read_file = directory.open_read(*TEST_PATH).unwrap(); - let data: &[u8] = &*read_file; - assert_eq!(data, &[3u8, 1u8, 7u8, 3u8, 5u8]); - } - - assert!(directory.delete(*TEST_PATH).is_ok()); - } - - fn test_rewrite_forbidden(directory: &mut Directory) { - { - directory.open_write(*TEST_PATH).unwrap(); - assert!(directory.exists(*TEST_PATH)); - } - { - assert!(directory.open_write(*TEST_PATH).is_err()); - } - assert!(directory.delete(*TEST_PATH).is_ok()); - } - - fn test_write_create_the_file(directory: &mut Directory) { - { - assert!(directory.open_read(*TEST_PATH).is_err()); - let _w = directory.open_write(*TEST_PATH).unwrap(); - assert!(directory.exists(*TEST_PATH)); - assert!(directory.open_read(*TEST_PATH).is_ok()); - assert!(directory.delete(*TEST_PATH).is_ok()); - } - } - - fn test_directory_delete(directory: &mut Directory) { - assert!(directory.open_read(*TEST_PATH).is_err()); - let mut write_file = directory.open_write(*TEST_PATH).unwrap(); - write_file.write_all(&[1, 2, 3, 4]).unwrap(); - write_file.flush().unwrap(); - { - let read_handle = directory.open_read(*TEST_PATH).unwrap(); - { - assert_eq!(&*read_handle, &[1u8, 2u8, 3u8, 4u8]); - - // Mapped files can't be deleted on Windows - if !cfg!(windows) { - assert!(directory.delete(*TEST_PATH).is_ok()); - assert_eq!(&*read_handle, &[1u8, 2u8, 3u8, 4u8]); - } - - assert!(directory.delete(Path::new("SomeOtherPath")).is_err()); - } - } - - if cfg!(windows) { - assert!(directory.delete(*TEST_PATH).is_ok()); - } - - assert!(directory.open_read(*TEST_PATH).is_err()); - assert!(directory.delete(*TEST_PATH).is_err()); - } - - fn test_directory(directory: &mut Directory) { - test_simple(directory); - test_seek(directory); - test_rewrite_forbidden(directory); - test_write_create_the_file(directory); - test_directory_delete(directory); - } - -} +mod tests; diff --git 
a/src/directory/tests.rs b/src/directory/tests.rs new file mode 100644 index 000000000..0b018f898 --- /dev/null +++ b/src/directory/tests.rs @@ -0,0 +1,161 @@ +use super::*; +use std::io::{Seek, SeekFrom, Write}; +use std::path::{Path, PathBuf}; +use std::time; + +lazy_static! { + static ref TEST_PATH: &'static Path = Path::new("some_path_for_test"); +} + +#[test] +fn test_ram_directory() { + let mut ram_directory = RAMDirectory::create(); + test_directory(&mut ram_directory); +} + +#[test] +#[cfg(feature = "mmap")] +fn test_mmap_directory() { + let mut mmap_directory = MmapDirectory::create_from_tempdir().unwrap(); + test_directory(&mut mmap_directory); +} + +#[test] +#[should_panic] +fn ram_directory_panics_if_flush_forgotten() { + let mut ram_directory = RAMDirectory::create(); + let mut write_file = ram_directory.open_write(*TEST_PATH).unwrap(); + assert!(write_file.write_all(&[4]).is_ok()); +} + +fn test_simple(directory: &mut Directory) { + { + { + let mut write_file = directory.open_write(*TEST_PATH).unwrap(); + assert!(directory.exists(*TEST_PATH)); + write_file.write_all(&[4]).unwrap(); + write_file.write_all(&[3]).unwrap(); + write_file.write_all(&[7, 3, 5]).unwrap(); + write_file.flush().unwrap(); + } + let read_file = directory.open_read(*TEST_PATH).unwrap(); + let data: &[u8] = &*read_file; + assert_eq!(data, &[4u8, 3u8, 7u8, 3u8, 5u8]); + } + + assert!(directory.delete(*TEST_PATH).is_ok()); + assert!(!directory.exists(*TEST_PATH)); +} + +fn test_seek(directory: &mut Directory) { + { + { + let mut write_file = directory.open_write(*TEST_PATH).unwrap(); + write_file.write_all(&[4, 3, 7, 3, 5]).unwrap(); + write_file.seek(SeekFrom::Start(0)).unwrap(); + write_file.write_all(&[3, 1]).unwrap(); + write_file.flush().unwrap(); + } + let read_file = directory.open_read(*TEST_PATH).unwrap(); + let data: &[u8] = &*read_file; + assert_eq!(data, &[3u8, 1u8, 7u8, 3u8, 5u8]); + } + + assert!(directory.delete(*TEST_PATH).is_ok()); +} + +fn 
test_rewrite_forbidden(directory: &mut Directory) { + { + directory.open_write(*TEST_PATH).unwrap(); + assert!(directory.exists(*TEST_PATH)); + } + { + assert!(directory.open_write(*TEST_PATH).is_err()); + } + assert!(directory.delete(*TEST_PATH).is_ok()); +} + +fn test_write_create_the_file(directory: &mut Directory) { + { + assert!(directory.open_read(*TEST_PATH).is_err()); + let _w = directory.open_write(*TEST_PATH).unwrap(); + assert!(directory.exists(*TEST_PATH)); + assert!(directory.open_read(*TEST_PATH).is_ok()); + assert!(directory.delete(*TEST_PATH).is_ok()); + } +} + +fn test_directory_delete(directory: &mut Directory) { + assert!(directory.open_read(*TEST_PATH).is_err()); + let mut write_file = directory.open_write(*TEST_PATH).unwrap(); + write_file.write_all(&[1, 2, 3, 4]).unwrap(); + write_file.flush().unwrap(); + { + let read_handle = directory.open_read(*TEST_PATH).unwrap(); + { + assert_eq!(&*read_handle, &[1u8, 2u8, 3u8, 4u8]); + + // Mapped files can't be deleted on Windows + if !cfg!(windows) { + assert!(directory.delete(*TEST_PATH).is_ok()); + assert_eq!(&*read_handle, &[1u8, 2u8, 3u8, 4u8]); + } + + assert!(directory.delete(Path::new("SomeOtherPath")).is_err()); + } + } + + if cfg!(windows) { + assert!(directory.delete(*TEST_PATH).is_ok()); + } + + assert!(directory.open_read(*TEST_PATH).is_err()); + assert!(directory.delete(*TEST_PATH).is_err()); +} + +fn test_directory(directory: &mut Directory) { + test_simple(directory); + test_seek(directory); + test_rewrite_forbidden(directory); + test_write_create_the_file(directory); + test_directory_delete(directory); + test_lock_non_blocking(directory); + test_lock_blocking(directory); +} + +fn test_lock_non_blocking(directory: &mut Directory) { + { + let lock_a_res = directory.acquire_lock(&Lock { filepath: PathBuf::from("a.lock"), is_blocking: false }); + assert!(lock_a_res.is_ok()); + let lock_b_res = directory.acquire_lock(&Lock { filepath: PathBuf::from("b.lock"), is_blocking: false }); + 
assert!(lock_b_res.is_ok()); + let lock_a_res2 = directory.acquire_lock(&Lock { filepath: PathBuf::from("a.lock"), is_blocking: false }); + assert!(lock_a_res2.is_err()); + } + let lock_a_res = directory.acquire_lock(&Lock { filepath: PathBuf::from("a.lock"), is_blocking: false }); + assert!(lock_a_res.is_ok()); +} + + + +fn test_lock_blocking(directory: &mut Directory) { + let lock_a_res = directory.acquire_lock(&Lock { filepath: PathBuf::from("a.lock"), is_blocking: true }); + assert!(lock_a_res.is_ok()); + std::thread::spawn(move || { //< lock_a_res is sent to the thread. + std::thread::sleep(time::Duration::from_millis(10)); + // explicitly dropping lock_a_res. It would have been sufficient to just force it + // to be part of the move, but the intent seems clearer that way. + drop(lock_a_res); + }); + { + // A non-blocking call should fail, as the thread is running and holding the lock. + let lock_a_res = directory.acquire_lock(&Lock { filepath: PathBuf::from("a.lock"), is_blocking: false }); + assert!(lock_a_res.is_err()); + } + { // the blocking call should wait for at least 10ms. 
+ let start = time::Instant::now(); + let lock_a_res = directory.acquire_lock(&Lock { filepath: PathBuf::from("a.lock"), is_blocking: true }); + assert!(lock_a_res.is_ok()); + assert!(start.elapsed().subsec_millis() >= 10); + } +} \ No newline at end of file diff --git a/src/error.rs b/src/error.rs index d460aa03a..9094128e7 100644 --- a/src/error.rs +++ b/src/error.rs @@ -4,13 +4,13 @@ use std::io; use directory::error::{IOError, OpenDirectoryError, OpenReadError, OpenWriteError}; use fastfield::FastFieldNotAvailableError; -use indexer::LockType; use query; use schema; use serde_json; use std::fmt; use std::path::PathBuf; use std::sync::PoisonError; +use directory::error::LockError; pub struct DataCorruption { filepath: Option, @@ -57,11 +57,8 @@ pub enum TantivyError { #[fail(display = "Index already exists")] IndexAlreadyExists, /// Failed to acquire file lock - #[fail( - display = "Failed to acquire Lockfile: {:?}. Possible causes: another IndexWriter instance or panic during previous lock drop.", - _0 - )] - LockFailure(LockType), + #[fail(display = "Failed to acquire Lockfile: {:?}. {:?}", _0, _1)] + LockFailure(LockError, Option), /// IO Error. 
#[fail(display = "An IO error occurred: '{}'", _0)] IOError(#[cause] IOError), @@ -100,6 +97,12 @@ impl From for TantivyError { } } +impl From for TantivyError { + fn from(lock_error: LockError) -> TantivyError { + TantivyError::LockFailure(lock_error, None) + } +} + impl From for TantivyError { fn from(io_error: IOError) -> TantivyError { TantivyError::IOError(io_error) diff --git a/src/indexer/directory_lock.rs b/src/indexer/directory_lock.rs deleted file mode 100644 index 172165bc2..000000000 --- a/src/indexer/directory_lock.rs +++ /dev/null @@ -1,131 +0,0 @@ -use directory::error::OpenWriteError; -use std::io::Write; -use std::path::{Path, PathBuf}; -use std::thread; -use std::time::Duration; -use Directory; -use TantivyError; - -#[derive(Debug, Clone, Copy)] -pub enum LockType { - /// Only one process should be able to write tantivy's index at a time. - /// This lock file, when present, is in charge of preventing other processes to open an IndexWriter. - /// - /// If the process is killed and this file remains, it is safe to remove it manually. - /// - /// Failing to acquire this lock usually means a misuse of tantivy's API, - /// (creating more than one instance of the `IndexWriter`), are a spurious - /// lock file remaining after a crash. In the latter case, removing the file after - /// checking no process running tantivy is running is safe. - IndexWriterLock, - /// The meta lock file is here to protect the segment files being opened by - /// `.load_searchers()` from being garbage collected. - /// It makes it possible for another process to safely consume - /// our index in-writing. Ideally, we may have prefered `RWLock` semantics - /// here, but it is difficult to achieve on Windows. - /// - /// Opening segment readers is a very fast process. - /// Right now if the lock cannot be acquire on the first attempt, the logic - /// is very simplistic. We retry after `100ms` until we effectively - /// acquire the lock. 
- /// This lock should not have much contention in normal usage. - MetaLock, -} - -/// Retry the logic of acquiring locks is pretty simple. -/// We just retry `n` times after a given `duratio`, both -/// depending on the type of lock. -struct RetryPolicy { - num_retries: usize, - wait_in_ms: u64, -} - -impl RetryPolicy { - fn no_retry() -> RetryPolicy { - RetryPolicy { - num_retries: 0, - wait_in_ms: 0, - } - } - - fn wait_and_retry(&mut self) -> bool { - if self.num_retries == 0 { - false - } else { - self.num_retries -= 1; - let wait_duration = Duration::from_millis(self.wait_in_ms); - thread::sleep(wait_duration); - true - } - } -} - -impl LockType { - fn retry_policy(self) -> RetryPolicy { - match self { - LockType::IndexWriterLock => RetryPolicy::no_retry(), - LockType::MetaLock => RetryPolicy { - num_retries: 100, - wait_in_ms: 100, - }, - } - } - - fn try_acquire_lock(self, directory: &mut Directory) -> Result { - let path = self.filename(); - let mut write = directory.open_write(path).map_err(|e| match e { - OpenWriteError::FileAlreadyExists(_) => TantivyError::LockFailure(self), - OpenWriteError::IOError(io_error) => TantivyError::IOError(io_error), - })?; - write.flush()?; - Ok(DirectoryLock { - directory: directory.box_clone(), - path: path.to_owned(), - }) - } - - /// Acquire a lock in the given directory. 
- pub fn acquire_lock(self, directory: &Directory) -> Result { - let mut box_directory = directory.box_clone(); - let mut retry_policy = self.retry_policy(); - loop { - let lock_result = self.try_acquire_lock(&mut *box_directory); - match lock_result { - Ok(result) => { - return Ok(result); - } - Err(TantivyError::LockFailure(ref filepath)) => { - if !retry_policy.wait_and_retry() { - return Err(TantivyError::LockFailure(filepath.to_owned())); - } - } - Err(_) => {} - } - } - } - - fn filename(&self) -> &Path { - match *self { - LockType::MetaLock => Path::new(".tantivy-meta.lock"), - LockType::IndexWriterLock => Path::new(".tantivy-indexer.lock"), - } - } -} - -/// The `DirectoryLock` is an object that represents a file lock. -/// See [`LockType`](struct.LockType.html) -/// -/// It is transparently associated to a lock file, that gets deleted -/// on `Drop.` The lock is release automatically on `Drop`. -pub struct DirectoryLock { - directory: Box, - path: PathBuf, -} - -impl Drop for DirectoryLock { - fn drop(&mut self) { - if let Err(e) = self.directory.delete(&*self.path) { - error!("Failed to remove the lock file. {:?}", e); - } - } -} diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index e07bce772..13daade40 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -16,7 +16,6 @@ use indexer::delete_queue::{DeleteCursor, DeleteQueue}; use indexer::doc_opstamp_mapping::DocToOpstampMapping; use indexer::operation::DeleteOperation; use indexer::stamper::Stamper; -use indexer::DirectoryLock; use indexer::MergePolicy; use indexer::SegmentEntry; use indexer::SegmentWriter; @@ -29,6 +28,7 @@ use std::sync::Arc; use std::thread; use std::thread::JoinHandle; use Result; +use directory::DirectoryLock; use futures::{Future, Canceled}; // Size of the margin for the heap. 
A segment is closed when the remaining memory @@ -657,6 +657,7 @@ mod tests { use schema::{self, Document}; use Index; use Term; + use directory::error::LockError; #[test] fn test_lockfile_stops_duplicates() { @@ -664,8 +665,8 @@ mod tests { let index = Index::create_in_ram(schema_builder.build()); let _index_writer = index.writer(3_000_000).unwrap(); match index.writer(3_000_000) { - Err(TantivyError::LockFailure(_)) => {} - _ => panic!("Expected FileAlreadyExists error"), + Err(TantivyError::LockFailure(LockError::LockBusy, _)) => {} + _ => panic!("Expected a `LockFailure` error"), } } @@ -677,8 +678,7 @@ mod tests { match index.writer_with_num_threads(1, 3_000_000) { Err(err) => { let err_msg = err.to_string(); - assert!(err_msg.contains("Lockfile")); - assert!(err_msg.contains("Possible causes:")) + assert!(err_msg.contains("already an `IndexWriter`")); } _ => panic!("Expected LockfileAlreadyExists error"), } diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index 2669d0df6..0620230ad 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -1,5 +1,5 @@ pub mod delete_queue; -mod directory_lock; + mod doc_opstamp_mapping; pub mod index_writer; mod log_merge_policy; @@ -16,8 +16,6 @@ pub mod segment_updater; mod segment_writer; mod stamper; -pub(crate) use self::directory_lock::DirectoryLock; -pub use self::directory_lock::LockType; pub use self::index_writer::IndexWriter; pub use self::log_merge_policy::LogMergePolicy; pub use self::merge_operation::{MergeOperation, MergeOperationInventory}; diff --git a/src/lib.rs b/src/lib.rs index 3262fd768..226d725c5 100755 --- a/src/lib.rs +++ b/src/lib.rs @@ -130,9 +130,7 @@ extern crate bit_set; extern crate bitpacking; extern crate byteorder; extern crate scoped_pool; - extern crate combine; - extern crate crossbeam; extern crate fnv; extern crate fst;