Compare commits

...

1 Commits
0.24 ... slog

Author SHA1 Message Date
Paul Masurel
1c81b8171f Switch to slog
Closes #111
2020-09-30 19:55:54 +09:00
21 changed files with 310 additions and 121 deletions

View File

@@ -1,7 +1,8 @@
Tantivy 0.14.0
=========================
- Remove dependency to atomicwrites #833 .Implemented by @pmasurel upon suggestion and research from @asafigan).
- Remove dependency to atomicwrites #833. Implemented by @pmasurel upon suggestion and research from @asafigan).
- Migrated tantivy error from the now deprecated `failure` crate to `thiserror` #760. (@hirevo)
- Switched to structure logging (via the `slog` crate). (@pmasurel)
Tantivy 0.13.1
===================

View File

@@ -23,7 +23,8 @@ memmap = {version = "0.7", optional=true}
lz4 = {version="1", optional=true}
snap = "1"
tempfile = {version="3", optional=true}
log = "0.4"
slog = "2.5"
slog-stdlog = "4"
serde = {version="1", features=["derive"]}
serde_json = "1"
num_cpus = "1"

View File

@@ -1,5 +1,6 @@
use crossbeam::channel;
use rayon::{ThreadPool, ThreadPoolBuilder};
use slog::{error, Logger};
/// Search executor whether search request are single thread or multithread.
///
@@ -43,6 +44,7 @@ impl Executor {
&self,
f: F,
args: AIterator,
logger: Logger,
) -> crate::Result<Vec<R>> {
match self {
Executor::SingleThread => args.map(f).collect::<crate::Result<_>>(),
@@ -57,7 +59,7 @@ impl Executor {
let (idx, arg) = arg_with_idx;
let fruit = f(arg);
if let Err(err) = fruit_sender.send((idx, fruit)) {
error!("Failed to send search task. It probably means all search threads have panicked. {:?}", err);
error!(logger, "Failed to send search task. It probably means all search threads have panicked. {:?}", err);
}
});
}
@@ -87,17 +89,21 @@ impl Executor {
#[cfg(test)]
mod tests {
use slog::{o, Discard, Logger};
use super::Executor;
#[test]
#[should_panic(expected = "panic should propagate")]
fn test_panic_propagates_single_thread() {
let logger = Logger::root(Discard, o!());
let _result: Vec<usize> = Executor::single_thread()
.map(
|_| {
panic!("panic should propagate");
},
vec![0].into_iter(),
logger,
)
.unwrap();
}
@@ -105,6 +111,7 @@ mod tests {
#[test]
#[should_panic] //< unfortunately the panic message is not propagated
fn test_panic_propagates_multi_thread() {
let logger = Logger::root(Discard, o!());
let _result: Vec<usize> = Executor::multi_thread(1, "search-test")
.unwrap()
.map(
@@ -112,14 +119,16 @@ mod tests {
panic!("panic should propagate");
},
vec![0].into_iter(),
logger,
)
.unwrap();
}
#[test]
fn test_map_singlethread() {
let logger = Logger::root(Discard, o!());
let result: Vec<usize> = Executor::single_thread()
.map(|i| Ok(i * 2), 0..1_000)
.map(|i| Ok(i * 2), 0..1_000, logger)
.unwrap();
assert_eq!(result.len(), 1_000);
for i in 0..1_000 {
@@ -129,9 +138,10 @@ mod tests {
#[test]
fn test_map_multithread() {
let logger = Logger::root(Discard, o!());
let result: Vec<usize> = Executor::multi_thread(3, "search-test")
.unwrap()
.map(|i| Ok(i * 2), 0..10)
.map(|i| Ok(i * 2), 0..10, logger)
.unwrap();
assert_eq!(result.len(), 10);
for i in 0..10 {

View File

@@ -21,6 +21,7 @@ use crate::schema::FieldType;
use crate::schema::Schema;
use crate::tokenizer::{TextAnalyzer, TokenizerManager};
use crate::IndexWriter;
use slog::Logger;
use std::borrow::BorrowMut;
use std::collections::HashSet;
use std::fmt;
@@ -57,6 +58,11 @@ pub struct Index {
}
impl Index {
pub(crate) fn logger(&self) -> &Logger {
self.directory.logger()
}
/// Examines the directory to see if it contains an index.
///
/// Effectively, it only checks for the presence of the `meta.json` file.
@@ -147,13 +153,13 @@ impl Index {
/// If a directory previously existed, it will be erased.
pub fn create<Dir: Directory>(dir: Dir, schema: Schema) -> crate::Result<Index> {
let directory = ManagedDirectory::wrap(dir)?;
Index::from_directory(directory, schema)
Index::new_from_directory(directory, schema)
}
/// Create a new index from a directory.
///
/// This will overwrite existing meta.json
fn from_directory(mut directory: ManagedDirectory, schema: Schema) -> crate::Result<Index> {
fn new_from_directory(mut directory: ManagedDirectory, schema: Schema) -> crate::Result<Index> {
save_new_metas(schema.clone(), directory.borrow_mut())?;
let metas = IndexMeta::with_schema(schema);
Index::create_from_metas(directory, &metas, SegmentMetaInventory::default())
@@ -244,6 +250,8 @@ impl Index {
/// Open the index using the provided directory
pub fn open<D: Directory>(directory: D) -> crate::Result<Index> {
let logger: &Logger = directory.logger();
slog::info!(logger, "index-open"; "directory" => format!("{:?}", directory));
let directory = ManagedDirectory::wrap(directory)?;
let inventory = SegmentMetaInventory::default();
let metas = load_metas(&directory, &inventory)?;

View File

@@ -143,6 +143,7 @@ impl Searcher {
collector.collect_segment(weight.as_ref(), segment_ord as u32, segment_reader)
},
segment_readers.iter().enumerate(),
self.index.logger().clone(),
)?;
collector.merge_fruits(fruits)
}

View File

@@ -21,6 +21,12 @@ use std::sync::atomic;
#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SegmentId(Uuid);
impl ToString for SegmentId {
fn to_string(&self) -> String {
self.short_uuid_string()
}
}
#[cfg(test)]
static AUTO_INC_COUNTER: Lazy<atomic::AtomicUsize> = Lazy::new(|| atomic::AtomicUsize::default());

View File

@@ -17,6 +17,7 @@ use crate::store::StoreReader;
use crate::termdict::TermDictionary;
use crate::DocId;
use fail::fail_point;
use slog::{warn, Logger};
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
@@ -53,6 +54,7 @@ pub struct SegmentReader {
store_source: ReadOnlySource,
delete_bitset_opt: Option<DeleteBitSet>,
schema: Schema,
logger: Logger,
}
impl SegmentReader {
@@ -200,6 +202,7 @@ impl SegmentReader {
positions_composite,
positions_idx_composite,
schema,
logger: segment.index().logger().clone(),
})
}
@@ -229,7 +232,11 @@ impl SegmentReader {
let record_option_opt = field_type.get_index_record_option();
if record_option_opt.is_none() {
warn!("Field {:?} does not seem indexed.", field_entry.name());
warn!(
self.logger,
"Field {:?} does not seem indexed.",
field_entry.name()
);
}
let postings_source_opt = self.postings_composite.open_read(field);

View File

@@ -1,3 +1,5 @@
use slog::{error, Logger};
use crate::directory::directory_lock::Lock;
use crate::directory::error::LockError;
use crate::directory::error::{DeleteError, OpenReadError, OpenWriteError};
@@ -64,7 +66,10 @@ impl<T: Send + Sync + 'static> From<Box<T>> for DirectoryLock {
impl Drop for DirectoryLockGuard {
fn drop(&mut self) {
if let Err(e) = self.directory.delete(&*self.path) {
error!("Failed to remove the lock file. {:?}", e);
error!(
self.directory.logger(),
"Failed to remove the lock file. {:?}", e
);
}
}
}
@@ -209,6 +214,9 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
/// `OnCommit` `ReloadPolicy`. Not implementing watch in a `Directory` only prevents the
/// `OnCommit` `ReloadPolicy` to work properly.
fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle>;
/// Returns the `slog::Logger` configured for the `Directory`.
fn logger(&self) -> &Logger;
}
/// DirectoryClone

View File

@@ -11,9 +11,9 @@ use crate::error::DataCorruption;
use crate::Directory;
use crc32fast::Hasher;
use slog::{debug, error, info};
use std::collections::HashSet;
use std::io;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::result;
use std::sync::RwLockWriteGuard;
@@ -56,9 +56,9 @@ fn save_managed_paths(
directory: &mut dyn Directory,
wlock: &RwLockWriteGuard<'_, MetaInformation>,
) -> io::Result<()> {
let mut w = serde_json::to_vec(&wlock.managed_paths)?;
writeln!(&mut w)?;
directory.atomic_write(&MANAGED_FILEPATH, &w[..])?;
let mut managed_json = serde_json::to_string_pretty(&wlock.managed_paths)?;
managed_json.push_str("\n");
directory.atomic_write(&MANAGED_FILEPATH, managed_json.as_bytes())?;
Ok(())
}
@@ -118,7 +118,7 @@ impl ManagedDirectory {
&mut self,
get_living_files: L,
) -> crate::Result<GarbageCollectionResult> {
info!("Garbage collect");
info!(self.directory.logger(), "gc"; "stage"=>"start");
let mut files_to_delete = vec![];
// It is crucial to get the living files after acquiring the
@@ -153,7 +153,7 @@ impl ManagedDirectory {
}
}
Err(err) => {
error!("Failed to acquire lock for GC");
error!(self.logger(), "Failed to acquire lock for GC");
return Err(crate::TantivyError::from(err));
}
}
@@ -165,7 +165,7 @@ impl ManagedDirectory {
for file_to_delete in files_to_delete {
match self.delete(&file_to_delete) {
Ok(_) => {
info!("Deleted {:?}", file_to_delete);
debug!(self.logger(), "deleted-success"; "file"=>format!("{:?}", file_to_delete));
deleted_files.push(file_to_delete);
}
Err(file_error) => {
@@ -178,7 +178,7 @@ impl ManagedDirectory {
if !cfg!(target_os = "windows") {
// On windows, delete is expected to fail if the file
// is mmapped.
error!("Failed to delete {:?}", file_to_delete);
error!(self.logger(), "delete-file-fail"; "path"=>file_to_delete.to_str().unwrap_or("<invalid-utf8>"));
}
}
}
@@ -200,6 +200,10 @@ impl ManagedDirectory {
save_managed_paths(self.directory.as_mut(), &meta_informations_wlock)?;
}
info!(self.directory.logger(), "gc"; "stage"=>"end",
"num-sucess-file-deletes"=>deleted_files.len(),
"num-failed-file-deletes"=>failed_to_delete_files.len());
Ok(GarbageCollectionResult {
deleted_files,
failed_to_delete_files,
@@ -274,6 +278,7 @@ impl ManagedDirectory {
impl Directory for ManagedDirectory {
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
slog::debug!(self.logger(), "open-read"; "path" => path.to_str().unwrap_or("<invalid-utf8>"));
let read_only_source = self.directory.open_read(path)?;
let (footer, reader) = Footer::extract_footer(read_only_source).map_err(|io_error| {
OpenReadError::IOError {
@@ -286,6 +291,7 @@ impl Directory for ManagedDirectory {
}
fn open_write(&mut self, path: &Path) -> result::Result<WritePtr, OpenWriteError> {
slog::debug!(self.logger(), "open-write"; "path" => path.to_str().unwrap_or("<invalid-utf8>"));
self.register_file_as_managed(path)
.map_err(|io_error| OpenWriteError::IOError {
io_error,
@@ -300,9 +306,11 @@ impl Directory for ManagedDirectory {
))))
}
fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> {
fn atomic_write(&mut self, path: &Path, content: &[u8]) -> io::Result<()> {
let content_str = std::str::from_utf8(content).unwrap_or("<content-not-utf-8>");
slog::debug!(self.logger(), "Atomic write"; "path" => format!("{:?}", path), "content_length"=>content_str);
self.register_file_as_managed(path)?;
self.directory.atomic_write(path, data)
self.directory.atomic_write(path, content)
}
fn atomic_read(&self, path: &Path) -> result::Result<Vec<u8>, OpenReadError> {
@@ -324,6 +332,10 @@ impl Directory for ManagedDirectory {
fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> {
self.directory.watch(watch_callback)
}
fn logger(&self) -> &slog::Logger {
self.directory.logger()
}
}
impl Clone for ManagedDirectory {

View File

@@ -17,6 +17,8 @@ use notify::RawEvent;
use notify::RecursiveMode;
use notify::Watcher;
use serde::{Deserialize, Serialize};
use slog::{debug, o, Drain, Logger};
use slog_stdlog::StdLog;
use std::collections::HashMap;
use std::convert::From;
use std::fmt;
@@ -34,11 +36,6 @@ use std::sync::Weak;
use std::thread;
use tempfile::TempDir;
/// Create a default io error given a string.
pub(crate) fn make_io_err(msg: String) -> io::Error {
io::Error::new(io::ErrorKind::Other, msg)
}
/// Returns None iff the file exists, can be read, but is empty (and hence
/// cannot be mmapped)
fn open_mmap(full_path: &Path) -> result::Result<Option<Mmap>, OpenReadError> {
@@ -149,7 +146,7 @@ struct WatcherWrapper {
}
impl WatcherWrapper {
pub fn new(path: &Path) -> Result<Self, OpenDirectoryError> {
pub(crate) fn new(path: &Path, logger: Logger) -> Result<Self, OpenDirectoryError> {
let (tx, watcher_recv): (Sender<RawEvent>, Receiver<RawEvent>) = channel();
// We need to initialize the
let watcher = notify::raw_watcher(tx)
@@ -163,7 +160,8 @@ impl WatcherWrapper {
panic!("Unknown error while starting watching directory {:?}", path);
}
})?;
let watcher_router: Arc<WatchCallbackList> = Default::default();
let watcher_router: Arc<WatchCallbackList> =
Arc::new(WatchCallbackList::with_logger(logger));
let watcher_router_clone = watcher_router.clone();
thread::Builder::new()
.name("meta-file-watch-thread".to_string())
@@ -226,15 +224,21 @@ struct MmapDirectoryInner {
mmap_cache: RwLock<MmapCache>,
_temp_directory: Option<TempDir>,
watcher: RwLock<Option<WatcherWrapper>>,
logger: Logger,
}
impl MmapDirectoryInner {
fn new(root_path: PathBuf, temp_directory: Option<TempDir>) -> MmapDirectoryInner {
fn new(
root_path: PathBuf,
temp_directory: Option<TempDir>,
logger: Logger,
) -> MmapDirectoryInner {
MmapDirectoryInner {
root_path,
mmap_cache: Default::default(),
_temp_directory: temp_directory,
watcher: RwLock::new(None),
logger,
}
}
@@ -246,7 +250,7 @@ impl MmapDirectoryInner {
// The downside is that we might create a watch wrapper that is not useful.
let need_initialization = self.watcher.read().unwrap().is_none();
if need_initialization {
let watch_wrapper = WatcherWrapper::new(&self.root_path)?;
let watch_wrapper = WatcherWrapper::new(&self.root_path, self.logger.clone())?;
let mut watch_wlock = self.watcher.write().unwrap();
// the watcher could have been initialized when we released the lock, and
// we do not want to lose the watched files that were set.
@@ -269,8 +273,8 @@ impl fmt::Debug for MmapDirectory {
}
impl MmapDirectory {
fn new(root_path: PathBuf, temp_directory: Option<TempDir>) -> MmapDirectory {
let inner = MmapDirectoryInner::new(root_path, temp_directory);
fn new(root_path: PathBuf, temp_directory: Option<TempDir>, logger: Logger) -> MmapDirectory {
let inner = MmapDirectoryInner::new(root_path, temp_directory, logger);
MmapDirectory {
inner: Arc::new(inner),
}
@@ -282,17 +286,18 @@ impl MmapDirectory {
/// For your unit tests, prefer the RAMDirectory.
pub fn create_from_tempdir() -> Result<MmapDirectory, OpenDirectoryError> {
let tempdir = TempDir::new().map_err(OpenDirectoryError::FailedToCreateTempDir)?;
Ok(MmapDirectory::new(
tempdir.path().to_path_buf(),
Some(tempdir),
))
let logger = Logger::root(StdLog.fuse(), o!());
Ok(MmapDirectory::new(tempdir.path().to_owned(), Some(tempdir), logger))
}
/// Opens a MmapDirectory in a directory.
///
/// Returns an error if the `directory_path` does not
/// exist or if it is not a directory.
pub fn open<P: AsRef<Path>>(directory_path: P) -> Result<MmapDirectory, OpenDirectoryError> {
pub fn open_with_logger<P: AsRef<Path>>(
directory_path: P,
logger: Logger,
) -> Result<MmapDirectory, OpenDirectoryError> {
let directory_path: &Path = directory_path.as_ref();
if !directory_path.exists() {
Err(OpenDirectoryError::DoesNotExist(PathBuf::from(
@@ -303,10 +308,20 @@ impl MmapDirectory {
directory_path,
)))
} else {
Ok(MmapDirectory::new(PathBuf::from(directory_path), None))
Ok(MmapDirectory::new(
PathBuf::from(directory_path),
None,
logger,
))
}
}
/// Creates an `MmapDirectory` at the given path.
pub fn open<P: AsRef<Path>>(directory_path: P) -> Result<MmapDirectory, OpenDirectoryError> {
let logger = Logger::root(StdLog.fuse(), o!());
Self::open_with_logger(directory_path, logger)
}
/// Joins a relative_path to the directory `root_path`
/// to create a proper complete `filepath`.
fn resolve_path(&self, relative_path: &Path) -> PathBuf {
@@ -366,11 +381,12 @@ impl MmapDirectory {
struct ReleaseLockFile {
_file: File,
path: PathBuf,
logger: Logger,
}
impl Drop for ReleaseLockFile {
fn drop(&mut self) {
debug!("Releasing lock {:?}", self.path);
debug!(self.logger, "Releasing lock {:?}", self.path);
}
}
@@ -409,17 +425,16 @@ impl TerminatingWrite for SafeFileWriter {
impl Directory for MmapDirectory {
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
debug!("Open Read {:?}", path);
let full_path = self.resolve_path(path);
let mut mmap_cache = self.inner.mmap_cache.write().map_err(|_| {
let msg = format!(
"Failed to acquired write lock \
on mmap cache while reading {:?}",
path
);
let io_error = io::Error::new(io::ErrorKind::Other, msg);
OpenReadError::IOError {
io_error: make_io_err(msg),
io_error,
filepath: path.to_owned(),
}
})?;
@@ -457,9 +472,7 @@ impl Directory for MmapDirectory {
}
fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
debug!("Open Write {:?}", path);
let full_path = self.resolve_path(path);
let open_res = OpenOptions::new()
.write(true)
.create_new(true)
@@ -519,7 +532,6 @@ impl Directory for MmapDirectory {
}
fn atomic_write(&mut self, path: &Path, content: &[u8]) -> io::Result<()> {
debug!("Atomic Write {:?}", path);
let mut tempfile = tempfile::Builder::new().tempfile_in(&self.inner.root_path)?;
tempfile.write_all(content)?;
tempfile.flush()?;
@@ -541,16 +553,22 @@ impl Directory for MmapDirectory {
} else {
file.try_lock_exclusive().map_err(|_| LockError::LockBusy)?
}
let logger = self.inner.logger.clone();
// dropping the file handle will release the lock.
Ok(DirectoryLock::from(Box::new(ReleaseLockFile {
path: lock.filepath.clone(),
_file: file,
logger,
})))
}
fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> {
self.inner.watch(watch_callback)
}
fn logger(&self) -> &Logger {
&self.inner.logger
}
}
#[cfg(test)]
@@ -660,7 +678,8 @@ mod tests {
let counter_clone = counter.clone();
let tmp_dir = tempfile::TempDir::new().unwrap();
let tmp_dirpath = tmp_dir.path().to_owned();
let mut watch_wrapper = WatcherWrapper::new(&tmp_dirpath).unwrap();
let logger = Logger::root(slog::Discard, o!());
let mut watch_wrapper = WatcherWrapper::new(&tmp_dirpath, logger).unwrap();
let tmp_file = tmp_dirpath.join(*META_FILEPATH);
let _handle = watch_wrapper.watch(Box::new(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);

View File

@@ -23,7 +23,8 @@ pub use self::directory::{Directory, DirectoryClone};
pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
pub use self::ram_directory::RAMDirectory;
pub use self::read_only_source::ReadOnlySource;
pub use self::watch_event_router::{WatchCallback, WatchCallbackList, WatchHandle};
pub(crate) use self::watch_event_router::WatchCallbackList;
pub use self::watch_event_router::{WatchCallback, WatchHandle};
use std::io::{self, BufWriter, Write};
use std::path::PathBuf;
/// Outcome of the Garbage collection

View File

@@ -5,6 +5,8 @@ use crate::directory::WatchCallbackList;
use crate::directory::{Directory, ReadOnlySource, WatchCallback, WatchHandle};
use crate::directory::{TerminatingWrite, WritePtr};
use fail::fail_point;
use slog::{o, Drain, Logger};
use slog_stdlog::StdLog;
use std::collections::HashMap;
use std::fmt;
use std::io::{self, BufWriter, Cursor, Seek, SeekFrom, Write};
@@ -66,7 +68,7 @@ impl Write for VecWriter {
fn flush(&mut self) -> io::Result<()> {
self.is_flushed = true;
let mut fs = self.shared_directory.fs.write().unwrap();
let mut fs = self.shared_directory.fs.inner_directory.write().unwrap();
fs.write(self.path.clone(), self.data.get_ref());
Ok(())
}
@@ -78,13 +80,19 @@ impl TerminatingWrite for VecWriter {
}
}
#[derive(Default)]
struct InnerDirectory {
fs: HashMap<PathBuf, ReadOnlySource>,
watch_router: WatchCallbackList,
}
impl InnerDirectory {
fn with_logger(logger: Logger) -> Self {
InnerDirectory {
fs: Default::default(),
watch_router: WatchCallbackList::with_logger(logger.clone()),
}
}
fn write(&mut self, path: PathBuf, data: &[u8]) -> bool {
let data = ReadOnlySource::new(Vec::from(data));
self.fs.insert(path, data).is_some()
@@ -117,20 +125,32 @@ impl InnerDirectory {
}
}
impl Default for RAMDirectory {
fn default() -> RAMDirectory {
let logger = Logger::root(StdLog.fuse(), o!());
Self::with_logger(logger)
}
}
impl fmt::Debug for RAMDirectory {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "RAMDirectory")
}
}
struct Inner {
inner_directory: RwLock<InnerDirectory>,
logger: Logger,
}
/// A Directory storing everything in anonymous memory.
///
/// It is mainly meant for unit testing.
/// Writes are only made visible upon flushing.
///
#[derive(Clone, Default)]
#[derive(Clone)]
pub struct RAMDirectory {
fs: Arc<RwLock<InnerDirectory>>,
fs: Arc<Inner>,
}
impl RAMDirectory {
@@ -139,10 +159,21 @@ impl RAMDirectory {
Self::default()
}
/// Create a `RAMDirectory` with a custom logger.
pub fn with_logger(logger: Logger) -> RAMDirectory {
let inner_directory = InnerDirectory::with_logger(logger.clone()).into();
RAMDirectory {
fs: Arc::new(Inner {
inner_directory,
logger,
}),
}
}
/// Returns the sum of the size of the different files
/// in the RAMDirectory.
pub fn total_mem_usage(&self) -> usize {
self.fs.read().unwrap().total_mem_usage()
self.fs.inner_directory.read().unwrap().total_mem_usage()
}
/// Write a copy of all of the files saved in the RAMDirectory in the target `Directory`.
@@ -152,7 +183,7 @@ impl RAMDirectory {
///
/// If an error is encounterred, files may be persisted partially.
pub fn persist(&self, dest: &mut dyn Directory) -> crate::Result<()> {
let wlock = self.fs.write().unwrap();
let wlock = self.fs.inner_directory.write().unwrap();
for (path, source) in wlock.fs.iter() {
let mut dest_wrt = dest.open_write(path)?;
dest_wrt.write_all(source.as_slice())?;
@@ -164,7 +195,7 @@ impl RAMDirectory {
impl Directory for RAMDirectory {
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
self.fs.read().unwrap().open_read(path)
self.fs.inner_directory.read().unwrap().open_read(path)
}
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
@@ -174,15 +205,15 @@ impl Directory for RAMDirectory {
filepath: path.to_path_buf(),
})
});
self.fs.write().unwrap().delete(path)
self.fs.inner_directory.write().unwrap().delete(path)
}
fn exists(&self, path: &Path) -> bool {
self.fs.read().unwrap().exists(path)
self.fs.inner_directory.read().unwrap().exists(path)
}
fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
let mut fs = self.fs.write().unwrap();
let mut fs = self.fs.inner_directory.write().unwrap();
let path_buf = PathBuf::from(path);
let vec_writer = VecWriter::new(path_buf.clone(), self.clone());
let exists = fs.write(path_buf.clone(), &[]);
@@ -206,19 +237,38 @@ impl Directory for RAMDirectory {
let path_buf = PathBuf::from(path);
// Reserve the path to prevent calls to .write() to succeed.
self.fs.write().unwrap().write(path_buf.clone(), &[]);
self.fs
.inner_directory
.write()
.unwrap()
.write(path_buf.clone(), &[]);
let mut vec_writer = VecWriter::new(path_buf, self.clone());
vec_writer.write_all(data)?;
vec_writer.flush()?;
if path == Path::new(&*META_FILEPATH) {
let _ = self.fs.write().unwrap().watch_router.broadcast();
let _ = self
.fs
.inner_directory
.write()
.unwrap()
.watch_router
.broadcast();
}
Ok(())
}
fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> {
Ok(self.fs.write().unwrap().watch(watch_callback))
Ok(self
.fs
.inner_directory
.write()
.unwrap()
.watch(watch_callback))
}
fn logger(&self) -> &Logger {
&self.fs.logger
}
}

View File

@@ -1,5 +1,6 @@
use futures::channel::oneshot;
use futures::{Future, TryFutureExt};
use slog::{error, Logger};
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::Weak;
@@ -11,9 +12,9 @@ pub type WatchCallback = Box<dyn Fn() + Sync + Send>;
///
/// It registers callbacks (See `.subscribe(...)`) and
/// calls them upon calls to `.broadcast(...)`.
#[derive(Default)]
pub struct WatchCallbackList {
pub(crate) struct WatchCallbackList {
router: RwLock<Vec<Weak<WatchCallback>>>,
logger: Logger,
}
/// Controls how long a directory should watch for a file change.
@@ -32,6 +33,13 @@ impl WatchHandle {
}
impl WatchCallbackList {
pub fn with_logger(logger: Logger) -> Self {
WatchCallbackList {
logger,
router: Default::default(),
}
}
/// Subscribes a new callback and returns a handle that controls the lifetime of the callback.
pub fn subscribe(&self, watch_callback: WatchCallback) -> WatchHandle {
let watch_callback_arc = Arc::new(watch_callback);
@@ -74,8 +82,8 @@ impl WatchCallbackList {
});
if let Err(err) = spawn_res {
error!(
"Failed to spawn thread to call watch callbacks. Cause: {:?}",
err
self.logger,
"Failed to spawn thread to call watch callbacks. Cause: {:?}", err
);
}
result
@@ -86,13 +94,18 @@ impl WatchCallbackList {
mod tests {
use crate::directory::WatchCallbackList;
use futures::executor::block_on;
use slog::{o, Discard, Logger};
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
fn default_watch_callback_list() -> WatchCallbackList {
WatchCallbackList::with_logger(Logger::root(Discard, o!()))
}
#[test]
fn test_watch_event_router_simple() {
let watch_event_router = WatchCallbackList::default();
let watch_event_router = default_watch_callback_list();
let counter: Arc<AtomicUsize> = Default::default();
let counter_clone = counter.clone();
let inc_callback = Box::new(move || {
@@ -119,7 +132,7 @@ mod tests {
#[test]
fn test_watch_event_router_multiple_callback_same_key() {
let watch_event_router = WatchCallbackList::default();
let watch_event_router = default_watch_callback_list();
let counter: Arc<AtomicUsize> = Default::default();
let inc_callback = |inc: usize| {
let counter_clone = counter.clone();
@@ -148,7 +161,7 @@ mod tests {
#[test]
fn test_watch_event_router_multiple_callback_different_key() {
let watch_event_router = WatchCallbackList::default();
let watch_event_router = default_watch_callback_list();
let counter: Arc<AtomicUsize> = Default::default();
let counter_clone = counter.clone();
let inc_callback = Box::new(move || {

View File

@@ -27,6 +27,7 @@ use crate::Opstamp;
use crossbeam::channel;
use futures::executor::block_on;
use futures::future::Future;
use slog::{error, info, Logger};
use smallvec::smallvec;
use smallvec::SmallVec;
use std::mem;
@@ -195,20 +196,21 @@ fn index_documents(
grouped_document_iterator: &mut dyn Iterator<Item = OperationGroup>,
segment_updater: &mut SegmentUpdater,
mut delete_cursor: DeleteCursor,
logger: &Logger,
) -> crate::Result<bool> {
let schema = segment.schema();
info!(logger, "segment-index"; "stage"=>"start");
let mut segment_writer = SegmentWriter::for_segment(memory_budget, segment.clone(), &schema)?;
let mut buffer_limit_reached = false;
for document_group in grouped_document_iterator {
for doc in document_group {
segment_writer.add_document(doc, &schema)?;
}
let mem_usage = segment_writer.mem_usage();
if mem_usage >= memory_budget - MARGIN_IN_BYTES {
info!(
"Buffer limit reached, flushing segment with maxdoc={}.",
segment_writer.max_doc()
);
buffer_limit_reached = true;
break;
}
}
@@ -228,6 +230,14 @@ fn index_documents(
let segment_with_max_doc = segment.with_max_doc(max_doc);
let last_docstamp: Opstamp = *(doc_opstamps.last().unwrap());
info!(
logger,
"segment-index";
"stage" => "serialize",
"cause" => if buffer_limit_reached { "buffer-limit" } else { "commit" },
"maxdoc" => max_doc,
"last_docstamp" => last_docstamp
);
let delete_bitset_opt = apply_deletes(
&segment_with_max_doc,
@@ -241,7 +251,18 @@ fn index_documents(
delete_cursor,
delete_bitset_opt,
);
info!(
logger,
"segment-index";
"stage" => "publish",
);
block_on(segment_updater.schedule_add_segment(segment_entry))?;
info!(
logger,
"segment-index";
"stage" => "end",
);
Ok(true)
}
@@ -344,6 +365,10 @@ impl IndexWriter {
Ok(index_writer)
}
pub(crate) fn logger(&self) -> &Logger {
self.index.logger()
}
fn drop_sender(&mut self) {
let (sender, _receiver) = channel::bounded(1);
self.operation_sender = sender;
@@ -352,6 +377,8 @@ impl IndexWriter {
/// If there are some merging threads, blocks until they all finish their work and
/// then drop the `IndexWriter`.
pub fn wait_merging_threads(mut self) -> crate::Result<()> {
info!(self.logger(), "wait-merge-threads"; "stage"=>"start");
// this will stop the indexing thread,
// dropping the last reference to the segment_updater.
self.drop_sender();
@@ -372,9 +399,9 @@ impl IndexWriter {
.map_err(|_| TantivyError::ErrorInThread("Failed to join merging thread.".into()));
if let Err(ref e) = result {
error!("Some merging thread failed {:?}", e);
error!(self.logger(), "some merge thread failed"; "cause"=>e.to_string());
}
info!(self.logger(), "wait-merge-threads"; "stage"=>"stop");
result
}
@@ -434,12 +461,16 @@ impl IndexWriter {
return Ok(());
}
let segment = index.new_segment();
let segment_id = segment.id();
index_documents(
mem_budget,
segment,
&mut document_iterator,
&mut segment_updater,
delete_cursor.clone(),
&index
.logger()
.new(slog::o!("segment"=>segment_id.to_string())),
)?;
}
})?;
@@ -553,7 +584,10 @@ impl IndexWriter {
///
/// The opstamp at the last commit is returned.
pub fn rollback(&mut self) -> crate::Result<Opstamp> {
info!("Rolling back to opstamp {}", self.committed_opstamp);
info!(
self.logger(),
"Rolling back to opstamp {}", self.committed_opstamp
);
// marks the segment updater as killed. From now on, all
// segment updates will be ignored.
self.segment_updater.kill();
@@ -610,6 +644,8 @@ impl IndexWriter {
/// using this API.
/// See [`PreparedCommit::set_payload()`](PreparedCommit.html)
pub fn prepare_commit(&mut self) -> crate::Result<PreparedCommit> {
let logger = self.logger().clone();
// Here, because we join all of the worker threads,
// all of the segment update for this commit have been
// sent.
@@ -620,7 +656,10 @@ impl IndexWriter {
//
// This will move uncommitted segments to the state of
// committed segments.
info!("Preparing commit");
let commit_opstamp = self.stamper.stamp();
info!(logger, "prepare-commit"; "opstamp" => commit_opstamp);
// this will drop the current document channel
// and recreate a new one.
@@ -636,9 +675,8 @@ impl IndexWriter {
self.add_indexing_worker()?;
}
let commit_opstamp = self.stamper.stamp();
let prepared_commit = PreparedCommit::new(self, commit_opstamp);
info!("Prepared commit {}", commit_opstamp);
info!(logger, "Prepared commit {}", commit_opstamp);
Ok(prepared_commit)
}

View File

@@ -1,6 +1,7 @@
use super::IndexWriter;
use crate::Opstamp;
use futures::executor::block_on;
use slog::info;
/// A prepared commit
pub struct PreparedCommit<'a> {
@@ -31,7 +32,7 @@ impl<'a> PreparedCommit<'a> {
}
pub fn commit(self) -> crate::Result<Opstamp> {
info!("committing {}", self.opstamp);
info!(self.index_writer.logger(), "committing {}", self.opstamp);
let _ = block_on(
self.index_writer
.segment_updater()

View File

@@ -1,3 +1,5 @@
use slog::{warn, Logger};
use super::segment_register::SegmentRegister;
use crate::core::SegmentId;
use crate::core::SegmentMeta;
@@ -42,9 +44,9 @@ impl SegmentRegisters {
///
/// It guarantees the atomicity of the
/// changes (merges especially)
#[derive(Default)]
pub struct SegmentManager {
registers: RwLock<SegmentRegisters>,
logger: Logger,
}
impl Debug for SegmentManager {
@@ -77,12 +79,14 @@ impl SegmentManager {
pub fn from_segments(
segment_metas: Vec<SegmentMeta>,
delete_cursor: &DeleteCursor,
logger: Logger,
) -> SegmentManager {
SegmentManager {
registers: RwLock::new(SegmentRegisters {
uncommitted: SegmentRegister::default(),
committed: SegmentRegister::new(segment_metas, delete_cursor),
}),
logger,
}
}
@@ -186,7 +190,7 @@ impl SegmentManager {
let segments_status = registers_lock
.segments_status(before_merge_segment_ids)
.ok_or_else(|| {
warn!("couldn't find segment in SegmentManager");
warn!(self.logger, "couldn't find segment in SegmentManager");
crate::TantivyError::InvalidArgument(
"The segments that were merged could not be found in the SegmentManager. \
This is not necessarily a bug, and can happen after a rollback for instance."

View File

@@ -23,9 +23,9 @@ use futures::channel::oneshot;
use futures::executor::{ThreadPool, ThreadPoolBuilder};
use futures::future::Future;
use futures::future::TryFutureExt;
use slog::{debug, error, info, warn};
use std::borrow::BorrowMut;
use std::collections::HashSet;
use std::io::Write;
use std::ops::Deref;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
@@ -65,12 +65,11 @@ pub fn save_new_metas(schema: Schema, directory: &mut dyn Directory) -> crate::R
///
/// This method is not part of tantivy's public API
fn save_metas(metas: &IndexMeta, directory: &mut dyn Directory) -> crate::Result<()> {
info!("save metas");
let mut buffer = serde_json::to_vec_pretty(metas)?;
let mut meta_json = serde_json::to_string_pretty(metas)?;
// Just adding a new line at the end of the buffer.
writeln!(&mut buffer)?;
directory.atomic_write(&META_FILEPATH, &buffer[..])?;
debug!("Saved metas {:?}", serde_json::to_string_pretty(&metas));
meta_json.push_str("\n");
debug!(directory.logger(), "save meta"; "content"=>&meta_json);
directory.atomic_write(&META_FILEPATH, meta_json.as_bytes())?;
Ok(())
}
@@ -97,7 +96,6 @@ impl Deref for SegmentUpdater {
async fn garbage_collect_files(
segment_updater: SegmentUpdater,
) -> crate::Result<GarbageCollectionResult> {
info!("Running garbage collection");
let mut index = segment_updater.index.clone();
index
.directory_mut()
@@ -107,14 +105,12 @@ async fn garbage_collect_files(
/// Merges a list of segments the list of segment givens in the `segment_entries`.
/// This function happens in the calling thread and is computationally expensive.
fn merge(
merged_segment: Segment,
index: &Index,
mut segment_entries: Vec<SegmentEntry>,
target_opstamp: Opstamp,
) -> crate::Result<SegmentEntry> {
// first we need to apply deletes to our segment.
let merged_segment = index.new_segment();
// First we apply all of the delet to the merged segment, up to the target opstamp.
// First we apply all of the delete to the merged segment, up to the target opstamp.
for segment_entry in &mut segment_entries {
let segment = index.segment(segment_entry.meta().clone());
advance_deletes(segment, segment_entry, target_opstamp)?;
@@ -167,7 +163,8 @@ impl SegmentUpdater {
delete_cursor: &DeleteCursor,
) -> crate::Result<SegmentUpdater> {
let segments = index.searchable_segment_metas()?;
let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
let segment_manager =
SegmentManager::from_segments(segments, delete_cursor, index.logger().clone());
let pool = ThreadPoolBuilder::new()
.name_prefix("segment_updater")
.pool_size(1)
@@ -387,7 +384,18 @@ impl SegmentUpdater {
.segment_manager
.start_merge(merge_operation.segment_ids())?;
info!("Starting merge - {:?}", merge_operation.segment_ids());
let segment_ids_str: String = merge_operation
.segment_ids()
.iter()
.map(|segment_id| segment_id.to_string())
.collect::<Vec<String>>()
.join(",");
let merged_segment = self.index.new_segment();
let logger = self.index.logger().new(slog::o!("segments"=>segment_ids_str, "merged-segment"=>merged_segment.id().to_string()));
let num_merges: usize = self.merge_operations.list().len();
slog::info!(&logger, "merge"; "stage"=>"start", "num-merges" => num_merges);
let (merging_future_send, merging_future_recv) =
oneshot::channel::<crate::Result<SegmentMeta>>();
@@ -398,22 +406,20 @@ impl SegmentUpdater {
// as well as which segment is currently in merge and therefore should not be
// candidate for another merge.
match merge(
merged_segment,
&segment_updater.index,
segment_entries,
merge_operation.target_opstamp(),
) {
Ok(after_merge_segment_entry) => {
info!(&logger, "merge"; "stage" => "end");
let segment_meta = segment_updater
.end_merge(merge_operation, after_merge_segment_entry)
.await;
let _send_result = merging_future_send.send(segment_meta);
}
Err(e) => {
warn!(
"Merge of {:?} was cancelled: {:?}",
merge_operation.segment_ids().to_vec(),
e
);
error!(&logger, "merge"; "stage" => "fail", "cause"=>e.to_string());
// ... cancel merge
if cfg!(test) {
panic!("Merge failed.");
@@ -454,11 +460,12 @@ impl SegmentUpdater {
.collect::<Vec<_>>();
merge_candidates.extend(committed_merge_candidates.into_iter());
let logger = self.index.logger();
for merge_operation in merge_candidates {
if let Err(err) = self.start_merge(merge_operation) {
warn!(
"Starting the merge failed for the following reason. This is not fatal. {}",
err
logger,
"merge-start-fail (not fatal, not necessarily a problem)"; "reason" => format!("{}", err),
);
}
}
@@ -471,8 +478,11 @@ impl SegmentUpdater {
) -> impl Future<Output = crate::Result<SegmentMeta>> {
let segment_updater = self.clone();
let after_merge_segment_meta = after_merge_segment_entry.meta().clone();
let logger = self.index.logger().new(
slog::o!("segment"=>after_merge_segment_meta.id().to_string(),
"delete-opstamp"=>after_merge_segment_meta.delete_opstamp()),
);
let end_merge_future = self.schedule_future(async move {
info!("End merge {:?}", after_merge_segment_entry.meta());
{
let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
if let Some(delete_operation) = delete_cursor.get() {
@@ -486,6 +496,7 @@ impl SegmentUpdater {
committed_opstamp,
) {
error!(
logger,
"Merge of {:?} was cancelled (advancing deletes failed): {:?}",
merge_operation.segment_ids(),
e

View File

@@ -1,5 +1,4 @@
use super::operation::AddOperation;
use crate::core::Segment;
use crate::core::SerializableSegment;
use crate::fastfield::FastFieldsWriter;
use crate::fieldnorm::{FieldNormReaders, FieldNormsWriter};
@@ -15,6 +14,7 @@ use crate::tokenizer::{BoxTokenStream, PreTokenizedStream};
use crate::tokenizer::{FacetTokenizer, TextAnalyzer};
use crate::tokenizer::{TokenStreamChain, Tokenizer};
use crate::Opstamp;
use crate::{core::Segment, tokenizer::MAX_TOKEN_LEN};
use crate::{DocId, SegmentComponent};
use std::io;
@@ -146,6 +146,9 @@ impl SegmentWriter {
for fake_str in facets {
let mut unordered_term_id_opt = None;
FacetTokenizer.token_stream(fake_str).process(&mut |token| {
if token.text.len() > MAX_TOKEN_LEN {
return;
}
term_buffer.set_text(&token.text);
let unordered_term_id =
multifield_postings.subscribe(doc_id, &term_buffer);

View File

@@ -101,9 +101,6 @@
#[cfg_attr(test, macro_use)]
extern crate serde_json;
#[macro_use]
extern crate log;
#[macro_use]
extern crate thiserror;
@@ -148,6 +145,7 @@ pub mod schema;
pub mod space_usage;
pub mod store;
pub mod termdict;
pub use slog;
mod reader;

View File

@@ -227,23 +227,17 @@ pub trait PostingsWriter {
term_buffer.set_field(field);
let mut sink = |token: &Token| {
// We skip all tokens with a len greater than u16.
if token.text.len() <= MAX_TOKEN_LEN {
term_buffer.set_text(token.text.as_str());
self.subscribe(
term_index,
doc_id,
token.position as u32,
&term_buffer,
heap,
);
} else {
info!(
"A token exceeding MAX_TOKEN_LEN ({}>{}) was dropped. Search for \
MAX_TOKEN_LEN in the documentation for more information.",
token.text.len(),
MAX_TOKEN_LEN
);
if token.text.len() > MAX_TOKEN_LEN {
return;
}
term_buffer.set_text(token.text.as_str());
self.subscribe(
term_index,
doc_id,
token.position as u32,
&term_buffer,
heap,
);
};
token_stream.process(&mut sink)
}

View File

@@ -1,5 +1,7 @@
mod pool;
use slog::error;
pub use self::pool::LeasedItem;
use self::pool::Pool;
use crate::core::Segment;
@@ -62,6 +64,7 @@ impl IndexReaderBuilder {
/// to open different segment readers. It may take hundreds of milliseconds
/// of time and it may return an error.
pub fn try_into(self) -> crate::Result<IndexReader> {
let logger = self.index.logger().clone();
let inner_reader = InnerIndexReader {
index: self.index,
num_searchers: self.num_searchers,
@@ -80,8 +83,8 @@ impl IndexReaderBuilder {
let callback = move || {
if let Err(err) = inner_reader_arc_clone.reload() {
error!(
"Error while loading searcher after commit was detected. {:?}",
err
logger,
"Error while loading searcher after commit was detected. {:?}", err
);
}
};