mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-06-04 09:30:42 +00:00
Clippy + cargo fmt
This commit is contained in:
@@ -12,6 +12,8 @@ use core::META_FILEPATH;
|
||||
use directory::ManagedDirectory;
|
||||
#[cfg(feature = "mmap")]
|
||||
use directory::MmapDirectory;
|
||||
use directory::INDEX_WRITER_LOCK;
|
||||
use directory::META_LOCK;
|
||||
use directory::{Directory, RAMDirectory};
|
||||
use error::DataCorruption;
|
||||
use error::TantivyError;
|
||||
@@ -32,8 +34,6 @@ use tokenizer::BoxedTokenizer;
|
||||
use tokenizer::TokenizerManager;
|
||||
use IndexWriter;
|
||||
use Result;
|
||||
use directory::INDEX_WRITER_LOCK;
|
||||
use directory::META_LOCK;
|
||||
|
||||
fn load_metas(directory: &Directory) -> Result<IndexMeta> {
|
||||
let meta_data = directory.atomic_read(&META_FILEPATH)?;
|
||||
@@ -242,14 +242,21 @@ impl Index {
|
||||
num_threads: usize,
|
||||
overall_heap_size_in_bytes: usize,
|
||||
) -> Result<IndexWriter> {
|
||||
let directory_lock = self.directory.acquire_lock(&INDEX_WRITER_LOCK)
|
||||
let directory_lock = self
|
||||
.directory
|
||||
.acquire_lock(&INDEX_WRITER_LOCK)
|
||||
.map_err(|err| {
|
||||
TantivyError::LockFailure(err,
|
||||
Some("Failed to acquire index lock. If you are using\
|
||||
a regular directory, this means there is already an \
|
||||
`IndexWriter` working on this `Directory`, in this process \
|
||||
or in a different process.".to_string()))}
|
||||
)?;
|
||||
TantivyError::LockFailure(
|
||||
err,
|
||||
Some(
|
||||
"Failed to acquire index lock. If you are using\
|
||||
a regular directory, this means there is already an \
|
||||
`IndexWriter` working on this `Directory`, in this process \
|
||||
or in a different process."
|
||||
.to_string(),
|
||||
),
|
||||
)
|
||||
})?;
|
||||
let heap_size_in_bytes_per_thread = overall_heap_size_in_bytes / num_threads;
|
||||
open_index_writer(
|
||||
self,
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
use directory::directory_lock::Lock;
|
||||
use directory::error::LockError;
|
||||
use directory::error::{DeleteError, OpenReadError, OpenWriteError};
|
||||
use directory::{ReadOnlySource, WritePtr};
|
||||
use std::fmt;
|
||||
@@ -6,13 +8,10 @@ use std::io::Write;
|
||||
use std::marker::Send;
|
||||
use std::marker::Sync;
|
||||
use std::path::Path;
|
||||
use std::result;
|
||||
use directory::error::LockError;
|
||||
use std::time::Duration;
|
||||
use std::thread;
|
||||
use std::path::PathBuf;
|
||||
use directory::directory_lock::Lock;
|
||||
|
||||
use std::result;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
/// Retry the logic of acquiring locks is pretty simple.
|
||||
/// We just retry `n` times after a given `duratio`, both
|
||||
@@ -51,7 +50,7 @@ pub struct DirectoryLock(Box<Drop + Send + 'static>);
|
||||
|
||||
struct DirectoryLockGuard {
|
||||
directory: Box<Directory>,
|
||||
path: PathBuf
|
||||
path: PathBuf,
|
||||
}
|
||||
|
||||
impl<T: Drop + Send + 'static> From<Box<T>> for DirectoryLock {
|
||||
@@ -73,21 +72,21 @@ enum TryAcquireLockError {
|
||||
IOError(io::Error),
|
||||
}
|
||||
|
||||
fn try_acquire_lock(filepath: &Path, directory: &mut Directory) -> Result<DirectoryLock, TryAcquireLockError> {
|
||||
fn try_acquire_lock(
|
||||
filepath: &Path,
|
||||
directory: &mut Directory,
|
||||
) -> Result<DirectoryLock, TryAcquireLockError> {
|
||||
let mut write = directory.open_write(filepath).map_err(|e| match e {
|
||||
OpenWriteError::FileAlreadyExists(_) => TryAcquireLockError::FileExists,
|
||||
OpenWriteError::IOError(io_error) => TryAcquireLockError::IOError(io_error.into()),
|
||||
})?;
|
||||
write
|
||||
.flush()
|
||||
.map_err(TryAcquireLockError::IOError)?;
|
||||
write.flush().map_err(TryAcquireLockError::IOError)?;
|
||||
Ok(DirectoryLock::from(Box::new(DirectoryLockGuard {
|
||||
directory: directory.box_clone(),
|
||||
path: filepath.to_owned(),
|
||||
})))
|
||||
}
|
||||
|
||||
|
||||
fn retry_policy(is_blocking: bool) -> RetryPolicy {
|
||||
if is_blocking {
|
||||
RetryPolicy {
|
||||
@@ -166,7 +165,6 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
|
||||
/// The file may or may not previously exist.
|
||||
fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()>;
|
||||
|
||||
|
||||
/// Acquire a lock in the given directory.
|
||||
///
|
||||
/// The method is blocking or not depending on the `Lock` object.
|
||||
|
||||
@@ -13,9 +13,9 @@ use std::path::PathBuf;
|
||||
///
|
||||
#[derive(Debug)]
|
||||
pub struct Lock {
|
||||
/// The lock needs to be associated with its own file `path`.
|
||||
/// Depending on the platform, the lock might rely on the creation
|
||||
/// and deletion of this filepath.
|
||||
/// The lock needs to be associated with its own file `path`.
|
||||
/// Depending on the platform, the lock might rely on the creation
|
||||
/// and deletion of this filepath.
|
||||
pub filepath: PathBuf,
|
||||
/// `lock_params` describes whether acquiring the lock is meant
|
||||
/// to be a blocking operation or a non-blocking.
|
||||
@@ -28,7 +28,6 @@ pub struct Lock {
|
||||
pub is_blocking: bool,
|
||||
}
|
||||
|
||||
|
||||
lazy_static! {
|
||||
/// Only one process should be able to write tantivy's index at a time.
|
||||
/// This lock file, when present, is in charge of preventing other processes to open an IndexWriter.
|
||||
@@ -55,4 +54,3 @@ lazy_static! {
|
||||
is_blocking: true
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -10,11 +10,13 @@ pub enum LockError {
|
||||
/// client.
|
||||
/// - In the context of a blocking lock, this means the lock was not released within some `timeout` period.
|
||||
/// - In the context of a non-blocking lock, this means the lock was busy at the moment of the call.
|
||||
#[fail(display = "Could not acquire lock as it is already held, possibly by a different process.")]
|
||||
#[fail(
|
||||
display = "Could not acquire lock as it is already held, possibly by a different process."
|
||||
)]
|
||||
LockBusy,
|
||||
/// Trying to acquire a lock failed with an `IOError`
|
||||
#[fail(display = "Failed to acquire the lock due to an io:Error.")]
|
||||
IOError(io::Error)
|
||||
IOError(io::Error),
|
||||
}
|
||||
|
||||
/// General IO error with an optional path to the offending file.
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
use core::MANAGED_FILEPATH;
|
||||
use directory::error::{DeleteError, IOError, LockError, OpenReadError, OpenWriteError};
|
||||
use directory::DirectoryLock;
|
||||
use directory::Lock;
|
||||
use directory::META_LOCK;
|
||||
use directory::{ReadOnlySource, WritePtr};
|
||||
use error::DataCorruption;
|
||||
use serde_json;
|
||||
@@ -12,9 +15,6 @@ use std::sync::RwLockWriteGuard;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use Directory;
|
||||
use Result;
|
||||
use directory::META_LOCK;
|
||||
use directory::Lock;
|
||||
use directory::DirectoryLock;
|
||||
|
||||
/// Returns true iff the file is "managed".
|
||||
/// Non-managed file are not subject to garbage collection.
|
||||
|
||||
@@ -3,9 +3,12 @@ extern crate fs2;
|
||||
use self::fs2::FileExt;
|
||||
use atomicwrites;
|
||||
use common::make_io_err;
|
||||
use directory::error::LockError;
|
||||
use directory::error::{DeleteError, IOError, OpenDirectoryError, OpenReadError, OpenWriteError};
|
||||
use directory::shared_vec_slice::SharedVecSlice;
|
||||
use directory::Directory;
|
||||
use directory::DirectoryLock;
|
||||
use directory::Lock;
|
||||
use directory::ReadOnlySource;
|
||||
use directory::WritePtr;
|
||||
use fst::raw::MmapReadOnly;
|
||||
@@ -22,9 +25,6 @@ use std::result;
|
||||
use std::sync::Arc;
|
||||
use std::sync::RwLock;
|
||||
use tempdir::TempDir;
|
||||
use directory::Lock;
|
||||
use directory::DirectoryLock;
|
||||
use directory::error::LockError;
|
||||
|
||||
/// Returns None iff the file exists, can be read, but is empty (and hence
|
||||
/// cannot be mmapped).
|
||||
@@ -233,7 +233,7 @@ impl MmapDirectory {
|
||||
/// is closed.
|
||||
struct ReleaseLockFile {
|
||||
_file: File,
|
||||
path: PathBuf
|
||||
path: PathBuf,
|
||||
}
|
||||
|
||||
impl Drop for ReleaseLockFile {
|
||||
@@ -386,23 +386,21 @@ impl Directory for MmapDirectory {
|
||||
|
||||
fn acquire_lock(&self, lock: &Lock) -> Result<DirectoryLock, LockError> {
|
||||
let full_path = self.resolve_path(&lock.filepath);
|
||||
// We make sure that the file exists.
|
||||
// We make sure that the file exists.
|
||||
let file: File = OpenOptions::new()
|
||||
.write(true)
|
||||
.create(true) //< if the file does not exist yet, create it.
|
||||
.open(&full_path)
|
||||
.map_err(|err| LockError::IOError(err))?;
|
||||
.map_err(LockError::IOError)?;
|
||||
if lock.is_blocking {
|
||||
file.lock_exclusive()
|
||||
.map_err(LockError::IOError)?;
|
||||
file.lock_exclusive().map_err(LockError::IOError)?;
|
||||
} else {
|
||||
file.try_lock_exclusive()
|
||||
.map_err(|_| LockError::LockBusy)?
|
||||
file.try_lock_exclusive().map_err(|_| LockError::LockBusy)?
|
||||
}
|
||||
// dropping the file handle will release the lock.
|
||||
Ok(DirectoryLock::from(Box::new(ReleaseLockFile {
|
||||
path: lock.filepath.clone(),
|
||||
_file: file
|
||||
_file: file,
|
||||
})))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,21 +8,21 @@ WORM directory abstraction.
|
||||
mod mmap_directory;
|
||||
|
||||
mod directory;
|
||||
mod directory_lock;
|
||||
mod managed_directory;
|
||||
mod ram_directory;
|
||||
mod read_only_source;
|
||||
mod shared_vec_slice;
|
||||
mod directory_lock;
|
||||
|
||||
/// Errors specific to the directory module.
|
||||
pub mod error;
|
||||
|
||||
use std::io::{BufWriter, Seek, Write};
|
||||
pub use self::directory::DirectoryLock;
|
||||
pub use self::directory::{Directory, DirectoryClone};
|
||||
pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
|
||||
pub use self::ram_directory::RAMDirectory;
|
||||
pub use self::read_only_source::ReadOnlySource;
|
||||
pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
|
||||
use std::io::{BufWriter, Seek, Write};
|
||||
|
||||
#[cfg(feature = "mmap")]
|
||||
pub use self::mmap_directory::MmapDirectory;
|
||||
|
||||
@@ -125,23 +125,37 @@ fn test_directory(directory: &mut Directory) {
|
||||
|
||||
fn test_lock_non_blocking(directory: &mut Directory) {
|
||||
{
|
||||
let lock_a_res = directory.acquire_lock(&Lock { filepath: PathBuf::from("a.lock"), is_blocking: false });
|
||||
let lock_a_res = directory.acquire_lock(&Lock {
|
||||
filepath: PathBuf::from("a.lock"),
|
||||
is_blocking: false,
|
||||
});
|
||||
assert!(lock_a_res.is_ok());
|
||||
let lock_b_res = directory.acquire_lock(&Lock { filepath: PathBuf::from("b.lock"), is_blocking: false });
|
||||
let lock_b_res = directory.acquire_lock(&Lock {
|
||||
filepath: PathBuf::from("b.lock"),
|
||||
is_blocking: false,
|
||||
});
|
||||
assert!(lock_b_res.is_ok());
|
||||
let lock_a_res2 = directory.acquire_lock(&Lock { filepath: PathBuf::from("a.lock"), is_blocking: false });
|
||||
let lock_a_res2 = directory.acquire_lock(&Lock {
|
||||
filepath: PathBuf::from("a.lock"),
|
||||
is_blocking: false,
|
||||
});
|
||||
assert!(lock_a_res2.is_err());
|
||||
}
|
||||
let lock_a_res = directory.acquire_lock(&Lock { filepath: PathBuf::from("a.lock"), is_blocking: false });
|
||||
let lock_a_res = directory.acquire_lock(&Lock {
|
||||
filepath: PathBuf::from("a.lock"),
|
||||
is_blocking: false,
|
||||
});
|
||||
assert!(lock_a_res.is_ok());
|
||||
}
|
||||
|
||||
|
||||
|
||||
fn test_lock_blocking(directory: &mut Directory) {
|
||||
let lock_a_res = directory.acquire_lock(&Lock { filepath: PathBuf::from("a.lock"), is_blocking: true });
|
||||
let lock_a_res = directory.acquire_lock(&Lock {
|
||||
filepath: PathBuf::from("a.lock"),
|
||||
is_blocking: true,
|
||||
});
|
||||
assert!(lock_a_res.is_ok());
|
||||
std::thread::spawn(move || { //< lock_a_res is sent to the thread.
|
||||
std::thread::spawn(move || {
|
||||
//< lock_a_res is sent to the thread.
|
||||
std::thread::sleep(time::Duration::from_millis(10));
|
||||
// explicitely droping lock_a_res. It would have been sufficient to just force it
|
||||
// to be part of the move, but the intent seems clearer that way.
|
||||
@@ -149,13 +163,20 @@ fn test_lock_blocking(directory: &mut Directory) {
|
||||
});
|
||||
{
|
||||
// A non-blocking call should fail, as the thread is running and holding the lock.
|
||||
let lock_a_res = directory.acquire_lock(&Lock { filepath: PathBuf::from("a.lock"), is_blocking: false });
|
||||
let lock_a_res = directory.acquire_lock(&Lock {
|
||||
filepath: PathBuf::from("a.lock"),
|
||||
is_blocking: false,
|
||||
});
|
||||
assert!(lock_a_res.is_err());
|
||||
}
|
||||
{ // the blocking call should wait for at least 10ms.
|
||||
{
|
||||
// the blocking call should wait for at least 10ms.
|
||||
let start = time::Instant::now();
|
||||
let lock_a_res = directory.acquire_lock(&Lock { filepath: PathBuf::from("a.lock"), is_blocking: true });
|
||||
let lock_a_res = directory.acquire_lock(&Lock {
|
||||
filepath: PathBuf::from("a.lock"),
|
||||
is_blocking: true,
|
||||
});
|
||||
assert!(lock_a_res.is_ok());
|
||||
assert!(start.elapsed().subsec_millis() >= 10);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
use std::io;
|
||||
|
||||
use directory::error::LockError;
|
||||
use directory::error::{IOError, OpenDirectoryError, OpenReadError, OpenWriteError};
|
||||
use fastfield::FastFieldNotAvailableError;
|
||||
use query;
|
||||
@@ -10,7 +11,6 @@ use serde_json;
|
||||
use std::fmt;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::PoisonError;
|
||||
use directory::error::LockError;
|
||||
|
||||
pub struct DataCorruption {
|
||||
filepath: Option<PathBuf>,
|
||||
@@ -57,7 +57,7 @@ pub enum TantivyError {
|
||||
#[fail(display = "Index already exists")]
|
||||
IndexAlreadyExists,
|
||||
/// Failed to acquire file lock
|
||||
#[fail(display = "Failed to acquire Lockfile: {:?}. {:?}", _0, _1)]
|
||||
#[fail(display = "Failed to acquire Lockfile: {:?}. {:?}", _0, _1)]
|
||||
LockFailure(LockError, Option<String>),
|
||||
/// IO Error.
|
||||
#[fail(display = "An IO error occurred: '{}'", _0)]
|
||||
|
||||
@@ -9,9 +9,11 @@ use core::SegmentId;
|
||||
use core::SegmentMeta;
|
||||
use core::SegmentReader;
|
||||
use crossbeam::channel;
|
||||
use directory::DirectoryLock;
|
||||
use docset::DocSet;
|
||||
use error::TantivyError;
|
||||
use fastfield::write_delete_bitset;
|
||||
use futures::{Canceled, Future};
|
||||
use indexer::delete_queue::{DeleteCursor, DeleteQueue};
|
||||
use indexer::doc_opstamp_mapping::DocToOpstampMapping;
|
||||
use indexer::operation::DeleteOperation;
|
||||
@@ -28,8 +30,6 @@ use std::sync::Arc;
|
||||
use std::thread;
|
||||
use std::thread::JoinHandle;
|
||||
use Result;
|
||||
use directory::DirectoryLock;
|
||||
use futures::{Future, Canceled};
|
||||
|
||||
// Size of the margin for the heap. A segment is closed when the remaining memory
|
||||
// in the heap goes below MARGIN_IN_BYTES.
|
||||
@@ -459,7 +459,10 @@ impl IndexWriter {
|
||||
/// Merges a given list of segments
|
||||
///
|
||||
/// `segment_ids` is required to be non-empty.
|
||||
pub fn merge(&mut self, segment_ids: &[SegmentId]) -> Result<impl Future<Item=SegmentMeta, Error=Canceled>> {
|
||||
pub fn merge(
|
||||
&mut self,
|
||||
segment_ids: &[SegmentId],
|
||||
) -> Result<impl Future<Item = SegmentMeta, Error = Canceled>> {
|
||||
self.segment_updater.start_merge(segment_ids)
|
||||
}
|
||||
|
||||
@@ -652,12 +655,12 @@ impl IndexWriter {
|
||||
mod tests {
|
||||
|
||||
use super::initial_table_size;
|
||||
use directory::error::LockError;
|
||||
use error::*;
|
||||
use indexer::NoMergePolicy;
|
||||
use schema::{self, Document};
|
||||
use Index;
|
||||
use Term;
|
||||
use directory::error::LockError;
|
||||
|
||||
#[test]
|
||||
fn test_lockfile_stops_duplicates() {
|
||||
|
||||
@@ -76,9 +76,7 @@ impl SegmentRegister {
|
||||
}
|
||||
|
||||
pub fn get(&self, segment_id: &SegmentId) -> Option<SegmentEntry> {
|
||||
self.segment_states
|
||||
.get(segment_id)
|
||||
.map(|segment_entry| segment_entry.clone())
|
||||
self.segment_states.get(segment_id).cloned()
|
||||
}
|
||||
|
||||
pub fn new(segment_metas: Vec<SegmentMeta>, delete_cursor: &DeleteCursor) -> SegmentRegister {
|
||||
|
||||
@@ -129,7 +129,6 @@ extern crate base64;
|
||||
extern crate bit_set;
|
||||
extern crate bitpacking;
|
||||
extern crate byteorder;
|
||||
extern crate scoped_pool;
|
||||
extern crate combine;
|
||||
extern crate crossbeam;
|
||||
extern crate fnv;
|
||||
@@ -144,6 +143,7 @@ extern crate num_cpus;
|
||||
extern crate owning_ref;
|
||||
extern crate regex;
|
||||
extern crate rust_stemmers;
|
||||
extern crate scoped_pool;
|
||||
extern crate serde;
|
||||
extern crate stable_deref_trait;
|
||||
extern crate tempdir;
|
||||
|
||||
Reference in New Issue
Block a user