Issue/468b (#482)

* Moving lock to directory/

* added fs2

* doc

* Using fs2 for locking

* Added unit test

* Fixed error message related unit test

* Fixing location of import
This commit is contained in:
Paul Masurel
2019-01-27 12:32:21 +01:00
committed by GitHub
parent bf94fd77db
commit 644b4bd0a1
14 changed files with 449 additions and 281 deletions

View File

@@ -29,6 +29,7 @@ serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
num_cpus = "1.2"
fs2={version="0.4", optional=true}
itertools = "0.8"
levenshtein_automata = {version="0.1", features=["fst_automaton"]}
bit-set = "0.5"
@@ -70,7 +71,7 @@ overflow-checks = true
[features]
# by default no-fail is disabled. We manually enable it when running test.
default = ["mmap", "no_fail"]
mmap = ["fst/mmap", "atomicwrites"]
mmap = ["fst/mmap", "atomicwrites", "fs2"]
lz4-compression = ["lz4"]
no_fail = ["fail/no_fail"]
unstable = [] # useful for benches.

View File

@@ -18,7 +18,6 @@ use error::TantivyError;
use indexer::index_writer::open_index_writer;
use indexer::index_writer::HEAP_SIZE_MIN;
use indexer::segment_updater::save_new_metas;
use indexer::LockType;
use num_cpus;
use schema::Field;
use schema::FieldType;
@@ -33,6 +32,8 @@ use tokenizer::BoxedTokenizer;
use tokenizer::TokenizerManager;
use IndexWriter;
use Result;
use directory::INDEX_WRITER_LOCK;
use directory::META_LOCK;
fn load_metas(directory: &Directory) -> Result<IndexMeta> {
let meta_data = directory.atomic_read(&META_FILEPATH)?;
@@ -232,7 +233,8 @@ impl Index {
/// Each thread will receive a budget of `overall_heap_size_in_bytes / num_threads`.
///
/// # Errors
/// If the lockfile already exists, returns `Error::FileAlreadyExists`.
/// If the lockfile already exists, returns `Error::DirectoryLockBusy` or an `Error::IOError`.
///
/// # Panics
/// If the heap size per thread is too small, panics.
pub fn writer_with_num_threads(
@@ -240,7 +242,14 @@ impl Index {
num_threads: usize,
overall_heap_size_in_bytes: usize,
) -> Result<IndexWriter> {
let directory_lock = LockType::IndexWriterLock.acquire_lock(&self.directory)?;
let directory_lock = self.directory.acquire_lock(&INDEX_WRITER_LOCK)
.map_err(|err| {
TantivyError::LockFailure(err,
Some("Failed to acquire index lock. If you are using\
a regular directory, this means there is already an \
`IndexWriter` working on this `Directory`, in this process \
or in a different process.".to_string()))}
)?;
let heap_size_in_bytes_per_thread = overall_heap_size_in_bytes / num_threads;
open_index_writer(
self,
@@ -339,7 +348,7 @@ impl Index {
/// get the freshest `index` at all time, is to watch `meta.json` and
/// call `load_searchers` whenever a changes happen.
pub fn load_searchers(&self) -> Result<()> {
let _meta_lock = LockType::MetaLock.acquire_lock(self.directory())?;
let _meta_lock = self.directory().acquire_lock(&META_LOCK)?;
let searchable_segments = self.searchable_segments()?;
let segment_readers: Vec<SegmentReader> = searchable_segments
.iter()

View File

@@ -2,10 +2,102 @@ use directory::error::{DeleteError, OpenReadError, OpenWriteError};
use directory::{ReadOnlySource, WritePtr};
use std::fmt;
use std::io;
use std::io::Write;
use std::marker::Send;
use std::marker::Sync;
use std::path::Path;
use std::result;
use directory::error::LockError;
use std::time::Duration;
use std::thread;
use std::path::PathBuf;
use directory::directory_lock::Lock;
/// Retry the logic of acquiring locks is pretty simple.
/// We just retry `n` times after a given `duratio`, both
/// depending on the type of lock.
struct RetryPolicy {
num_retries: usize,
wait_in_ms: u64,
}
impl RetryPolicy {
fn no_retry() -> RetryPolicy {
RetryPolicy {
num_retries: 0,
wait_in_ms: 0,
}
}
fn wait_and_retry(&mut self) -> bool {
if self.num_retries == 0 {
false
} else {
self.num_retries -= 1;
let wait_duration = Duration::from_millis(self.wait_in_ms);
thread::sleep(wait_duration);
true
}
}
}
/// The `DirectoryLock` is an object that represents a file lock.
/// See [`LockType`](struct.LockType.html)
///
/// It is transparently associated to a lock file, that gets deleted
/// on `Drop.` The lock is released automatically on `Drop`.
pub struct DirectoryLock(Box<Drop + Send + 'static>);
struct DirectoryLockGuard {
directory: Box<Directory>,
path: PathBuf
}
impl<T: Drop + Send + 'static> From<Box<T>> for DirectoryLock {
fn from(underlying: Box<T>) -> Self {
DirectoryLock(underlying)
}
}
impl Drop for DirectoryLockGuard {
fn drop(&mut self) {
if let Err(e) = self.directory.delete(&*self.path) {
error!("Failed to remove the lock file. {:?}", e);
}
}
}
enum TryAcquireLockError {
FileExists,
IOError(io::Error),
}
fn try_acquire_lock(filepath: &Path, directory: &mut Directory) -> Result<DirectoryLock, TryAcquireLockError> {
let mut write = directory.open_write(filepath).map_err(|e| match e {
OpenWriteError::FileAlreadyExists(_) => TryAcquireLockError::FileExists,
OpenWriteError::IOError(io_error) => TryAcquireLockError::IOError(io_error.into()),
})?;
write
.flush()
.map_err(TryAcquireLockError::IOError)?;
Ok(DirectoryLock::from(Box::new(DirectoryLockGuard {
directory: directory.box_clone(),
path: filepath.to_owned(),
})))
}
fn retry_policy(is_blocking: bool) -> RetryPolicy {
if is_blocking {
RetryPolicy {
num_retries: 100,
wait_in_ms: 100,
}
} else {
RetryPolicy::no_retry()
}
}
/// Write-once read many (WORM) abstraction for where
/// tantivy's data should be stored.
@@ -73,6 +165,30 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
///
/// The file may or may not previously exist.
fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()>;
/// Acquire a lock in the given directory.
///
/// The method is blocking or not depending on the `Lock` object.
fn acquire_lock(&self, lock: &Lock) -> Result<DirectoryLock, LockError> {
let mut box_directory = self.box_clone();
let mut retry_policy = retry_policy(lock.is_blocking);
loop {
match try_acquire_lock(&lock.filepath, &mut *box_directory) {
Ok(result) => {
return Ok(result);
}
Err(TryAcquireLockError::FileExists) => {
if !retry_policy.wait_and_retry() {
return Err(LockError::LockBusy);
}
}
Err(TryAcquireLockError::IOError(io_error)) => {
return Err(LockError::IOError(io_error));
}
}
}
}
}
/// DirectoryClone

View File

@@ -0,0 +1,58 @@
use std::path::PathBuf;
/// A directory lock.
///
/// A lock is associated to a specific path and some
/// [`LockParams`](./enum.LockParams.html).
/// Tantivy itself uses only two locks but client application
/// can use the directory facility to define their own locks.
/// - [INDEX_WRITER_LOCK](./struct.INDEX_WRITER_LOCK.html)
/// - [META_LOCK](./struct.META_LOCK.html)
///
/// Check out these locks documentation for more information.
///
#[derive(Debug)]
pub struct Lock {
/// The lock needs to be associated with its own file `path`.
/// Depending on the platform, the lock might rely on the creation
/// and deletion of this filepath.
pub filepath: PathBuf,
/// `lock_params` describes whether acquiring the lock is meant
/// to be a blocking operation or a non-blocking.
///
/// Acquiring a blocking lock blocks until the lock is
/// available.
/// Acquiring a blocking lock returns rapidly, either successfully
/// or with an error signifying that someone is already holding
/// the lock.
pub is_blocking: bool,
}
lazy_static! {
/// Only one process should be able to write tantivy's index at a time.
/// This lock file, when present, is in charge of preventing other processes to open an IndexWriter.
///
/// If the process is killed and this file remains, it is safe to remove it manually.
///
/// Failing to acquire this lock usually means a misuse of tantivy's API,
/// (creating more than one instance of the `IndexWriter`), are a spurious
/// lock file remaining after a crash. In the latter case, removing the file after
/// checking no process running tantivy is running is safe.
pub static ref INDEX_WRITER_LOCK: Lock = Lock {
filepath: PathBuf::from(".tantivy-writer.lock"),
is_blocking: false
};
/// The meta lock file is here to protect the segment files being opened by
/// `.load_searchers()` from being garbage collected.
/// It makes it possible for another process to safely consume
/// our index in-writing. Ideally, we may have prefered `RWLock` semantics
/// here, but it is difficult to achieve on Windows.
///
/// Opening segment readers is a very fast process.
pub static ref META_LOCK: Lock = Lock {
filepath: PathBuf::from(".tantivy-meta.lock"),
is_blocking: true
};
}

View File

@@ -3,6 +3,20 @@ use std::fmt;
use std::io;
use std::path::PathBuf;
/// Error while trying to acquire a directory lock.
#[derive(Debug, Fail)]
pub enum LockError {
/// Failed to acquired a lock as it is already hold by another
/// client.
/// - In the context of a blocking lock, this means the lock was not released within some `timeout` period.
/// - In the context of a non-blocking lock, this means the lock was busy at the moment of the call.
#[fail(display = "Could not acquire lock as it is already held, possibly by a different process.")]
LockBusy,
/// Trying to acquire a lock failed with an `IOError`
#[fail(display = "Failed to acquire the lock due to an io:Error.")]
IOError(io::Error)
}
/// General IO error with an optional path to the offending file.
#[derive(Debug)]
pub struct IOError {
@@ -10,6 +24,12 @@ pub struct IOError {
err: io::Error,
}
impl Into<io::Error> for IOError {
fn into(self) -> io::Error {
self.err
}
}
impl fmt::Display for IOError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self.path {

View File

@@ -1,8 +1,7 @@
use core::MANAGED_FILEPATH;
use directory::error::{DeleteError, IOError, OpenReadError, OpenWriteError};
use directory::error::{DeleteError, IOError, LockError, OpenReadError, OpenWriteError};
use directory::{ReadOnlySource, WritePtr};
use error::DataCorruption;
use indexer::LockType;
use serde_json;
use std::collections::HashSet;
use std::io;
@@ -13,6 +12,9 @@ use std::sync::RwLockWriteGuard;
use std::sync::{Arc, RwLock};
use Directory;
use Result;
use directory::META_LOCK;
use directory::Lock;
use directory::DirectoryLock;
/// Returns true iff the file is "managed".
/// Non-managed file are not subject to garbage collection.
@@ -125,7 +127,7 @@ impl ManagedDirectory {
// 2) writer change meta.json (for instance after a merge or a commit)
// 3) gc kicks in.
// 4) gc removes a file that was useful for process B, before process B opened it.
if let Ok(_meta_lock) = LockType::MetaLock.acquire_lock(self) {
if let Ok(_meta_lock) = self.acquire_lock(&META_LOCK) {
let living_files = get_living_files();
for managed_path in &meta_informations_rlock.managed_paths {
if !living_files.contains(managed_path) {
@@ -235,6 +237,10 @@ impl Directory for ManagedDirectory {
fn exists(&self, path: &Path) -> bool {
self.directory.exists(path)
}
fn acquire_lock(&self, lock: &Lock) -> result::Result<DirectoryLock, LockError> {
self.directory.acquire_lock(lock)
}
}
impl Clone for ManagedDirectory {

View File

@@ -1,3 +1,6 @@
extern crate fs2;
use self::fs2::FileExt;
use atomicwrites;
use common::make_io_err;
use directory::error::{DeleteError, IOError, OpenDirectoryError, OpenReadError, OpenWriteError};
@@ -19,6 +22,9 @@ use std::result;
use std::sync::Arc;
use std::sync::RwLock;
use tempdir::TempDir;
use directory::Lock;
use directory::DirectoryLock;
use directory::error::LockError;
/// Returns None iff the file exists, can be read, but is empty (and hence
/// cannot be mmapped).
@@ -115,6 +121,14 @@ impl MmapCache {
///
/// The Mmap object are cached to limit the
/// system calls.
///
/// In the `MmapDirectory`, locks are implemented using the `fs2` crate definition of locks.
///
/// On MacOS & linux, it relies on `flock` (aka `BSD Lock`). These locks solve most of the
/// problems related to POSIX Locks, but may their contract may not be respected on `NFS`
/// depending on the implementation.
///
/// On Windows the semantics are again different.
#[derive(Clone)]
pub struct MmapDirectory {
root_path: PathBuf,
@@ -213,6 +227,21 @@ impl MmapDirectory {
}
}
/// We rely on fs2 for file locking. On Windows & MacOS this
/// uses BSD locks (`flock`). The lock is actually released when
/// the `File` object is dropped and its associated file descriptor
/// is closed.
struct ReleaseLockFile {
_file: File,
path: PathBuf
}
impl Drop for ReleaseLockFile {
fn drop(&mut self) {
debug!("Releasing lock {:?}", self.path);
}
}
/// This Write wraps a File, but has the specificity of
/// call `sync_all` on flush.
struct SafeFileWriter(File);
@@ -354,6 +383,28 @@ impl Directory for MmapDirectory {
meta_file.write(|f| f.write_all(data))?;
Ok(())
}
fn acquire_lock(&self, lock: &Lock) -> Result<DirectoryLock, LockError> {
let full_path = self.resolve_path(&lock.filepath);
// We make sure that the file exists.
let file: File = OpenOptions::new()
.write(true)
.create(true) //< if the file does not exist yet, create it.
.open(&full_path)
.map_err(|err| LockError::IOError(err))?;
if lock.is_blocking {
file.lock_exclusive()
.map_err(LockError::IOError)?;
} else {
file.try_lock_exclusive()
.map_err(|_| LockError::LockBusy)?
}
// dropping the file handle will release the lock.
Ok(DirectoryLock::from(Box::new(ReleaseLockFile {
path: lock.filepath.clone(),
_file: file
})))
}
}
#[cfg(test)]

View File

@@ -12,15 +12,17 @@ mod managed_directory;
mod ram_directory;
mod read_only_source;
mod shared_vec_slice;
mod directory_lock;
/// Errors specific to the directory module.
pub mod error;
use std::io::{BufWriter, Seek, Write};
pub use self::directory::DirectoryLock;
pub use self::directory::{Directory, DirectoryClone};
pub use self::ram_directory::RAMDirectory;
pub use self::read_only_source::ReadOnlySource;
pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
#[cfg(feature = "mmap")]
pub use self::mmap_directory::MmapDirectory;
@@ -38,128 +40,4 @@ impl<T: Seek + Write> SeekableWrite for T {}
pub type WritePtr = BufWriter<Box<SeekableWrite>>;
#[cfg(test)]
mod tests {
use super::*;
use std::io::{Seek, SeekFrom, Write};
use std::path::Path;
lazy_static! {
static ref TEST_PATH: &'static Path = Path::new("some_path_for_test");
}
#[test]
fn test_ram_directory() {
let mut ram_directory = RAMDirectory::create();
test_directory(&mut ram_directory);
}
#[test]
#[cfg(feature = "mmap")]
fn test_mmap_directory() {
let mut mmap_directory = MmapDirectory::create_from_tempdir().unwrap();
test_directory(&mut mmap_directory);
}
#[test]
#[should_panic]
fn ram_directory_panics_if_flush_forgotten() {
let mut ram_directory = RAMDirectory::create();
let mut write_file = ram_directory.open_write(*TEST_PATH).unwrap();
assert!(write_file.write_all(&[4]).is_ok());
}
fn test_simple(directory: &mut Directory) {
{
{
let mut write_file = directory.open_write(*TEST_PATH).unwrap();
assert!(directory.exists(*TEST_PATH));
write_file.write_all(&[4]).unwrap();
write_file.write_all(&[3]).unwrap();
write_file.write_all(&[7, 3, 5]).unwrap();
write_file.flush().unwrap();
}
let read_file = directory.open_read(*TEST_PATH).unwrap();
let data: &[u8] = &*read_file;
assert_eq!(data, &[4u8, 3u8, 7u8, 3u8, 5u8]);
}
assert!(directory.delete(*TEST_PATH).is_ok());
assert!(!directory.exists(*TEST_PATH));
}
fn test_seek(directory: &mut Directory) {
{
{
let mut write_file = directory.open_write(*TEST_PATH).unwrap();
write_file.write_all(&[4, 3, 7, 3, 5]).unwrap();
write_file.seek(SeekFrom::Start(0)).unwrap();
write_file.write_all(&[3, 1]).unwrap();
write_file.flush().unwrap();
}
let read_file = directory.open_read(*TEST_PATH).unwrap();
let data: &[u8] = &*read_file;
assert_eq!(data, &[3u8, 1u8, 7u8, 3u8, 5u8]);
}
assert!(directory.delete(*TEST_PATH).is_ok());
}
fn test_rewrite_forbidden(directory: &mut Directory) {
{
directory.open_write(*TEST_PATH).unwrap();
assert!(directory.exists(*TEST_PATH));
}
{
assert!(directory.open_write(*TEST_PATH).is_err());
}
assert!(directory.delete(*TEST_PATH).is_ok());
}
fn test_write_create_the_file(directory: &mut Directory) {
{
assert!(directory.open_read(*TEST_PATH).is_err());
let _w = directory.open_write(*TEST_PATH).unwrap();
assert!(directory.exists(*TEST_PATH));
assert!(directory.open_read(*TEST_PATH).is_ok());
assert!(directory.delete(*TEST_PATH).is_ok());
}
}
fn test_directory_delete(directory: &mut Directory) {
assert!(directory.open_read(*TEST_PATH).is_err());
let mut write_file = directory.open_write(*TEST_PATH).unwrap();
write_file.write_all(&[1, 2, 3, 4]).unwrap();
write_file.flush().unwrap();
{
let read_handle = directory.open_read(*TEST_PATH).unwrap();
{
assert_eq!(&*read_handle, &[1u8, 2u8, 3u8, 4u8]);
// Mapped files can't be deleted on Windows
if !cfg!(windows) {
assert!(directory.delete(*TEST_PATH).is_ok());
assert_eq!(&*read_handle, &[1u8, 2u8, 3u8, 4u8]);
}
assert!(directory.delete(Path::new("SomeOtherPath")).is_err());
}
}
if cfg!(windows) {
assert!(directory.delete(*TEST_PATH).is_ok());
}
assert!(directory.open_read(*TEST_PATH).is_err());
assert!(directory.delete(*TEST_PATH).is_err());
}
fn test_directory(directory: &mut Directory) {
test_simple(directory);
test_seek(directory);
test_rewrite_forbidden(directory);
test_write_create_the_file(directory);
test_directory_delete(directory);
}
}
mod tests;

161
src/directory/tests.rs Normal file
View File

@@ -0,0 +1,161 @@
use super::*;
use std::io::{Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::time;
lazy_static! {
static ref TEST_PATH: &'static Path = Path::new("some_path_for_test");
}
#[test]
fn test_ram_directory() {
let mut ram_directory = RAMDirectory::create();
test_directory(&mut ram_directory);
}
#[test]
#[cfg(feature = "mmap")]
fn test_mmap_directory() {
let mut mmap_directory = MmapDirectory::create_from_tempdir().unwrap();
test_directory(&mut mmap_directory);
}
#[test]
#[should_panic]
fn ram_directory_panics_if_flush_forgotten() {
let mut ram_directory = RAMDirectory::create();
let mut write_file = ram_directory.open_write(*TEST_PATH).unwrap();
assert!(write_file.write_all(&[4]).is_ok());
}
fn test_simple(directory: &mut Directory) {
{
{
let mut write_file = directory.open_write(*TEST_PATH).unwrap();
assert!(directory.exists(*TEST_PATH));
write_file.write_all(&[4]).unwrap();
write_file.write_all(&[3]).unwrap();
write_file.write_all(&[7, 3, 5]).unwrap();
write_file.flush().unwrap();
}
let read_file = directory.open_read(*TEST_PATH).unwrap();
let data: &[u8] = &*read_file;
assert_eq!(data, &[4u8, 3u8, 7u8, 3u8, 5u8]);
}
assert!(directory.delete(*TEST_PATH).is_ok());
assert!(!directory.exists(*TEST_PATH));
}
fn test_seek(directory: &mut Directory) {
{
{
let mut write_file = directory.open_write(*TEST_PATH).unwrap();
write_file.write_all(&[4, 3, 7, 3, 5]).unwrap();
write_file.seek(SeekFrom::Start(0)).unwrap();
write_file.write_all(&[3, 1]).unwrap();
write_file.flush().unwrap();
}
let read_file = directory.open_read(*TEST_PATH).unwrap();
let data: &[u8] = &*read_file;
assert_eq!(data, &[3u8, 1u8, 7u8, 3u8, 5u8]);
}
assert!(directory.delete(*TEST_PATH).is_ok());
}
fn test_rewrite_forbidden(directory: &mut Directory) {
{
directory.open_write(*TEST_PATH).unwrap();
assert!(directory.exists(*TEST_PATH));
}
{
assert!(directory.open_write(*TEST_PATH).is_err());
}
assert!(directory.delete(*TEST_PATH).is_ok());
}
fn test_write_create_the_file(directory: &mut Directory) {
{
assert!(directory.open_read(*TEST_PATH).is_err());
let _w = directory.open_write(*TEST_PATH).unwrap();
assert!(directory.exists(*TEST_PATH));
assert!(directory.open_read(*TEST_PATH).is_ok());
assert!(directory.delete(*TEST_PATH).is_ok());
}
}
fn test_directory_delete(directory: &mut Directory) {
assert!(directory.open_read(*TEST_PATH).is_err());
let mut write_file = directory.open_write(*TEST_PATH).unwrap();
write_file.write_all(&[1, 2, 3, 4]).unwrap();
write_file.flush().unwrap();
{
let read_handle = directory.open_read(*TEST_PATH).unwrap();
{
assert_eq!(&*read_handle, &[1u8, 2u8, 3u8, 4u8]);
// Mapped files can't be deleted on Windows
if !cfg!(windows) {
assert!(directory.delete(*TEST_PATH).is_ok());
assert_eq!(&*read_handle, &[1u8, 2u8, 3u8, 4u8]);
}
assert!(directory.delete(Path::new("SomeOtherPath")).is_err());
}
}
if cfg!(windows) {
assert!(directory.delete(*TEST_PATH).is_ok());
}
assert!(directory.open_read(*TEST_PATH).is_err());
assert!(directory.delete(*TEST_PATH).is_err());
}
fn test_directory(directory: &mut Directory) {
test_simple(directory);
test_seek(directory);
test_rewrite_forbidden(directory);
test_write_create_the_file(directory);
test_directory_delete(directory);
test_lock_non_blocking(directory);
test_lock_blocking(directory);
}
fn test_lock_non_blocking(directory: &mut Directory) {
{
let lock_a_res = directory.acquire_lock(&Lock { filepath: PathBuf::from("a.lock"), is_blocking: false });
assert!(lock_a_res.is_ok());
let lock_b_res = directory.acquire_lock(&Lock { filepath: PathBuf::from("b.lock"), is_blocking: false });
assert!(lock_b_res.is_ok());
let lock_a_res2 = directory.acquire_lock(&Lock { filepath: PathBuf::from("a.lock"), is_blocking: false });
assert!(lock_a_res2.is_err());
}
let lock_a_res = directory.acquire_lock(&Lock { filepath: PathBuf::from("a.lock"), is_blocking: false });
assert!(lock_a_res.is_ok());
}
fn test_lock_blocking(directory: &mut Directory) {
let lock_a_res = directory.acquire_lock(&Lock { filepath: PathBuf::from("a.lock"), is_blocking: true });
assert!(lock_a_res.is_ok());
std::thread::spawn(move || { //< lock_a_res is sent to the thread.
std::thread::sleep(time::Duration::from_millis(10));
// explicitely droping lock_a_res. It would have been sufficient to just force it
// to be part of the move, but the intent seems clearer that way.
drop(lock_a_res);
});
{
// A non-blocking call should fail, as the thread is running and holding the lock.
let lock_a_res = directory.acquire_lock(&Lock { filepath: PathBuf::from("a.lock"), is_blocking: false });
assert!(lock_a_res.is_err());
}
{ // the blocking call should wait for at least 10ms.
let start = time::Instant::now();
let lock_a_res = directory.acquire_lock(&Lock { filepath: PathBuf::from("a.lock"), is_blocking: true });
assert!(lock_a_res.is_ok());
assert!(start.elapsed().subsec_millis() >= 10);
}
}

View File

@@ -4,13 +4,13 @@ use std::io;
use directory::error::{IOError, OpenDirectoryError, OpenReadError, OpenWriteError};
use fastfield::FastFieldNotAvailableError;
use indexer::LockType;
use query;
use schema;
use serde_json;
use std::fmt;
use std::path::PathBuf;
use std::sync::PoisonError;
use directory::error::LockError;
pub struct DataCorruption {
filepath: Option<PathBuf>,
@@ -57,11 +57,8 @@ pub enum TantivyError {
#[fail(display = "Index already exists")]
IndexAlreadyExists,
/// Failed to acquire file lock
#[fail(
display = "Failed to acquire Lockfile: {:?}. Possible causes: another IndexWriter instance or panic during previous lock drop.",
_0
)]
LockFailure(LockType),
#[fail(display = "Failed to acquire Lockfile: {:?}. {:?}", _0, _1)]
LockFailure(LockError, Option<String>),
/// IO Error.
#[fail(display = "An IO error occurred: '{}'", _0)]
IOError(#[cause] IOError),
@@ -100,6 +97,12 @@ impl From<FastFieldNotAvailableError> for TantivyError {
}
}
impl From<LockError> for TantivyError {
fn from(lock_error: LockError) -> TantivyError {
TantivyError::LockFailure(lock_error, None)
}
}
impl From<IOError> for TantivyError {
fn from(io_error: IOError) -> TantivyError {
TantivyError::IOError(io_error)

View File

@@ -1,131 +0,0 @@
use directory::error::OpenWriteError;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::thread;
use std::time::Duration;
use Directory;
use TantivyError;
#[derive(Debug, Clone, Copy)]
pub enum LockType {
/// Only one process should be able to write tantivy's index at a time.
/// This lock file, when present, is in charge of preventing other processes to open an IndexWriter.
///
/// If the process is killed and this file remains, it is safe to remove it manually.
///
/// Failing to acquire this lock usually means a misuse of tantivy's API,
/// (creating more than one instance of the `IndexWriter`), are a spurious
/// lock file remaining after a crash. In the latter case, removing the file after
/// checking no process running tantivy is running is safe.
IndexWriterLock,
/// The meta lock file is here to protect the segment files being opened by
/// `.load_searchers()` from being garbage collected.
/// It makes it possible for another process to safely consume
/// our index in-writing. Ideally, we may have prefered `RWLock` semantics
/// here, but it is difficult to achieve on Windows.
///
/// Opening segment readers is a very fast process.
/// Right now if the lock cannot be acquire on the first attempt, the logic
/// is very simplistic. We retry after `100ms` until we effectively
/// acquire the lock.
/// This lock should not have much contention in normal usage.
MetaLock,
}
/// Retry the logic of acquiring locks is pretty simple.
/// We just retry `n` times after a given `duratio`, both
/// depending on the type of lock.
struct RetryPolicy {
num_retries: usize,
wait_in_ms: u64,
}
impl RetryPolicy {
fn no_retry() -> RetryPolicy {
RetryPolicy {
num_retries: 0,
wait_in_ms: 0,
}
}
fn wait_and_retry(&mut self) -> bool {
if self.num_retries == 0 {
false
} else {
self.num_retries -= 1;
let wait_duration = Duration::from_millis(self.wait_in_ms);
thread::sleep(wait_duration);
true
}
}
}
impl LockType {
fn retry_policy(self) -> RetryPolicy {
match self {
LockType::IndexWriterLock => RetryPolicy::no_retry(),
LockType::MetaLock => RetryPolicy {
num_retries: 100,
wait_in_ms: 100,
},
}
}
fn try_acquire_lock(self, directory: &mut Directory) -> Result<DirectoryLock, TantivyError> {
let path = self.filename();
let mut write = directory.open_write(path).map_err(|e| match e {
OpenWriteError::FileAlreadyExists(_) => TantivyError::LockFailure(self),
OpenWriteError::IOError(io_error) => TantivyError::IOError(io_error),
})?;
write.flush()?;
Ok(DirectoryLock {
directory: directory.box_clone(),
path: path.to_owned(),
})
}
/// Acquire a lock in the given directory.
pub fn acquire_lock(self, directory: &Directory) -> Result<DirectoryLock, TantivyError> {
let mut box_directory = directory.box_clone();
let mut retry_policy = self.retry_policy();
loop {
let lock_result = self.try_acquire_lock(&mut *box_directory);
match lock_result {
Ok(result) => {
return Ok(result);
}
Err(TantivyError::LockFailure(ref filepath)) => {
if !retry_policy.wait_and_retry() {
return Err(TantivyError::LockFailure(filepath.to_owned()));
}
}
Err(_) => {}
}
}
}
fn filename(&self) -> &Path {
match *self {
LockType::MetaLock => Path::new(".tantivy-meta.lock"),
LockType::IndexWriterLock => Path::new(".tantivy-indexer.lock"),
}
}
}
/// The `DirectoryLock` is an object that represents a file lock.
/// See [`LockType`](struct.LockType.html)
///
/// It is transparently associated to a lock file, that gets deleted
/// on `Drop.` The lock is release automatically on `Drop`.
pub struct DirectoryLock {
directory: Box<Directory>,
path: PathBuf,
}
impl Drop for DirectoryLock {
fn drop(&mut self) {
if let Err(e) = self.directory.delete(&*self.path) {
error!("Failed to remove the lock file. {:?}", e);
}
}
}

View File

@@ -16,7 +16,6 @@ use indexer::delete_queue::{DeleteCursor, DeleteQueue};
use indexer::doc_opstamp_mapping::DocToOpstampMapping;
use indexer::operation::DeleteOperation;
use indexer::stamper::Stamper;
use indexer::DirectoryLock;
use indexer::MergePolicy;
use indexer::SegmentEntry;
use indexer::SegmentWriter;
@@ -29,6 +28,7 @@ use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;
use Result;
use directory::DirectoryLock;
use futures::{Future, Canceled};
// Size of the margin for the heap. A segment is closed when the remaining memory
@@ -657,6 +657,7 @@ mod tests {
use schema::{self, Document};
use Index;
use Term;
use directory::error::LockError;
#[test]
fn test_lockfile_stops_duplicates() {
@@ -664,8 +665,8 @@ mod tests {
let index = Index::create_in_ram(schema_builder.build());
let _index_writer = index.writer(3_000_000).unwrap();
match index.writer(3_000_000) {
Err(TantivyError::LockFailure(_)) => {}
_ => panic!("Expected FileAlreadyExists error"),
Err(TantivyError::LockFailure(LockError::LockBusy, _)) => {}
_ => panic!("Expected a `LockFailure` error"),
}
}
@@ -677,8 +678,7 @@ mod tests {
match index.writer_with_num_threads(1, 3_000_000) {
Err(err) => {
let err_msg = err.to_string();
assert!(err_msg.contains("Lockfile"));
assert!(err_msg.contains("Possible causes:"))
assert!(err_msg.contains("already an `IndexWriter`"));
}
_ => panic!("Expected LockfileAlreadyExists error"),
}

View File

@@ -1,5 +1,5 @@
pub mod delete_queue;
mod directory_lock;
mod doc_opstamp_mapping;
pub mod index_writer;
mod log_merge_policy;
@@ -16,8 +16,6 @@ pub mod segment_updater;
mod segment_writer;
mod stamper;
pub(crate) use self::directory_lock::DirectoryLock;
pub use self::directory_lock::LockType;
pub use self::index_writer::IndexWriter;
pub use self::log_merge_policy::LogMergePolicy;
pub use self::merge_operation::{MergeOperation, MergeOperationInventory};

View File

@@ -130,9 +130,7 @@ extern crate bit_set;
extern crate bitpacking;
extern crate byteorder;
extern crate scoped_pool;
extern crate combine;
extern crate crossbeam;
extern crate fnv;
extern crate fst;