Compare commits

...

1 Commit

Author: Paul Masurel
SHA1: 1c81b8171f
Message: Switch to slog (Closes #111)
Date: 2020-09-30 19:55:54 +09:00
21 changed files with 310 additions and 121 deletions

View File

@@ -1,7 +1,8 @@
Tantivy 0.14.0 Tantivy 0.14.0
========================= =========================
- Remove dependency to atomicwrites #833 .Implemented by @pmasurel upon suggestion and research from @asafigan). - Remove dependency to atomicwrites #833. (Implemented by @pmasurel upon suggestion and research from @asafigan.)
- Migrated tantivy error from the now deprecated `failure` crate to `thiserror` #760. (@hirevo) - Migrated tantivy error from the now deprecated `failure` crate to `thiserror` #760. (@hirevo)
- Switched to structured logging (via the `slog` crate). (@pmasurel)
Tantivy 0.13.1 Tantivy 0.13.1
=================== ===================

View File

@@ -23,7 +23,8 @@ memmap = {version = "0.7", optional=true}
lz4 = {version="1", optional=true} lz4 = {version="1", optional=true}
snap = "1" snap = "1"
tempfile = {version="3", optional=true} tempfile = {version="3", optional=true}
log = "0.4" slog = "2.5"
slog-stdlog = "4"
serde = {version="1", features=["derive"]} serde = {version="1", features=["derive"]}
serde_json = "1" serde_json = "1"
num_cpus = "1" num_cpus = "1"
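
The swap above replaces the `log` facade with `slog`, and adds `slog-stdlog`, which bridges slog records back to whatever `log` backend the application has installed when no drain is supplied. A minimal sketch, assuming only the two new dependencies, of the default-logger pattern this patch uses (the same `Logger::root(StdLog.fuse(), o!())` seen in `MmapDirectory` and `RAMDirectory` below):

use slog::{o, Drain, Logger};
use slog_stdlog::StdLog;

fn main() {
    // Forward slog records to the `log` crate; `.fuse()` converts the
    // drain's error type to `Never`, as `Logger::root` requires.
    let logger = Logger::root(StdLog.fuse(), o!());
    slog::info!(logger, "bridge-demo"; "crate" => "slog-stdlog");
}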

View File

@@ -1,5 +1,6 @@
use crossbeam::channel; use crossbeam::channel;
use rayon::{ThreadPool, ThreadPoolBuilder}; use rayon::{ThreadPool, ThreadPoolBuilder};
use slog::{error, Logger};
/// Search executor, whether search requests are single-threaded or multithreaded. /// Search executor, whether search requests are single-threaded or multithreaded.
/// ///
@@ -43,6 +44,7 @@ impl Executor {
&self, &self,
f: F, f: F,
args: AIterator, args: AIterator,
logger: Logger,
) -> crate::Result<Vec<R>> { ) -> crate::Result<Vec<R>> {
match self { match self {
Executor::SingleThread => args.map(f).collect::<crate::Result<_>>(), Executor::SingleThread => args.map(f).collect::<crate::Result<_>>(),
@@ -57,7 +59,7 @@ impl Executor {
let (idx, arg) = arg_with_idx; let (idx, arg) = arg_with_idx;
let fruit = f(arg); let fruit = f(arg);
if let Err(err) = fruit_sender.send((idx, fruit)) { if let Err(err) = fruit_sender.send((idx, fruit)) {
error!("Failed to send search task. It probably means all search threads have panicked. {:?}", err); error!(logger, "Failed to send search task. It probably means all search threads have panicked. {:?}", err);
} }
}); });
} }
@@ -87,17 +89,21 @@ impl Executor {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use slog::{o, Discard, Logger};
use super::Executor; use super::Executor;
#[test] #[test]
#[should_panic(expected = "panic should propagate")] #[should_panic(expected = "panic should propagate")]
fn test_panic_propagates_single_thread() { fn test_panic_propagates_single_thread() {
let logger = Logger::root(Discard, o!());
let _result: Vec<usize> = Executor::single_thread() let _result: Vec<usize> = Executor::single_thread()
.map( .map(
|_| { |_| {
panic!("panic should propagate"); panic!("panic should propagate");
}, },
vec![0].into_iter(), vec![0].into_iter(),
logger,
) )
.unwrap(); .unwrap();
} }
@@ -105,6 +111,7 @@ mod tests {
#[test] #[test]
#[should_panic] //< unfortunately the panic message is not propagated #[should_panic] //< unfortunately the panic message is not propagated
fn test_panic_propagates_multi_thread() { fn test_panic_propagates_multi_thread() {
let logger = Logger::root(Discard, o!());
let _result: Vec<usize> = Executor::multi_thread(1, "search-test") let _result: Vec<usize> = Executor::multi_thread(1, "search-test")
.unwrap() .unwrap()
.map( .map(
@@ -112,14 +119,16 @@ mod tests {
panic!("panic should propagate"); panic!("panic should propagate");
}, },
vec![0].into_iter(), vec![0].into_iter(),
logger,
) )
.unwrap(); .unwrap();
} }
#[test] #[test]
fn test_map_singlethread() { fn test_map_singlethread() {
let logger = Logger::root(Discard, o!());
let result: Vec<usize> = Executor::single_thread() let result: Vec<usize> = Executor::single_thread()
.map(|i| Ok(i * 2), 0..1_000) .map(|i| Ok(i * 2), 0..1_000, logger)
.unwrap(); .unwrap();
assert_eq!(result.len(), 1_000); assert_eq!(result.len(), 1_000);
for i in 0..1_000 { for i in 0..1_000 {
@@ -129,9 +138,10 @@ mod tests {
#[test] #[test]
fn test_map_multithread() { fn test_map_multithread() {
let logger = Logger::root(Discard, o!());
let result: Vec<usize> = Executor::multi_thread(3, "search-test") let result: Vec<usize> = Executor::multi_thread(3, "search-test")
.unwrap() .unwrap()
.map(|i| Ok(i * 2), 0..10) .map(|i| Ok(i * 2), 0..10, logger)
.unwrap(); .unwrap();
assert_eq!(result.len(), 10); assert_eq!(result.len(), 10);
for i in 0..10 { for i in 0..10 {
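
With `log`, the executor's `error!` went to a global logger; `Executor::map` now takes an explicit `slog::Logger` and passes it to the worker closure that reports send failures, which is why every call site and test above gains a `logger` argument. A hedged sketch of the resulting call shape, mirroring the tests (`Executor` is crate-internal, so this only compiles inside tantivy's test module):

use super::Executor;
use slog::{o, Discard, Logger};

#[test]
fn test_map_with_explicit_logger() {
    // A no-op logger is all the tests need, as in the diff above.
    let logger = Logger::root(Discard, o!());
    let doubled: Vec<usize> = Executor::single_thread()
        .map(|i| Ok(i * 2), 0..4, logger)
        .unwrap();
    assert_eq!(doubled, vec![0, 2, 4, 6]);
}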

View File

@@ -21,6 +21,7 @@ use crate::schema::FieldType;
use crate::schema::Schema; use crate::schema::Schema;
use crate::tokenizer::{TextAnalyzer, TokenizerManager}; use crate::tokenizer::{TextAnalyzer, TokenizerManager};
use crate::IndexWriter; use crate::IndexWriter;
use slog::Logger;
use std::borrow::BorrowMut; use std::borrow::BorrowMut;
use std::collections::HashSet; use std::collections::HashSet;
use std::fmt; use std::fmt;
@@ -57,6 +58,11 @@ pub struct Index {
} }
impl Index { impl Index {
pub(crate) fn logger(&self) -> &Logger {
self.directory.logger()
}
/// Examines the directory to see if it contains an index. /// Examines the directory to see if it contains an index.
/// ///
/// Effectively, it only checks for the presence of the `meta.json` file. /// Effectively, it only checks for the presence of the `meta.json` file.
@@ -147,13 +153,13 @@ impl Index {
/// If a directory previously existed, it will be erased. /// If a directory previously existed, it will be erased.
pub fn create<Dir: Directory>(dir: Dir, schema: Schema) -> crate::Result<Index> { pub fn create<Dir: Directory>(dir: Dir, schema: Schema) -> crate::Result<Index> {
let directory = ManagedDirectory::wrap(dir)?; let directory = ManagedDirectory::wrap(dir)?;
Index::from_directory(directory, schema) Index::new_from_directory(directory, schema)
} }
/// Create a new index from a directory. /// Create a new index from a directory.
/// ///
/// This will overwrite existing meta.json /// This will overwrite existing meta.json
fn from_directory(mut directory: ManagedDirectory, schema: Schema) -> crate::Result<Index> { fn new_from_directory(mut directory: ManagedDirectory, schema: Schema) -> crate::Result<Index> {
save_new_metas(schema.clone(), directory.borrow_mut())?; save_new_metas(schema.clone(), directory.borrow_mut())?;
let metas = IndexMeta::with_schema(schema); let metas = IndexMeta::with_schema(schema);
Index::create_from_metas(directory, &metas, SegmentMetaInventory::default()) Index::create_from_metas(directory, &metas, SegmentMetaInventory::default())
@@ -244,6 +250,8 @@ impl Index {
/// Open the index using the provided directory /// Open the index using the provided directory
pub fn open<D: Directory>(directory: D) -> crate::Result<Index> { pub fn open<D: Directory>(directory: D) -> crate::Result<Index> {
let logger: &Logger = directory.logger();
slog::info!(logger, "index-open"; "directory" => format!("{:?}", directory));
let directory = ManagedDirectory::wrap(directory)?; let directory = ManagedDirectory::wrap(directory)?;
let inventory = SegmentMetaInventory::default(); let inventory = SegmentMetaInventory::default();
let metas = load_metas(&directory, &inventory)?; let metas = load_metas(&directory, &inventory)?;
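
`Index::open` now emits an `index-open` record through the directory's logger before wrapping the directory, so whichever `Logger` a `Directory` carries is the one index-level events flow through. A sketch of the caller side (the index path is hypothetical):

use tantivy::directory::MmapDirectory;
use tantivy::Index;

fn open_existing_index() -> tantivy::Result<Index> {
    // The directory's logger (the slog-stdlog default, unless
    // `open_with_logger` was used) receives the "index-open" record.
    let dir = MmapDirectory::open("/path/to/index")?; // hypothetical path
    Index::open(dir)
}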

View File

@@ -143,6 +143,7 @@ impl Searcher {
collector.collect_segment(weight.as_ref(), segment_ord as u32, segment_reader) collector.collect_segment(weight.as_ref(), segment_ord as u32, segment_reader)
}, },
segment_readers.iter().enumerate(), segment_readers.iter().enumerate(),
self.index.logger().clone(),
)?; )?;
collector.merge_fruits(fruits) collector.merge_fruits(fruits)
} }

View File

@@ -21,6 +21,12 @@ use std::sync::atomic;
#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] #[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SegmentId(Uuid); pub struct SegmentId(Uuid);
impl ToString for SegmentId {
fn to_string(&self) -> String {
self.short_uuid_string()
}
}
#[cfg(test)] #[cfg(test)]
static AUTO_INC_COUNTER: Lazy<atomic::AtomicUsize> = Lazy::new(|| atomic::AtomicUsize::default()); static AUTO_INC_COUNTER: Lazy<atomic::AtomicUsize> = Lazy::new(|| atomic::AtomicUsize::default());

View File

@@ -17,6 +17,7 @@ use crate::store::StoreReader;
use crate::termdict::TermDictionary; use crate::termdict::TermDictionary;
use crate::DocId; use crate::DocId;
use fail::fail_point; use fail::fail_point;
use slog::{warn, Logger};
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt; use std::fmt;
use std::sync::Arc; use std::sync::Arc;
@@ -53,6 +54,7 @@ pub struct SegmentReader {
store_source: ReadOnlySource, store_source: ReadOnlySource,
delete_bitset_opt: Option<DeleteBitSet>, delete_bitset_opt: Option<DeleteBitSet>,
schema: Schema, schema: Schema,
logger: Logger,
} }
impl SegmentReader { impl SegmentReader {
@@ -200,6 +202,7 @@ impl SegmentReader {
positions_composite, positions_composite,
positions_idx_composite, positions_idx_composite,
schema, schema,
logger: segment.index().logger().clone(),
}) })
} }
@@ -229,7 +232,11 @@ impl SegmentReader {
let record_option_opt = field_type.get_index_record_option(); let record_option_opt = field_type.get_index_record_option();
if record_option_opt.is_none() { if record_option_opt.is_none() {
warn!("Field {:?} does not seem indexed.", field_entry.name()); warn!(
self.logger,
"Field {:?} does not seem indexed.",
field_entry.name()
);
} }
let postings_source_opt = self.postings_composite.open_read(field); let postings_source_opt = self.postings_composite.open_read(field);

View File

@@ -1,3 +1,5 @@
use slog::{error, Logger};
use crate::directory::directory_lock::Lock; use crate::directory::directory_lock::Lock;
use crate::directory::error::LockError; use crate::directory::error::LockError;
use crate::directory::error::{DeleteError, OpenReadError, OpenWriteError}; use crate::directory::error::{DeleteError, OpenReadError, OpenWriteError};
@@ -64,7 +66,10 @@ impl<T: Send + Sync + 'static> From<Box<T>> for DirectoryLock {
impl Drop for DirectoryLockGuard { impl Drop for DirectoryLockGuard {
fn drop(&mut self) { fn drop(&mut self) {
if let Err(e) = self.directory.delete(&*self.path) { if let Err(e) = self.directory.delete(&*self.path) {
error!("Failed to remove the lock file. {:?}", e); error!(
self.directory.logger(),
"Failed to remove the lock file. {:?}", e
);
} }
} }
} }
@@ -209,6 +214,9 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
/// `OnCommit` `ReloadPolicy`. Not implementing watch in a `Directory` only prevents the /// `OnCommit` `ReloadPolicy`. Not implementing watch in a `Directory` only prevents the
/// `OnCommit` `ReloadPolicy` from working properly. /// `OnCommit` `ReloadPolicy` from working properly.
fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle>; fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle>;
/// Returns the `slog::Logger` configured for the `Directory`.
fn logger(&self) -> &Logger;
} }
/// DirectoryClone /// DirectoryClone
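
Since `logger()` has no default body, this is a breaking change for out-of-tree `Directory` implementations: each must now store and expose a `Logger`, much as `ManagedDirectory` delegates to its wrapped directory below. A minimal sketch of that obligation, with hypothetical names and the other trait methods omitted:

use slog::{o, Discard, Logger};

// State a custom Directory implementation would now carry.
struct MyDirectory {
    logger: Logger,
}

impl MyDirectory {
    fn new() -> MyDirectory {
        MyDirectory { logger: Logger::root(Discard, o!()) }
    }

    // The body the new `Directory::logger` method would have:
    fn logger(&self) -> &Logger {
        &self.logger
    }
}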

View File

@@ -11,9 +11,9 @@ use crate::error::DataCorruption;
use crate::Directory; use crate::Directory;
use crc32fast::Hasher; use crc32fast::Hasher;
use slog::{debug, error, info};
use std::collections::HashSet; use std::collections::HashSet;
use std::io; use std::io;
use std::io::Write;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::result; use std::result;
use std::sync::RwLockWriteGuard; use std::sync::RwLockWriteGuard;
@@ -56,9 +56,9 @@ fn save_managed_paths(
directory: &mut dyn Directory, directory: &mut dyn Directory,
wlock: &RwLockWriteGuard<'_, MetaInformation>, wlock: &RwLockWriteGuard<'_, MetaInformation>,
) -> io::Result<()> { ) -> io::Result<()> {
let mut w = serde_json::to_vec(&wlock.managed_paths)?; let mut managed_json = serde_json::to_string_pretty(&wlock.managed_paths)?;
writeln!(&mut w)?; managed_json.push_str("\n");
directory.atomic_write(&MANAGED_FILEPATH, &w[..])?; directory.atomic_write(&MANAGED_FILEPATH, managed_json.as_bytes())?;
Ok(()) Ok(())
} }
@@ -118,7 +118,7 @@ impl ManagedDirectory {
&mut self, &mut self,
get_living_files: L, get_living_files: L,
) -> crate::Result<GarbageCollectionResult> { ) -> crate::Result<GarbageCollectionResult> {
info!("Garbage collect"); info!(self.directory.logger(), "gc"; "stage"=>"start");
let mut files_to_delete = vec![]; let mut files_to_delete = vec![];
// It is crucial to get the living files after acquiring the // It is crucial to get the living files after acquiring the
@@ -153,7 +153,7 @@ impl ManagedDirectory {
} }
} }
Err(err) => { Err(err) => {
error!("Failed to acquire lock for GC"); error!(self.logger(), "Failed to acquire lock for GC");
return Err(crate::TantivyError::from(err)); return Err(crate::TantivyError::from(err));
} }
} }
@@ -165,7 +165,7 @@ impl ManagedDirectory {
for file_to_delete in files_to_delete { for file_to_delete in files_to_delete {
match self.delete(&file_to_delete) { match self.delete(&file_to_delete) {
Ok(_) => { Ok(_) => {
info!("Deleted {:?}", file_to_delete); debug!(self.logger(), "deleted-success"; "file"=>format!("{:?}", file_to_delete));
deleted_files.push(file_to_delete); deleted_files.push(file_to_delete);
} }
Err(file_error) => { Err(file_error) => {
@@ -178,7 +178,7 @@ impl ManagedDirectory {
if !cfg!(target_os = "windows") { if !cfg!(target_os = "windows") {
// On windows, delete is expected to fail if the file // On windows, delete is expected to fail if the file
// is mmapped. // is mmapped.
error!("Failed to delete {:?}", file_to_delete); error!(self.logger(), "delete-file-fail"; "path"=>file_to_delete.to_str().unwrap_or("<invalid-utf8>"));
} }
} }
} }
@@ -200,6 +200,10 @@ impl ManagedDirectory {
save_managed_paths(self.directory.as_mut(), &meta_informations_wlock)?; save_managed_paths(self.directory.as_mut(), &meta_informations_wlock)?;
} }
info!(self.directory.logger(), "gc"; "stage"=>"end",
"num-sucess-file-deletes"=>deleted_files.len(),
"num-failed-file-deletes"=>failed_to_delete_files.len());
Ok(GarbageCollectionResult { Ok(GarbageCollectionResult {
deleted_files, deleted_files,
failed_to_delete_files, failed_to_delete_files,
@@ -274,6 +278,7 @@ impl ManagedDirectory {
impl Directory for ManagedDirectory { impl Directory for ManagedDirectory {
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> { fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
slog::debug!(self.logger(), "open-read"; "path" => path.to_str().unwrap_or("<invalid-utf8>"));
let read_only_source = self.directory.open_read(path)?; let read_only_source = self.directory.open_read(path)?;
let (footer, reader) = Footer::extract_footer(read_only_source).map_err(|io_error| { let (footer, reader) = Footer::extract_footer(read_only_source).map_err(|io_error| {
OpenReadError::IOError { OpenReadError::IOError {
@@ -286,6 +291,7 @@ impl Directory for ManagedDirectory {
} }
fn open_write(&mut self, path: &Path) -> result::Result<WritePtr, OpenWriteError> { fn open_write(&mut self, path: &Path) -> result::Result<WritePtr, OpenWriteError> {
slog::debug!(self.logger(), "open-write"; "path" => path.to_str().unwrap_or("<invalid-utf8>"));
self.register_file_as_managed(path) self.register_file_as_managed(path)
.map_err(|io_error| OpenWriteError::IOError { .map_err(|io_error| OpenWriteError::IOError {
io_error, io_error,
@@ -300,9 +306,11 @@ impl Directory for ManagedDirectory {
)))) ))))
} }
fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> { fn atomic_write(&mut self, path: &Path, content: &[u8]) -> io::Result<()> {
let content_str = std::str::from_utf8(content).unwrap_or("<content-not-utf-8>");
slog::debug!(self.logger(), "Atomic write"; "path" => format!("{:?}", path), "content_length"=>content_str);
self.register_file_as_managed(path)?; self.register_file_as_managed(path)?;
self.directory.atomic_write(path, data) self.directory.atomic_write(path, content)
} }
fn atomic_read(&self, path: &Path) -> result::Result<Vec<u8>, OpenReadError> { fn atomic_read(&self, path: &Path) -> result::Result<Vec<u8>, OpenReadError> {
@@ -324,6 +332,10 @@ impl Directory for ManagedDirectory {
fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> { fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> {
self.directory.watch(watch_callback) self.directory.watch(watch_callback)
} }
fn logger(&self) -> &slog::Logger {
self.directory.logger()
}
} }
impl Clone for ManagedDirectory { impl Clone for ManagedDirectory {
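
The calls above show slog's structured form: the logger comes first, then the message, then `;`-separated `"key" => value` pairs carrying the data that used to be interpolated into format strings. A standalone sketch of the syntax:

use slog::{info, o, Discard, Logger};

fn main() {
    // Context keys attached at construction appear on every record.
    let logger = Logger::root(Discard, o!("component" => "managed-directory"));
    // Message first, then structured key/value pairs after the `;`.
    info!(logger, "gc"; "stage" => "end", "num-deleted" => 3);
}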

View File

@@ -17,6 +17,8 @@ use notify::RawEvent;
use notify::RecursiveMode; use notify::RecursiveMode;
use notify::Watcher; use notify::Watcher;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use slog::{debug, o, Drain, Logger};
use slog_stdlog::StdLog;
use std::collections::HashMap; use std::collections::HashMap;
use std::convert::From; use std::convert::From;
use std::fmt; use std::fmt;
@@ -34,11 +36,6 @@ use std::sync::Weak;
use std::thread; use std::thread;
use tempfile::TempDir; use tempfile::TempDir;
/// Create a default io error given a string.
pub(crate) fn make_io_err(msg: String) -> io::Error {
io::Error::new(io::ErrorKind::Other, msg)
}
/// Returns None iff the file exists, can be read, but is empty (and hence /// Returns None iff the file exists, can be read, but is empty (and hence
/// cannot be mmapped) /// cannot be mmapped)
fn open_mmap(full_path: &Path) -> result::Result<Option<Mmap>, OpenReadError> { fn open_mmap(full_path: &Path) -> result::Result<Option<Mmap>, OpenReadError> {
@@ -149,7 +146,7 @@ struct WatcherWrapper {
} }
impl WatcherWrapper { impl WatcherWrapper {
pub fn new(path: &Path) -> Result<Self, OpenDirectoryError> { pub(crate) fn new(path: &Path, logger: Logger) -> Result<Self, OpenDirectoryError> {
let (tx, watcher_recv): (Sender<RawEvent>, Receiver<RawEvent>) = channel(); let (tx, watcher_recv): (Sender<RawEvent>, Receiver<RawEvent>) = channel();
// We need to initialize the // We need to initialize the
let watcher = notify::raw_watcher(tx) let watcher = notify::raw_watcher(tx)
@@ -163,7 +160,8 @@ impl WatcherWrapper {
panic!("Unknown error while starting watching directory {:?}", path); panic!("Unknown error while starting watching directory {:?}", path);
} }
})?; })?;
let watcher_router: Arc<WatchCallbackList> = Default::default(); let watcher_router: Arc<WatchCallbackList> =
Arc::new(WatchCallbackList::with_logger(logger));
let watcher_router_clone = watcher_router.clone(); let watcher_router_clone = watcher_router.clone();
thread::Builder::new() thread::Builder::new()
.name("meta-file-watch-thread".to_string()) .name("meta-file-watch-thread".to_string())
@@ -226,15 +224,21 @@ struct MmapDirectoryInner {
mmap_cache: RwLock<MmapCache>, mmap_cache: RwLock<MmapCache>,
_temp_directory: Option<TempDir>, _temp_directory: Option<TempDir>,
watcher: RwLock<Option<WatcherWrapper>>, watcher: RwLock<Option<WatcherWrapper>>,
logger: Logger,
} }
impl MmapDirectoryInner { impl MmapDirectoryInner {
fn new(root_path: PathBuf, temp_directory: Option<TempDir>) -> MmapDirectoryInner { fn new(
root_path: PathBuf,
temp_directory: Option<TempDir>,
logger: Logger,
) -> MmapDirectoryInner {
MmapDirectoryInner { MmapDirectoryInner {
root_path, root_path,
mmap_cache: Default::default(), mmap_cache: Default::default(),
_temp_directory: temp_directory, _temp_directory: temp_directory,
watcher: RwLock::new(None), watcher: RwLock::new(None),
logger,
} }
} }
@@ -246,7 +250,7 @@ impl MmapDirectoryInner {
// The downside is that we might create a watch wrapper that is not useful. // The downside is that we might create a watch wrapper that is not useful.
let need_initialization = self.watcher.read().unwrap().is_none(); let need_initialization = self.watcher.read().unwrap().is_none();
if need_initialization { if need_initialization {
let watch_wrapper = WatcherWrapper::new(&self.root_path)?; let watch_wrapper = WatcherWrapper::new(&self.root_path, self.logger.clone())?;
let mut watch_wlock = self.watcher.write().unwrap(); let mut watch_wlock = self.watcher.write().unwrap();
// the watcher could have been initialized when we released the lock, and // the watcher could have been initialized when we released the lock, and
// we do not want to lose the watched files that were set. // we do not want to lose the watched files that were set.
@@ -269,8 +273,8 @@ impl fmt::Debug for MmapDirectory {
} }
impl MmapDirectory { impl MmapDirectory {
fn new(root_path: PathBuf, temp_directory: Option<TempDir>) -> MmapDirectory { fn new(root_path: PathBuf, temp_directory: Option<TempDir>, logger: Logger) -> MmapDirectory {
let inner = MmapDirectoryInner::new(root_path, temp_directory); let inner = MmapDirectoryInner::new(root_path, temp_directory, logger);
MmapDirectory { MmapDirectory {
inner: Arc::new(inner), inner: Arc::new(inner),
} }
@@ -282,17 +286,18 @@ impl MmapDirectory {
/// For your unit tests, prefer the RAMDirectory. /// For your unit tests, prefer the RAMDirectory.
pub fn create_from_tempdir() -> Result<MmapDirectory, OpenDirectoryError> { pub fn create_from_tempdir() -> Result<MmapDirectory, OpenDirectoryError> {
let tempdir = TempDir::new().map_err(OpenDirectoryError::FailedToCreateTempDir)?; let tempdir = TempDir::new().map_err(OpenDirectoryError::FailedToCreateTempDir)?;
Ok(MmapDirectory::new( let logger = Logger::root(StdLog.fuse(), o!());
tempdir.path().to_path_buf(), Ok(MmapDirectory::new(tempdir.path().to_owned(), Some(tempdir), logger))
Some(tempdir),
))
} }
/// Opens a MmapDirectory in a directory. /// Opens a MmapDirectory in a directory.
/// ///
/// Returns an error if the `directory_path` does not /// Returns an error if the `directory_path` does not
/// exist or if it is not a directory. /// exist or if it is not a directory.
pub fn open<P: AsRef<Path>>(directory_path: P) -> Result<MmapDirectory, OpenDirectoryError> { pub fn open_with_logger<P: AsRef<Path>>(
directory_path: P,
logger: Logger,
) -> Result<MmapDirectory, OpenDirectoryError> {
let directory_path: &Path = directory_path.as_ref(); let directory_path: &Path = directory_path.as_ref();
if !directory_path.exists() { if !directory_path.exists() {
Err(OpenDirectoryError::DoesNotExist(PathBuf::from( Err(OpenDirectoryError::DoesNotExist(PathBuf::from(
@@ -303,10 +308,20 @@ impl MmapDirectory {
directory_path, directory_path,
))) )))
} else { } else {
Ok(MmapDirectory::new(PathBuf::from(directory_path), None)) Ok(MmapDirectory::new(
PathBuf::from(directory_path),
None,
logger,
))
} }
} }
/// Creates an `MmapDirectory` at the given path.
pub fn open<P: AsRef<Path>>(directory_path: P) -> Result<MmapDirectory, OpenDirectoryError> {
let logger = Logger::root(StdLog.fuse(), o!());
Self::open_with_logger(directory_path, logger)
}
/// Joins a relative_path to the directory `root_path` /// Joins a relative_path to the directory `root_path`
/// to create a proper complete `filepath`. /// to create a proper complete `filepath`.
fn resolve_path(&self, relative_path: &Path) -> PathBuf { fn resolve_path(&self, relative_path: &Path) -> PathBuf {
@@ -366,11 +381,12 @@ impl MmapDirectory {
struct ReleaseLockFile { struct ReleaseLockFile {
_file: File, _file: File,
path: PathBuf, path: PathBuf,
logger: Logger,
} }
impl Drop for ReleaseLockFile { impl Drop for ReleaseLockFile {
fn drop(&mut self) { fn drop(&mut self) {
debug!("Releasing lock {:?}", self.path); debug!(self.logger, "Releasing lock {:?}", self.path);
} }
} }
@@ -409,17 +425,16 @@ impl TerminatingWrite for SafeFileWriter {
impl Directory for MmapDirectory { impl Directory for MmapDirectory {
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> { fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
debug!("Open Read {:?}", path);
let full_path = self.resolve_path(path); let full_path = self.resolve_path(path);
let mut mmap_cache = self.inner.mmap_cache.write().map_err(|_| { let mut mmap_cache = self.inner.mmap_cache.write().map_err(|_| {
let msg = format!( let msg = format!(
"Failed to acquired write lock \ "Failed to acquired write lock \
on mmap cache while reading {:?}", on mmap cache while reading {:?}",
path path
); );
let io_error = io::Error::new(io::ErrorKind::Other, msg);
OpenReadError::IOError { OpenReadError::IOError {
io_error: make_io_err(msg), io_error,
filepath: path.to_owned(), filepath: path.to_owned(),
} }
})?; })?;
@@ -457,9 +472,7 @@ impl Directory for MmapDirectory {
} }
fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> { fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
debug!("Open Write {:?}", path);
let full_path = self.resolve_path(path); let full_path = self.resolve_path(path);
let open_res = OpenOptions::new() let open_res = OpenOptions::new()
.write(true) .write(true)
.create_new(true) .create_new(true)
@@ -519,7 +532,6 @@ impl Directory for MmapDirectory {
} }
fn atomic_write(&mut self, path: &Path, content: &[u8]) -> io::Result<()> { fn atomic_write(&mut self, path: &Path, content: &[u8]) -> io::Result<()> {
debug!("Atomic Write {:?}", path);
let mut tempfile = tempfile::Builder::new().tempfile_in(&self.inner.root_path)?; let mut tempfile = tempfile::Builder::new().tempfile_in(&self.inner.root_path)?;
tempfile.write_all(content)?; tempfile.write_all(content)?;
tempfile.flush()?; tempfile.flush()?;
@@ -541,16 +553,22 @@ impl Directory for MmapDirectory {
} else { } else {
file.try_lock_exclusive().map_err(|_| LockError::LockBusy)? file.try_lock_exclusive().map_err(|_| LockError::LockBusy)?
} }
let logger = self.inner.logger.clone();
// dropping the file handle will release the lock. // dropping the file handle will release the lock.
Ok(DirectoryLock::from(Box::new(ReleaseLockFile { Ok(DirectoryLock::from(Box::new(ReleaseLockFile {
path: lock.filepath.clone(), path: lock.filepath.clone(),
_file: file, _file: file,
logger,
}))) })))
} }
fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> { fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> {
self.inner.watch(watch_callback) self.inner.watch(watch_callback)
} }
fn logger(&self) -> &Logger {
&self.inner.logger
}
} }
#[cfg(test)] #[cfg(test)]
@@ -660,7 +678,8 @@ mod tests {
let counter_clone = counter.clone(); let counter_clone = counter.clone();
let tmp_dir = tempfile::TempDir::new().unwrap(); let tmp_dir = tempfile::TempDir::new().unwrap();
let tmp_dirpath = tmp_dir.path().to_owned(); let tmp_dirpath = tmp_dir.path().to_owned();
let mut watch_wrapper = WatcherWrapper::new(&tmp_dirpath).unwrap(); let logger = Logger::root(slog::Discard, o!());
let mut watch_wrapper = WatcherWrapper::new(&tmp_dirpath, logger).unwrap();
let tmp_file = tmp_dirpath.join(*META_FILEPATH); let tmp_file = tmp_dirpath.join(*META_FILEPATH);
let _handle = watch_wrapper.watch(Box::new(move || { let _handle = watch_wrapper.watch(Box::new(move || {
counter_clone.fetch_add(1, Ordering::SeqCst); counter_clone.fetch_add(1, Ordering::SeqCst);
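
`MmapDirectory::open` keeps its old signature by delegating to the new `open_with_logger` with the `slog-stdlog` default, so existing callers compile unchanged while applications that own a drain can inject it. A sketch of the explicit form (the path is hypothetical):

use slog::{o, Drain, Logger};
use slog_stdlog::StdLog;
use tantivy::directory::MmapDirectory;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Same drain as the default, plus a context key that every
    // record from this directory will carry.
    let logger = Logger::root(StdLog.fuse(), o!("index" => "demo"));
    let _dir = MmapDirectory::open_with_logger("/tmp/tantivy-demo", logger)?; // hypothetical path
    Ok(())
}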

View File

@@ -23,7 +23,8 @@ pub use self::directory::{Directory, DirectoryClone};
pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK}; pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
pub use self::ram_directory::RAMDirectory; pub use self::ram_directory::RAMDirectory;
pub use self::read_only_source::ReadOnlySource; pub use self::read_only_source::ReadOnlySource;
pub use self::watch_event_router::{WatchCallback, WatchCallbackList, WatchHandle}; pub(crate) use self::watch_event_router::WatchCallbackList;
pub use self::watch_event_router::{WatchCallback, WatchHandle};
use std::io::{self, BufWriter, Write}; use std::io::{self, BufWriter, Write};
use std::path::PathBuf; use std::path::PathBuf;
/// Outcome of the Garbage collection /// Outcome of the Garbage collection

View File

@@ -5,6 +5,8 @@ use crate::directory::WatchCallbackList;
use crate::directory::{Directory, ReadOnlySource, WatchCallback, WatchHandle}; use crate::directory::{Directory, ReadOnlySource, WatchCallback, WatchHandle};
use crate::directory::{TerminatingWrite, WritePtr}; use crate::directory::{TerminatingWrite, WritePtr};
use fail::fail_point; use fail::fail_point;
use slog::{o, Drain, Logger};
use slog_stdlog::StdLog;
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt; use std::fmt;
use std::io::{self, BufWriter, Cursor, Seek, SeekFrom, Write}; use std::io::{self, BufWriter, Cursor, Seek, SeekFrom, Write};
@@ -66,7 +68,7 @@ impl Write for VecWriter {
fn flush(&mut self) -> io::Result<()> { fn flush(&mut self) -> io::Result<()> {
self.is_flushed = true; self.is_flushed = true;
let mut fs = self.shared_directory.fs.write().unwrap(); let mut fs = self.shared_directory.fs.inner_directory.write().unwrap();
fs.write(self.path.clone(), self.data.get_ref()); fs.write(self.path.clone(), self.data.get_ref());
Ok(()) Ok(())
} }
@@ -78,13 +80,19 @@ impl TerminatingWrite for VecWriter {
} }
} }
#[derive(Default)]
struct InnerDirectory { struct InnerDirectory {
fs: HashMap<PathBuf, ReadOnlySource>, fs: HashMap<PathBuf, ReadOnlySource>,
watch_router: WatchCallbackList, watch_router: WatchCallbackList,
} }
impl InnerDirectory { impl InnerDirectory {
fn with_logger(logger: Logger) -> Self {
InnerDirectory {
fs: Default::default(),
watch_router: WatchCallbackList::with_logger(logger.clone()),
}
}
fn write(&mut self, path: PathBuf, data: &[u8]) -> bool { fn write(&mut self, path: PathBuf, data: &[u8]) -> bool {
let data = ReadOnlySource::new(Vec::from(data)); let data = ReadOnlySource::new(Vec::from(data));
self.fs.insert(path, data).is_some() self.fs.insert(path, data).is_some()
@@ -117,20 +125,32 @@ impl InnerDirectory {
} }
} }
impl Default for RAMDirectory {
fn default() -> RAMDirectory {
let logger = Logger::root(StdLog.fuse(), o!());
Self::with_logger(logger)
}
}
impl fmt::Debug for RAMDirectory { impl fmt::Debug for RAMDirectory {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "RAMDirectory") write!(f, "RAMDirectory")
} }
} }
struct Inner {
inner_directory: RwLock<InnerDirectory>,
logger: Logger,
}
/// A Directory storing everything in anonymous memory. /// A Directory storing everything in anonymous memory.
/// ///
/// It is mainly meant for unit testing. /// It is mainly meant for unit testing.
/// Writes are only made visible upon flushing. /// Writes are only made visible upon flushing.
/// ///
#[derive(Clone, Default)] #[derive(Clone)]
pub struct RAMDirectory { pub struct RAMDirectory {
fs: Arc<RwLock<InnerDirectory>>, fs: Arc<Inner>,
} }
impl RAMDirectory { impl RAMDirectory {
@@ -139,10 +159,21 @@ impl RAMDirectory {
Self::default() Self::default()
} }
/// Create a `RAMDirectory` with a custom logger.
pub fn with_logger(logger: Logger) -> RAMDirectory {
let inner_directory = InnerDirectory::with_logger(logger.clone()).into();
RAMDirectory {
fs: Arc::new(Inner {
inner_directory,
logger,
}),
}
}
/// Returns the sum of the size of the different files /// Returns the sum of the size of the different files
/// in the RAMDirectory. /// in the RAMDirectory.
pub fn total_mem_usage(&self) -> usize { pub fn total_mem_usage(&self) -> usize {
self.fs.read().unwrap().total_mem_usage() self.fs.inner_directory.read().unwrap().total_mem_usage()
} }
/// Write a copy of all of the files saved in the RAMDirectory in the target `Directory`. /// Write a copy of all of the files saved in the RAMDirectory in the target `Directory`.
@@ -152,7 +183,7 @@ impl RAMDirectory {
/// ///
/// If an error is encountered, files may be persisted partially. /// If an error is encountered, files may be persisted partially.
pub fn persist(&self, dest: &mut dyn Directory) -> crate::Result<()> { pub fn persist(&self, dest: &mut dyn Directory) -> crate::Result<()> {
let wlock = self.fs.write().unwrap(); let wlock = self.fs.inner_directory.write().unwrap();
for (path, source) in wlock.fs.iter() { for (path, source) in wlock.fs.iter() {
let mut dest_wrt = dest.open_write(path)?; let mut dest_wrt = dest.open_write(path)?;
dest_wrt.write_all(source.as_slice())?; dest_wrt.write_all(source.as_slice())?;
@@ -164,7 +195,7 @@ impl RAMDirectory {
impl Directory for RAMDirectory { impl Directory for RAMDirectory {
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> { fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
self.fs.read().unwrap().open_read(path) self.fs.inner_directory.read().unwrap().open_read(path)
} }
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> { fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
@@ -174,15 +205,15 @@ impl Directory for RAMDirectory {
filepath: path.to_path_buf(), filepath: path.to_path_buf(),
}) })
}); });
self.fs.write().unwrap().delete(path) self.fs.inner_directory.write().unwrap().delete(path)
} }
fn exists(&self, path: &Path) -> bool { fn exists(&self, path: &Path) -> bool {
self.fs.read().unwrap().exists(path) self.fs.inner_directory.read().unwrap().exists(path)
} }
fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> { fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
let mut fs = self.fs.write().unwrap(); let mut fs = self.fs.inner_directory.write().unwrap();
let path_buf = PathBuf::from(path); let path_buf = PathBuf::from(path);
let vec_writer = VecWriter::new(path_buf.clone(), self.clone()); let vec_writer = VecWriter::new(path_buf.clone(), self.clone());
let exists = fs.write(path_buf.clone(), &[]); let exists = fs.write(path_buf.clone(), &[]);
@@ -206,19 +237,38 @@ impl Directory for RAMDirectory {
let path_buf = PathBuf::from(path); let path_buf = PathBuf::from(path);
// Reserve the path to prevent calls to .write() to succeed. // Reserve the path to prevent calls to .write() to succeed.
self.fs.write().unwrap().write(path_buf.clone(), &[]); self.fs
.inner_directory
.write()
.unwrap()
.write(path_buf.clone(), &[]);
let mut vec_writer = VecWriter::new(path_buf, self.clone()); let mut vec_writer = VecWriter::new(path_buf, self.clone());
vec_writer.write_all(data)?; vec_writer.write_all(data)?;
vec_writer.flush()?; vec_writer.flush()?;
if path == Path::new(&*META_FILEPATH) { if path == Path::new(&*META_FILEPATH) {
let _ = self.fs.write().unwrap().watch_router.broadcast(); let _ = self
.fs
.inner_directory
.write()
.unwrap()
.watch_router
.broadcast();
} }
Ok(()) Ok(())
} }
fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> { fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> {
Ok(self.fs.write().unwrap().watch(watch_callback)) Ok(self
.fs
.inner_directory
.write()
.unwrap()
.watch(watch_callback))
}
fn logger(&self) -> &Logger {
&self.fs.logger
} }
} }
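
`RAMDirectory` gets the same split: `Default` still wires up the `slog-stdlog` bridge, while the new `with_logger` lets tests silence logging entirely or capture it with a custom drain. For example:

use slog::{o, Discard, Logger};
use tantivy::directory::RAMDirectory;

fn main() {
    // A silent in-memory directory, handy for unit tests.
    let ram_directory = RAMDirectory::with_logger(Logger::root(Discard, o!()));
    assert_eq!(ram_directory.total_mem_usage(), 0);
}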

View File

@@ -1,5 +1,6 @@
use futures::channel::oneshot; use futures::channel::oneshot;
use futures::{Future, TryFutureExt}; use futures::{Future, TryFutureExt};
use slog::{error, Logger};
use std::sync::Arc; use std::sync::Arc;
use std::sync::RwLock; use std::sync::RwLock;
use std::sync::Weak; use std::sync::Weak;
@@ -11,9 +12,9 @@ pub type WatchCallback = Box<dyn Fn() + Sync + Send>;
/// ///
/// It registers callbacks (See `.subscribe(...)`) and /// It registers callbacks (See `.subscribe(...)`) and
/// calls them upon calls to `.broadcast(...)`. /// calls them upon calls to `.broadcast(...)`.
#[derive(Default)] pub(crate) struct WatchCallbackList {
pub struct WatchCallbackList {
router: RwLock<Vec<Weak<WatchCallback>>>, router: RwLock<Vec<Weak<WatchCallback>>>,
logger: Logger,
} }
/// Controls how long a directory should watch for a file change. /// Controls how long a directory should watch for a file change.
@@ -32,6 +33,13 @@ impl WatchHandle {
} }
impl WatchCallbackList { impl WatchCallbackList {
pub fn with_logger(logger: Logger) -> Self {
WatchCallbackList {
logger,
router: Default::default(),
}
}
/// Subscribes a new callback and returns a handle that controls the lifetime of the callback. /// Subscribes a new callback and returns a handle that controls the lifetime of the callback.
pub fn subscribe(&self, watch_callback: WatchCallback) -> WatchHandle { pub fn subscribe(&self, watch_callback: WatchCallback) -> WatchHandle {
let watch_callback_arc = Arc::new(watch_callback); let watch_callback_arc = Arc::new(watch_callback);
@@ -74,8 +82,8 @@ impl WatchCallbackList {
}); });
if let Err(err) = spawn_res { if let Err(err) = spawn_res {
error!( error!(
"Failed to spawn thread to call watch callbacks. Cause: {:?}", self.logger,
err "Failed to spawn thread to call watch callbacks. Cause: {:?}", err
); );
} }
result result
@@ -86,13 +94,18 @@ impl WatchCallbackList {
mod tests { mod tests {
use crate::directory::WatchCallbackList; use crate::directory::WatchCallbackList;
use futures::executor::block_on; use futures::executor::block_on;
use slog::{o, Discard, Logger};
use std::mem; use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
fn default_watch_callback_list() -> WatchCallbackList {
WatchCallbackList::with_logger(Logger::root(Discard, o!()))
}
#[test] #[test]
fn test_watch_event_router_simple() { fn test_watch_event_router_simple() {
let watch_event_router = WatchCallbackList::default(); let watch_event_router = default_watch_callback_list();
let counter: Arc<AtomicUsize> = Default::default(); let counter: Arc<AtomicUsize> = Default::default();
let counter_clone = counter.clone(); let counter_clone = counter.clone();
let inc_callback = Box::new(move || { let inc_callback = Box::new(move || {
@@ -119,7 +132,7 @@ mod tests {
#[test] #[test]
fn test_watch_event_router_multiple_callback_same_key() { fn test_watch_event_router_multiple_callback_same_key() {
let watch_event_router = WatchCallbackList::default(); let watch_event_router = default_watch_callback_list();
let counter: Arc<AtomicUsize> = Default::default(); let counter: Arc<AtomicUsize> = Default::default();
let inc_callback = |inc: usize| { let inc_callback = |inc: usize| {
let counter_clone = counter.clone(); let counter_clone = counter.clone();
@@ -148,7 +161,7 @@ mod tests {
#[test] #[test]
fn test_watch_event_router_multiple_callback_different_key() { fn test_watch_event_router_multiple_callback_different_key() {
let watch_event_router = WatchCallbackList::default(); let watch_event_router = default_watch_callback_list();
let counter: Arc<AtomicUsize> = Default::default(); let counter: Arc<AtomicUsize> = Default::default();
let counter_clone = counter.clone(); let counter_clone = counter.clone();
let inc_callback = Box::new(move || { let inc_callback = Box::new(move || {

View File

@@ -27,6 +27,7 @@ use crate::Opstamp;
use crossbeam::channel; use crossbeam::channel;
use futures::executor::block_on; use futures::executor::block_on;
use futures::future::Future; use futures::future::Future;
use slog::{error, info, Logger};
use smallvec::smallvec; use smallvec::smallvec;
use smallvec::SmallVec; use smallvec::SmallVec;
use std::mem; use std::mem;
@@ -195,20 +196,21 @@ fn index_documents(
grouped_document_iterator: &mut dyn Iterator<Item = OperationGroup>, grouped_document_iterator: &mut dyn Iterator<Item = OperationGroup>,
segment_updater: &mut SegmentUpdater, segment_updater: &mut SegmentUpdater,
mut delete_cursor: DeleteCursor, mut delete_cursor: DeleteCursor,
logger: &Logger,
) -> crate::Result<bool> { ) -> crate::Result<bool> {
let schema = segment.schema(); let schema = segment.schema();
info!(logger, "segment-index"; "stage"=>"start");
let mut segment_writer = SegmentWriter::for_segment(memory_budget, segment.clone(), &schema)?; let mut segment_writer = SegmentWriter::for_segment(memory_budget, segment.clone(), &schema)?;
let mut buffer_limit_reached = false;
for document_group in grouped_document_iterator { for document_group in grouped_document_iterator {
for doc in document_group { for doc in document_group {
segment_writer.add_document(doc, &schema)?; segment_writer.add_document(doc, &schema)?;
} }
let mem_usage = segment_writer.mem_usage(); let mem_usage = segment_writer.mem_usage();
if mem_usage >= memory_budget - MARGIN_IN_BYTES { if mem_usage >= memory_budget - MARGIN_IN_BYTES {
info!( buffer_limit_reached = true;
"Buffer limit reached, flushing segment with maxdoc={}.",
segment_writer.max_doc()
);
break; break;
} }
} }
@@ -228,6 +230,14 @@ fn index_documents(
let segment_with_max_doc = segment.with_max_doc(max_doc); let segment_with_max_doc = segment.with_max_doc(max_doc);
let last_docstamp: Opstamp = *(doc_opstamps.last().unwrap()); let last_docstamp: Opstamp = *(doc_opstamps.last().unwrap());
info!(
logger,
"segment-index";
"stage" => "serialize",
"cause" => if buffer_limit_reached { "buffer-limit" } else { "commit" },
"maxdoc" => max_doc,
"last_docstamp" => last_docstamp
);
let delete_bitset_opt = apply_deletes( let delete_bitset_opt = apply_deletes(
&segment_with_max_doc, &segment_with_max_doc,
@@ -241,7 +251,18 @@ fn index_documents(
delete_cursor, delete_cursor,
delete_bitset_opt, delete_bitset_opt,
); );
info!(
logger,
"segment-index";
"stage" => "publish",
);
block_on(segment_updater.schedule_add_segment(segment_entry))?; block_on(segment_updater.schedule_add_segment(segment_entry))?;
info!(
logger,
"segment-index";
"stage" => "end",
);
Ok(true) Ok(true)
} }
@@ -344,6 +365,10 @@ impl IndexWriter {
Ok(index_writer) Ok(index_writer)
} }
pub(crate) fn logger(&self) -> &Logger {
self.index.logger()
}
fn drop_sender(&mut self) { fn drop_sender(&mut self) {
let (sender, _receiver) = channel::bounded(1); let (sender, _receiver) = channel::bounded(1);
self.operation_sender = sender; self.operation_sender = sender;
@@ -352,6 +377,8 @@ impl IndexWriter {
/// If there are some merging threads, blocks until they all finish their work and /// If there are some merging threads, blocks until they all finish their work and
/// then drop the `IndexWriter`. /// then drop the `IndexWriter`.
pub fn wait_merging_threads(mut self) -> crate::Result<()> { pub fn wait_merging_threads(mut self) -> crate::Result<()> {
info!(self.logger(), "wait-merge-threads"; "stage"=>"start");
// this will stop the indexing thread, // this will stop the indexing thread,
// dropping the last reference to the segment_updater. // dropping the last reference to the segment_updater.
self.drop_sender(); self.drop_sender();
@@ -372,9 +399,9 @@ impl IndexWriter {
.map_err(|_| TantivyError::ErrorInThread("Failed to join merging thread.".into())); .map_err(|_| TantivyError::ErrorInThread("Failed to join merging thread.".into()));
if let Err(ref e) = result { if let Err(ref e) = result {
error!("Some merging thread failed {:?}", e); error!(self.logger(), "some merge thread failed"; "cause"=>e.to_string());
} }
info!(self.logger(), "wait-merge-threads"; "stage"=>"stop");
result result
} }
@@ -434,12 +461,16 @@ impl IndexWriter {
return Ok(()); return Ok(());
} }
let segment = index.new_segment(); let segment = index.new_segment();
let segment_id = segment.id();
index_documents( index_documents(
mem_budget, mem_budget,
segment, segment,
&mut document_iterator, &mut document_iterator,
&mut segment_updater, &mut segment_updater,
delete_cursor.clone(), delete_cursor.clone(),
&index
.logger()
.new(slog::o!("segment"=>segment_id.to_string())),
)?; )?;
} }
})?; })?;
@@ -553,7 +584,10 @@ impl IndexWriter {
/// ///
/// The opstamp at the last commit is returned. /// The opstamp at the last commit is returned.
pub fn rollback(&mut self) -> crate::Result<Opstamp> { pub fn rollback(&mut self) -> crate::Result<Opstamp> {
info!("Rolling back to opstamp {}", self.committed_opstamp); info!(
self.logger(),
"Rolling back to opstamp {}", self.committed_opstamp
);
// marks the segment updater as killed. From now on, all // marks the segment updater as killed. From now on, all
// segment updates will be ignored. // segment updates will be ignored.
self.segment_updater.kill(); self.segment_updater.kill();
@@ -610,6 +644,8 @@ impl IndexWriter {
/// using this API. /// using this API.
/// See [`PreparedCommit::set_payload()`](PreparedCommit.html) /// See [`PreparedCommit::set_payload()`](PreparedCommit.html)
pub fn prepare_commit(&mut self) -> crate::Result<PreparedCommit> { pub fn prepare_commit(&mut self) -> crate::Result<PreparedCommit> {
let logger = self.logger().clone();
// Here, because we join all of the worker threads, // Here, because we join all of the worker threads,
// all of the segment update for this commit have been // all of the segment update for this commit have been
// sent. // sent.
@@ -620,7 +656,10 @@ impl IndexWriter {
// //
// This will move uncommitted segments to the state of // This will move uncommitted segments to the state of
// committed segments. // committed segments.
info!("Preparing commit");
let commit_opstamp = self.stamper.stamp();
info!(logger, "prepare-commit"; "opstamp" => commit_opstamp);
// this will drop the current document channel // this will drop the current document channel
// and recreate a new one. // and recreate a new one.
@@ -636,9 +675,8 @@ impl IndexWriter {
self.add_indexing_worker()?; self.add_indexing_worker()?;
} }
let commit_opstamp = self.stamper.stamp();
let prepared_commit = PreparedCommit::new(self, commit_opstamp); let prepared_commit = PreparedCommit::new(self, commit_opstamp);
info!("Prepared commit {}", commit_opstamp); info!(logger, "Prepared commit {}", commit_opstamp);
Ok(prepared_commit) Ok(prepared_commit)
} }
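
The indexing thread above derives a per-segment logger with `Logger::new`, slog's child-logger mechanism: the child shares the parent's drain and context keys and appends its own, so every record emitted inside `index_documents` automatically carries the segment id. A standalone sketch of the pattern:

use slog::{info, o, Discard, Logger};

fn main() {
    let root = Logger::root(Discard, o!("app" => "tantivy-demo"));
    // Child logger: inherits the drain and the `app` key, adds `segment`.
    let segment_logger = root.new(o!("segment" => "0a1b2c3d")); // hypothetical id
    info!(segment_logger, "segment-index"; "stage" => "start");
}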

View File

@@ -1,6 +1,7 @@
use super::IndexWriter; use super::IndexWriter;
use crate::Opstamp; use crate::Opstamp;
use futures::executor::block_on; use futures::executor::block_on;
use slog::info;
/// A prepared commit /// A prepared commit
pub struct PreparedCommit<'a> { pub struct PreparedCommit<'a> {
@@ -31,7 +32,7 @@ impl<'a> PreparedCommit<'a> {
} }
pub fn commit(self) -> crate::Result<Opstamp> { pub fn commit(self) -> crate::Result<Opstamp> {
info!("committing {}", self.opstamp); info!(self.index_writer.logger(), "committing {}", self.opstamp);
let _ = block_on( let _ = block_on(
self.index_writer self.index_writer
.segment_updater() .segment_updater()

View File

@@ -1,3 +1,5 @@
use slog::{warn, Logger};
use super::segment_register::SegmentRegister; use super::segment_register::SegmentRegister;
use crate::core::SegmentId; use crate::core::SegmentId;
use crate::core::SegmentMeta; use crate::core::SegmentMeta;
@@ -42,9 +44,9 @@ impl SegmentRegisters {
/// ///
/// It guarantees the atomicity of the /// It guarantees the atomicity of the
/// changes (merges especially) /// changes (merges especially)
#[derive(Default)]
pub struct SegmentManager { pub struct SegmentManager {
registers: RwLock<SegmentRegisters>, registers: RwLock<SegmentRegisters>,
logger: Logger,
} }
impl Debug for SegmentManager { impl Debug for SegmentManager {
@@ -77,12 +79,14 @@ impl SegmentManager {
pub fn from_segments( pub fn from_segments(
segment_metas: Vec<SegmentMeta>, segment_metas: Vec<SegmentMeta>,
delete_cursor: &DeleteCursor, delete_cursor: &DeleteCursor,
logger: Logger,
) -> SegmentManager { ) -> SegmentManager {
SegmentManager { SegmentManager {
registers: RwLock::new(SegmentRegisters { registers: RwLock::new(SegmentRegisters {
uncommitted: SegmentRegister::default(), uncommitted: SegmentRegister::default(),
committed: SegmentRegister::new(segment_metas, delete_cursor), committed: SegmentRegister::new(segment_metas, delete_cursor),
}), }),
logger,
} }
} }
@@ -186,7 +190,7 @@ impl SegmentManager {
let segments_status = registers_lock let segments_status = registers_lock
.segments_status(before_merge_segment_ids) .segments_status(before_merge_segment_ids)
.ok_or_else(|| { .ok_or_else(|| {
warn!("couldn't find segment in SegmentManager"); warn!(self.logger, "couldn't find segment in SegmentManager");
crate::TantivyError::InvalidArgument( crate::TantivyError::InvalidArgument(
"The segments that were merged could not be found in the SegmentManager. \ "The segments that were merged could not be found in the SegmentManager. \
This is not necessarily a bug, and can happen after a rollback for instance." This is not necessarily a bug, and can happen after a rollback for instance."

View File

@@ -23,9 +23,9 @@ use futures::channel::oneshot;
use futures::executor::{ThreadPool, ThreadPoolBuilder}; use futures::executor::{ThreadPool, ThreadPoolBuilder};
use futures::future::Future; use futures::future::Future;
use futures::future::TryFutureExt; use futures::future::TryFutureExt;
use slog::{debug, error, info, warn};
use std::borrow::BorrowMut; use std::borrow::BorrowMut;
use std::collections::HashSet; use std::collections::HashSet;
use std::io::Write;
use std::ops::Deref; use std::ops::Deref;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
@@ -65,12 +65,11 @@ pub fn save_new_metas(schema: Schema, directory: &mut dyn Directory) -> crate::R
/// ///
/// This method is not part of tantivy's public API /// This method is not part of tantivy's public API
fn save_metas(metas: &IndexMeta, directory: &mut dyn Directory) -> crate::Result<()> { fn save_metas(metas: &IndexMeta, directory: &mut dyn Directory) -> crate::Result<()> {
info!("save metas"); let mut meta_json = serde_json::to_string_pretty(metas)?;
let mut buffer = serde_json::to_vec_pretty(metas)?;
// Just adding a new line at the end of the buffer. // Just adding a new line at the end of the buffer.
writeln!(&mut buffer)?; meta_json.push_str("\n");
directory.atomic_write(&META_FILEPATH, &buffer[..])?; debug!(directory.logger(), "save meta"; "content"=>&meta_json);
debug!("Saved metas {:?}", serde_json::to_string_pretty(&metas)); directory.atomic_write(&META_FILEPATH, meta_json.as_bytes())?;
Ok(()) Ok(())
} }
@@ -97,7 +96,6 @@ impl Deref for SegmentUpdater {
async fn garbage_collect_files( async fn garbage_collect_files(
segment_updater: SegmentUpdater, segment_updater: SegmentUpdater,
) -> crate::Result<GarbageCollectionResult> { ) -> crate::Result<GarbageCollectionResult> {
info!("Running garbage collection");
let mut index = segment_updater.index.clone(); let mut index = segment_updater.index.clone();
index index
.directory_mut() .directory_mut()
@@ -107,14 +105,12 @@ async fn garbage_collect_files(
/// Merges the list of segments given in the `segment_entries`. /// Merges the list of segments given in the `segment_entries`.
/// This function happens in the calling thread and is computationally expensive. /// This function happens in the calling thread and is computationally expensive.
fn merge( fn merge(
merged_segment: Segment,
index: &Index, index: &Index,
mut segment_entries: Vec<SegmentEntry>, mut segment_entries: Vec<SegmentEntry>,
target_opstamp: Opstamp, target_opstamp: Opstamp,
) -> crate::Result<SegmentEntry> { ) -> crate::Result<SegmentEntry> {
// first we need to apply deletes to our segment. // First we apply all of the deletes to the merged segment, up to the target opstamp.
let merged_segment = index.new_segment();
// First we apply all of the delet to the merged segment, up to the target opstamp.
for segment_entry in &mut segment_entries { for segment_entry in &mut segment_entries {
let segment = index.segment(segment_entry.meta().clone()); let segment = index.segment(segment_entry.meta().clone());
advance_deletes(segment, segment_entry, target_opstamp)?; advance_deletes(segment, segment_entry, target_opstamp)?;
@@ -167,7 +163,8 @@ impl SegmentUpdater {
delete_cursor: &DeleteCursor, delete_cursor: &DeleteCursor,
) -> crate::Result<SegmentUpdater> { ) -> crate::Result<SegmentUpdater> {
let segments = index.searchable_segment_metas()?; let segments = index.searchable_segment_metas()?;
let segment_manager = SegmentManager::from_segments(segments, delete_cursor); let segment_manager =
SegmentManager::from_segments(segments, delete_cursor, index.logger().clone());
let pool = ThreadPoolBuilder::new() let pool = ThreadPoolBuilder::new()
.name_prefix("segment_updater") .name_prefix("segment_updater")
.pool_size(1) .pool_size(1)
@@ -387,7 +384,18 @@ impl SegmentUpdater {
.segment_manager .segment_manager
.start_merge(merge_operation.segment_ids())?; .start_merge(merge_operation.segment_ids())?;
info!("Starting merge - {:?}", merge_operation.segment_ids()); let segment_ids_str: String = merge_operation
.segment_ids()
.iter()
.map(|segment_id| segment_id.to_string())
.collect::<Vec<String>>()
.join(",");
let merged_segment = self.index.new_segment();
let logger = self.index.logger().new(slog::o!("segments"=>segment_ids_str, "merged-segment"=>merged_segment.id().to_string()));
let num_merges: usize = self.merge_operations.list().len();
slog::info!(&logger, "merge"; "stage"=>"start", "num-merges" => num_merges);
let (merging_future_send, merging_future_recv) = let (merging_future_send, merging_future_recv) =
oneshot::channel::<crate::Result<SegmentMeta>>(); oneshot::channel::<crate::Result<SegmentMeta>>();
@@ -398,22 +406,20 @@ impl SegmentUpdater {
// as well as which segment is currently in merge and therefore should not be // as well as which segment is currently in merge and therefore should not be
// candidate for another merge. // candidate for another merge.
match merge( match merge(
merged_segment,
&segment_updater.index, &segment_updater.index,
segment_entries, segment_entries,
merge_operation.target_opstamp(), merge_operation.target_opstamp(),
) { ) {
Ok(after_merge_segment_entry) => { Ok(after_merge_segment_entry) => {
info!(&logger, "merge"; "stage" => "end");
let segment_meta = segment_updater let segment_meta = segment_updater
.end_merge(merge_operation, after_merge_segment_entry) .end_merge(merge_operation, after_merge_segment_entry)
.await; .await;
let _send_result = merging_future_send.send(segment_meta); let _send_result = merging_future_send.send(segment_meta);
} }
Err(e) => { Err(e) => {
warn!( error!(&logger, "merge"; "stage" => "fail", "cause"=>e.to_string());
"Merge of {:?} was cancelled: {:?}",
merge_operation.segment_ids().to_vec(),
e
);
// ... cancel merge // ... cancel merge
if cfg!(test) { if cfg!(test) {
panic!("Merge failed."); panic!("Merge failed.");
@@ -454,11 +460,12 @@ impl SegmentUpdater {
.collect::<Vec<_>>(); .collect::<Vec<_>>();
merge_candidates.extend(committed_merge_candidates.into_iter()); merge_candidates.extend(committed_merge_candidates.into_iter());
let logger = self.index.logger();
for merge_operation in merge_candidates { for merge_operation in merge_candidates {
if let Err(err) = self.start_merge(merge_operation) { if let Err(err) = self.start_merge(merge_operation) {
warn!( warn!(
"Starting the merge failed for the following reason. This is not fatal. {}", logger,
err "merge-start-fail (not fatal, not necessarily a problem)"; "reason" => format!("{}", err),
); );
} }
} }
@@ -471,8 +478,11 @@ impl SegmentUpdater {
) -> impl Future<Output = crate::Result<SegmentMeta>> { ) -> impl Future<Output = crate::Result<SegmentMeta>> {
let segment_updater = self.clone(); let segment_updater = self.clone();
let after_merge_segment_meta = after_merge_segment_entry.meta().clone(); let after_merge_segment_meta = after_merge_segment_entry.meta().clone();
let logger = self.index.logger().new(
slog::o!("segment"=>after_merge_segment_meta.id().to_string(),
"delete-opstamp"=>after_merge_segment_meta.delete_opstamp()),
);
let end_merge_future = self.schedule_future(async move { let end_merge_future = self.schedule_future(async move {
info!("End merge {:?}", after_merge_segment_entry.meta());
{ {
let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone(); let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
if let Some(delete_operation) = delete_cursor.get() { if let Some(delete_operation) = delete_cursor.get() {
@@ -486,6 +496,7 @@ impl SegmentUpdater {
committed_opstamp, committed_opstamp,
) { ) {
error!( error!(
logger,
"Merge of {:?} was cancelled (advancing deletes failed): {:?}", "Merge of {:?} was cancelled (advancing deletes failed): {:?}",
merge_operation.segment_ids(), merge_operation.segment_ids(),
e e

View File

@@ -1,5 +1,4 @@
use super::operation::AddOperation; use super::operation::AddOperation;
use crate::core::Segment;
use crate::core::SerializableSegment; use crate::core::SerializableSegment;
use crate::fastfield::FastFieldsWriter; use crate::fastfield::FastFieldsWriter;
use crate::fieldnorm::{FieldNormReaders, FieldNormsWriter}; use crate::fieldnorm::{FieldNormReaders, FieldNormsWriter};
@@ -15,6 +14,7 @@ use crate::tokenizer::{BoxTokenStream, PreTokenizedStream};
use crate::tokenizer::{FacetTokenizer, TextAnalyzer}; use crate::tokenizer::{FacetTokenizer, TextAnalyzer};
use crate::tokenizer::{TokenStreamChain, Tokenizer}; use crate::tokenizer::{TokenStreamChain, Tokenizer};
use crate::Opstamp; use crate::Opstamp;
use crate::{core::Segment, tokenizer::MAX_TOKEN_LEN};
use crate::{DocId, SegmentComponent}; use crate::{DocId, SegmentComponent};
use std::io; use std::io;
@@ -146,6 +146,9 @@ impl SegmentWriter {
for fake_str in facets { for fake_str in facets {
let mut unordered_term_id_opt = None; let mut unordered_term_id_opt = None;
FacetTokenizer.token_stream(fake_str).process(&mut |token| { FacetTokenizer.token_stream(fake_str).process(&mut |token| {
if token.text.len() > MAX_TOKEN_LEN {
return;
}
term_buffer.set_text(&token.text); term_buffer.set_text(&token.text);
let unordered_term_id = let unordered_term_id =
multifield_postings.subscribe(doc_id, &term_buffer); multifield_postings.subscribe(doc_id, &term_buffer);

View File

@@ -101,9 +101,6 @@
#[cfg_attr(test, macro_use)] #[cfg_attr(test, macro_use)]
extern crate serde_json; extern crate serde_json;
#[macro_use]
extern crate log;
#[macro_use] #[macro_use]
extern crate thiserror; extern crate thiserror;
@@ -148,6 +145,7 @@ pub mod schema;
pub mod space_usage; pub mod space_usage;
pub mod store; pub mod store;
pub mod termdict; pub mod termdict;
pub use slog;
mod reader; mod reader;
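
`pub use slog;` makes the exact slog version tantivy was compiled against reachable as `tantivy::slog`, so downstream crates can build and pass `Logger`s (for example to `MmapDirectory::open_with_logger` or `RAMDirectory::with_logger`) without declaring their own, possibly version-mismatched, slog dependency:

use tantivy::slog::{o, Discard, Logger};

fn main() {
    // Built entirely from tantivy's re-export; no `slog` entry is
    // needed in the downstream Cargo.toml.
    let logger = Logger::root(Discard, o!());
    tantivy::slog::info!(logger, "ready");
}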

View File

@@ -227,23 +227,17 @@ pub trait PostingsWriter {
term_buffer.set_field(field); term_buffer.set_field(field);
let mut sink = |token: &Token| { let mut sink = |token: &Token| {
// We skip all tokens with a len greater than u16. // We skip all tokens with a len greater than u16.
if token.text.len() <= MAX_TOKEN_LEN { if token.text.len() > MAX_TOKEN_LEN {
term_buffer.set_text(token.text.as_str()); return;
self.subscribe(
term_index,
doc_id,
token.position as u32,
&term_buffer,
heap,
);
} else {
info!(
"A token exceeding MAX_TOKEN_LEN ({}>{}) was dropped. Search for \
MAX_TOKEN_LEN in the documentation for more information.",
token.text.len(),
MAX_TOKEN_LEN
);
} }
term_buffer.set_text(token.text.as_str());
self.subscribe(
term_index,
doc_id,
token.position as u32,
&term_buffer,
heap,
);
}; };
token_stream.process(&mut sink) token_stream.process(&mut sink)
} }

View File

@@ -1,5 +1,7 @@
mod pool; mod pool;
use slog::error;
pub use self::pool::LeasedItem; pub use self::pool::LeasedItem;
use self::pool::Pool; use self::pool::Pool;
use crate::core::Segment; use crate::core::Segment;
@@ -62,6 +64,7 @@ impl IndexReaderBuilder {
/// to open different segment readers. It may take hundreds of milliseconds /// to open different segment readers. It may take hundreds of milliseconds
/// and may return an error. /// and may return an error.
pub fn try_into(self) -> crate::Result<IndexReader> { pub fn try_into(self) -> crate::Result<IndexReader> {
let logger = self.index.logger().clone();
let inner_reader = InnerIndexReader { let inner_reader = InnerIndexReader {
index: self.index, index: self.index,
num_searchers: self.num_searchers, num_searchers: self.num_searchers,
@@ -80,8 +83,8 @@ impl IndexReaderBuilder {
let callback = move || { let callback = move || {
if let Err(err) = inner_reader_arc_clone.reload() { if let Err(err) = inner_reader_arc_clone.reload() {
error!( error!(
"Error while loading searcher after commit was detected. {:?}", logger,
err "Error while loading searcher after commit was detected. {:?}", err
); );
} }
}; };