diff --git a/CHANGELOG.md b/CHANGELOG.md
index 95318f007..6840eaa24 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,7 +1,8 @@
 Tantivy 0.14.0
 =========================
-- Remove dependency to atomicwrites #833 .Implemented by @pmasurel upon suggestion and research from @asafigan).
+- Remove dependency on atomicwrites #833. (Implemented by @pmasurel upon suggestion and research from @asafigan.)
 - Migrated tantivy error from the now deprecated `failure` crate to `thiserror` #760. (@hirevo)
+- Switched to structured logging (via the `slog` crate). (@pmasurel)
 
 Tantivy 0.13.1
 ===================
diff --git a/Cargo.toml b/Cargo.toml
index 3f2148e72..c59291d5b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -23,7 +23,8 @@ memmap = {version = "0.7", optional=true}
 lz4 = {version="1", optional=true}
 snap = "1"
 tempfile = {version="3", optional=true}
-log = "0.4"
+slog = "2.5"
+slog-stdlog = "4"
 serde = {version="1", features=["derive"]}
 serde_json = "1"
 num_cpus = "1"
diff --git a/src/core/executor.rs b/src/core/executor.rs
index 8ac39a7eb..c07da7075 100644
--- a/src/core/executor.rs
+++ b/src/core/executor.rs
@@ -1,5 +1,6 @@
 use crossbeam::channel;
 use rayon::{ThreadPool, ThreadPoolBuilder};
+use slog::{error, Logger};
 
 /// Search executor whether search request are single thread or multithread.
 ///
@@ -43,6 +44,7 @@ impl Executor {
         &self,
         f: F,
         args: AIterator,
+        logger: Logger,
     ) -> crate::Result<Vec<R>> {
         match self {
             Executor::SingleThread => args.map(f).collect::<crate::Result<_>>(),
@@ -57,7 +59,7 @@ impl Executor {
                         let (idx, arg) = arg_with_idx;
                         let fruit = f(arg);
                         if let Err(err) = fruit_sender.send((idx, fruit)) {
-                            error!("Failed to send search task. It probably means all search threads have panicked. {:?}", err);
+                            error!(logger, "Failed to send search task. It probably means all search threads have panicked. {:?}", err);
{:?}", err); } }); } @@ -87,17 +89,21 @@ impl Executor { #[cfg(test)] mod tests { + use slog::{o, Discard, Logger}; + use super::Executor; #[test] #[should_panic(expected = "panic should propagate")] fn test_panic_propagates_single_thread() { + let logger = Logger::root(Discard, o!()); let _result: Vec = Executor::single_thread() .map( |_| { panic!("panic should propagate"); }, vec![0].into_iter(), + logger, ) .unwrap(); } @@ -105,6 +111,7 @@ mod tests { #[test] #[should_panic] //< unfortunately the panic message is not propagated fn test_panic_propagates_multi_thread() { + let logger = Logger::root(Discard, o!()); let _result: Vec = Executor::multi_thread(1, "search-test") .unwrap() .map( @@ -112,14 +119,16 @@ mod tests { panic!("panic should propagate"); }, vec![0].into_iter(), + logger, ) .unwrap(); } #[test] fn test_map_singlethread() { + let logger = Logger::root(Discard, o!()); let result: Vec = Executor::single_thread() - .map(|i| Ok(i * 2), 0..1_000) + .map(|i| Ok(i * 2), 0..1_000, logger) .unwrap(); assert_eq!(result.len(), 1_000); for i in 0..1_000 { @@ -129,9 +138,10 @@ mod tests { #[test] fn test_map_multithread() { + let logger = Logger::root(Discard, o!()); let result: Vec = Executor::multi_thread(3, "search-test") .unwrap() - .map(|i| Ok(i * 2), 0..10) + .map(|i| Ok(i * 2), 0..10, logger) .unwrap(); assert_eq!(result.len(), 10); for i in 0..10 { diff --git a/src/core/index.rs b/src/core/index.rs index 989b5766b..64c7152de 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -21,6 +21,7 @@ use crate::schema::FieldType; use crate::schema::Schema; use crate::tokenizer::{TextAnalyzer, TokenizerManager}; use crate::IndexWriter; +use slog::Logger; use std::borrow::BorrowMut; use std::collections::HashSet; use std::fmt; @@ -57,6 +58,11 @@ pub struct Index { } impl Index { + + pub(crate) fn logger(&self) -> &Logger { + self.directory.logger() + } + /// Examines the directory to see if it contains an index. /// /// Effectively, it only checks for the presence of the `meta.json` file. @@ -147,13 +153,13 @@ impl Index { /// If a directory previously existed, it will be erased. pub fn create(dir: Dir, schema: Schema) -> crate::Result { let directory = ManagedDirectory::wrap(dir)?; - Index::from_directory(directory, schema) + Index::new_from_directory(directory, schema) } /// Create a new index from a directory. 
     ///
     /// This will overwrite existing meta.json
-    fn from_directory(mut directory: ManagedDirectory, schema: Schema) -> crate::Result<Index> {
+    fn new_from_directory(mut directory: ManagedDirectory, schema: Schema) -> crate::Result<Index> {
         save_new_metas(schema.clone(), directory.borrow_mut())?;
         let metas = IndexMeta::with_schema(schema);
         Index::create_from_metas(directory, &metas, SegmentMetaInventory::default())
@@ -244,6 +250,8 @@ impl Index {
 
     /// Open the index using the provided directory
     pub fn open<D: Directory>(directory: D) -> crate::Result<Index> {
+        let logger: &Logger = directory.logger();
+        slog::info!(logger, "index-open"; "directory" => format!("{:?}", directory));
         let directory = ManagedDirectory::wrap(directory)?;
         let inventory = SegmentMetaInventory::default();
         let metas = load_metas(&directory, &inventory)?;
diff --git a/src/core/searcher.rs b/src/core/searcher.rs
index 8e8775efd..35b79429c 100644
--- a/src/core/searcher.rs
+++ b/src/core/searcher.rs
@@ -143,6 +143,7 @@ impl Searcher {
                 collector.collect_segment(weight.as_ref(), segment_ord as u32, segment_reader)
             },
             segment_readers.iter().enumerate(),
+            self.index.logger().clone(),
         )?;
         collector.merge_fruits(fruits)
     }
diff --git a/src/core/segment_id.rs b/src/core/segment_id.rs
index 0fd16aa2a..608f837f3 100644
--- a/src/core/segment_id.rs
+++ b/src/core/segment_id.rs
@@ -21,6 +21,12 @@ use std::sync::atomic;
 #[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
 pub struct SegmentId(Uuid);
 
+impl ToString for SegmentId {
+    fn to_string(&self) -> String {
+        self.short_uuid_string()
+    }
+}
+
 #[cfg(test)]
 static AUTO_INC_COUNTER: Lazy<atomic::AtomicUsize> = Lazy::new(|| atomic::AtomicUsize::default());
 
diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs
index b941d44d3..b61bb02e1 100644
--- a/src/core/segment_reader.rs
+++ b/src/core/segment_reader.rs
@@ -17,6 +17,7 @@ use crate::store::StoreReader;
 use crate::termdict::TermDictionary;
 use crate::DocId;
 use fail::fail_point;
+use slog::{warn, Logger};
 use std::collections::HashMap;
 use std::fmt;
 use std::sync::Arc;
@@ -53,6 +54,7 @@ pub struct SegmentReader {
     store_source: ReadOnlySource,
     delete_bitset_opt: Option<DeleteBitSet>,
     schema: Schema,
+    logger: Logger,
 }
 
 impl SegmentReader {
@@ -200,6 +202,7 @@ impl SegmentReader {
             positions_composite,
             positions_idx_composite,
             schema,
+            logger: segment.index().logger().clone(),
         })
     }
 
@@ -229,7 +232,11 @@ impl SegmentReader {
         let record_option_opt = field_type.get_index_record_option();
 
         if record_option_opt.is_none() {
-            warn!("Field {:?} does not seem indexed.", field_entry.name());
+            warn!(
+                self.logger,
+                "Field {:?} does not seem indexed.",
+                field_entry.name()
+            );
         }
 
         let postings_source_opt = self.postings_composite.open_read(field);
diff --git a/src/directory/directory.rs b/src/directory/directory.rs
index 900d398c5..d8c71dece 100644
--- a/src/directory/directory.rs
+++ b/src/directory/directory.rs
@@ -1,3 +1,5 @@
+use slog::{error, Logger};
+
 use crate::directory::directory_lock::Lock;
 use crate::directory::error::LockError;
 use crate::directory::error::{DeleteError, OpenReadError, OpenWriteError};
@@ -64,7 +66,10 @@ impl<T: Send + Sync + 'static> From<Box<T>> for DirectoryLock {
 impl Drop for DirectoryLockGuard {
     fn drop(&mut self) {
         if let Err(e) = self.directory.delete(&*self.path) {
-            error!("Failed to remove the lock file. {:?}", e);
+            error!(
+                self.directory.logger(),
+                "Failed to remove the lock file. {:?}", e
+            );
         }
     }
 }
@@ -209,6 +214,9 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
     /// `OnCommit` `ReloadPolicy`. Not implementing watch in a `Directory` only prevents the
     /// `OnCommit` `ReloadPolicy` to work properly.
     fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle>;
+
+    /// Returns the `slog::Logger` configured for the `Directory`.
+    fn logger(&self) -> &Logger;
 }
 
 /// DirectoryClone
diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs
index 7d4e6198c..8519c4f82 100644
--- a/src/directory/managed_directory.rs
+++ b/src/directory/managed_directory.rs
@@ -11,9 +11,9 @@ use crate::error::DataCorruption;
 use crate::Directory;
 
 use crc32fast::Hasher;
+use slog::{debug, error, info};
 use std::collections::HashSet;
 use std::io;
-use std::io::Write;
 use std::path::{Path, PathBuf};
 use std::result;
 use std::sync::RwLockWriteGuard;
@@ -56,9 +56,9 @@ fn save_managed_paths(
     directory: &mut dyn Directory,
     wlock: &RwLockWriteGuard<'_, MetaInformation>,
 ) -> io::Result<()> {
-    let mut w = serde_json::to_vec(&wlock.managed_paths)?;
-    writeln!(&mut w)?;
-    directory.atomic_write(&MANAGED_FILEPATH, &w[..])?;
+    let mut managed_json = serde_json::to_string_pretty(&wlock.managed_paths)?;
+    managed_json.push('\n');
+    directory.atomic_write(&MANAGED_FILEPATH, managed_json.as_bytes())?;
     Ok(())
 }
 
@@ -118,7 +118,7 @@ impl ManagedDirectory {
         &mut self,
         get_living_files: L,
     ) -> crate::Result<GarbageCollectionResult> {
-        info!("Garbage collect");
+        info!(self.directory.logger(), "gc"; "stage"=>"start");
         let mut files_to_delete = vec![];
 
         // It is crucial to get the living files after acquiring the
@@ -153,7 +153,7 @@ impl ManagedDirectory {
                 }
             }
             Err(err) => {
-                error!("Failed to acquire lock for GC");
+                error!(self.logger(), "Failed to acquire lock for GC");
                 return Err(crate::TantivyError::from(err));
             }
         }
@@ -165,7 +165,7 @@ impl ManagedDirectory {
         for file_to_delete in files_to_delete {
             match self.delete(&file_to_delete) {
                 Ok(_) => {
-                    info!("Deleted {:?}", file_to_delete);
+                    debug!(self.logger(), "deleted-success"; "file"=>format!("{:?}", file_to_delete));
                     deleted_files.push(file_to_delete);
                 }
                 Err(file_error) => {
@@ -178,7 +178,7 @@ impl ManagedDirectory {
                        if !cfg!(target_os = "windows") {
                            // On windows, delete is expected to fail if the file
                            // is mmapped.
- error!("Failed to delete {:?}", file_to_delete); + error!(self.logger(), "delete-file-fail"; "path"=>file_to_delete.to_str().unwrap_or("")); } } } @@ -200,6 +200,10 @@ impl ManagedDirectory { save_managed_paths(self.directory.as_mut(), &meta_informations_wlock)?; } + info!(self.directory.logger(), "gc"; "stage"=>"end", + "num-sucess-file-deletes"=>deleted_files.len(), + "num-failed-file-deletes"=>failed_to_delete_files.len()); + Ok(GarbageCollectionResult { deleted_files, failed_to_delete_files, @@ -274,6 +278,7 @@ impl ManagedDirectory { impl Directory for ManagedDirectory { fn open_read(&self, path: &Path) -> result::Result { + slog::debug!(self.logger(), "open-read"; "path" => path.to_str().unwrap_or("")); let read_only_source = self.directory.open_read(path)?; let (footer, reader) = Footer::extract_footer(read_only_source).map_err(|io_error| { OpenReadError::IOError { @@ -286,6 +291,7 @@ impl Directory for ManagedDirectory { } fn open_write(&mut self, path: &Path) -> result::Result { + slog::debug!(self.logger(), "open-write"; "path" => path.to_str().unwrap_or("")); self.register_file_as_managed(path) .map_err(|io_error| OpenWriteError::IOError { io_error, @@ -300,9 +306,11 @@ impl Directory for ManagedDirectory { )))) } - fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> { + fn atomic_write(&mut self, path: &Path, content: &[u8]) -> io::Result<()> { + let content_str = std::str::from_utf8(content).unwrap_or(""); + slog::debug!(self.logger(), "Atomic write"; "path" => format!("{:?}", path), "content_length"=>content_str); self.register_file_as_managed(path)?; - self.directory.atomic_write(path, data) + self.directory.atomic_write(path, content) } fn atomic_read(&self, path: &Path) -> result::Result, OpenReadError> { @@ -324,6 +332,10 @@ impl Directory for ManagedDirectory { fn watch(&self, watch_callback: WatchCallback) -> crate::Result { self.directory.watch(watch_callback) } + + fn logger(&self) -> &slog::Logger { + self.directory.logger() + } } impl Clone for ManagedDirectory { diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs index b360eb25b..f4de447c4 100644 --- a/src/directory/mmap_directory.rs +++ b/src/directory/mmap_directory.rs @@ -17,6 +17,8 @@ use notify::RawEvent; use notify::RecursiveMode; use notify::Watcher; use serde::{Deserialize, Serialize}; +use slog::{debug, o, Drain, Logger}; +use slog_stdlog::StdLog; use std::collections::HashMap; use std::convert::From; use std::fmt; @@ -34,11 +36,6 @@ use std::sync::Weak; use std::thread; use tempfile::TempDir; -/// Create a default io error given a string. 
-pub(crate) fn make_io_err(msg: String) -> io::Error {
-    io::Error::new(io::ErrorKind::Other, msg)
-}
-
 /// Returns None iff the file exists, can be read, but is empty (and hence
 /// cannot be mmapped)
 fn open_mmap(full_path: &Path) -> result::Result<Option<Mmap>, OpenReadError> {
@@ -149,7 +146,7 @@ struct WatcherWrapper {
 }
 
 impl WatcherWrapper {
-    pub fn new(path: &Path) -> Result<WatcherWrapper, TantivyError> {
+    pub(crate) fn new(path: &Path, logger: Logger) -> Result<WatcherWrapper, TantivyError> {
         let (tx, watcher_recv): (Sender<RawEvent>, Receiver<RawEvent>) = channel();
         // We need to initialize the
         let watcher = notify::raw_watcher(tx)
@@ -163,7 +160,8 @@ impl WatcherWrapper {
                 panic!("Unknown error while starting watching directory {:?}", path);
             }
         })?;
-        let watcher_router: Arc<WatchCallbackList> = Default::default();
+        let watcher_router: Arc<WatchCallbackList> =
+            Arc::new(WatchCallbackList::with_logger(logger));
         let watcher_router_clone = watcher_router.clone();
         thread::Builder::new()
             .name("meta-file-watch-thread".to_string())
@@ -226,15 +224,21 @@ struct MmapDirectoryInner {
     mmap_cache: RwLock<MmapCache>,
     _temp_directory: Option<TempDir>,
     watcher: RwLock<Option<WatcherWrapper>>,
+    logger: Logger,
 }
 
 impl MmapDirectoryInner {
-    fn new(root_path: PathBuf, temp_directory: Option<TempDir>) -> MmapDirectoryInner {
+    fn new(
+        root_path: PathBuf,
+        temp_directory: Option<TempDir>,
+        logger: Logger,
+    ) -> MmapDirectoryInner {
         MmapDirectoryInner {
             root_path,
             mmap_cache: Default::default(),
             _temp_directory: temp_directory,
             watcher: RwLock::new(None),
+            logger,
         }
     }
 
@@ -246,7 +250,7 @@ impl MmapDirectoryInner {
         // The downside is that we might create a watch wrapper that is not useful.
         let need_initialization = self.watcher.read().unwrap().is_none();
         if need_initialization {
-            let watch_wrapper = WatcherWrapper::new(&self.root_path)?;
+            let watch_wrapper = WatcherWrapper::new(&self.root_path, self.logger.clone())?;
             let mut watch_wlock = self.watcher.write().unwrap();
             // the watcher could have been initialized when we released the lock, and
             // we do not want to lose the watched files that were set.
@@ -269,8 +273,8 @@ impl fmt::Debug for MmapDirectory {
 }
 
 impl MmapDirectory {
-    fn new(root_path: PathBuf, temp_directory: Option<TempDir>) -> MmapDirectory {
-        let inner = MmapDirectoryInner::new(root_path, temp_directory);
+    fn new(root_path: PathBuf, temp_directory: Option<TempDir>, logger: Logger) -> MmapDirectory {
+        let inner = MmapDirectoryInner::new(root_path, temp_directory, logger);
         MmapDirectory {
             inner: Arc::new(inner),
         }
@@ -282,17 +286,18 @@ impl MmapDirectory {
     /// For your unit tests, prefer the RAMDirectory.
     pub fn create_from_tempdir() -> Result<MmapDirectory, OpenDirectoryError> {
         let tempdir = TempDir::new().map_err(OpenDirectoryError::FailedToCreateTempDir)?;
-        Ok(MmapDirectory::new(
-            tempdir.path().to_path_buf(),
-            Some(tempdir),
-        ))
+        let logger = Logger::root(StdLog.fuse(), o!());
+        Ok(MmapDirectory::new(tempdir.path().to_owned(), Some(tempdir), logger))
     }
 
     /// Opens a MmapDirectory in a directory.
     ///
     /// Returns an error if the `directory_path` does not
     /// exist or if it is not a directory.
-    pub fn open<P: AsRef<Path>>(directory_path: P) -> Result<MmapDirectory, OpenDirectoryError> {
+    pub fn open_with_logger<P: AsRef<Path>>(
+        directory_path: P,
+        logger: Logger,
+    ) -> Result<MmapDirectory, OpenDirectoryError> {
         let directory_path: &Path = directory_path.as_ref();
         if !directory_path.exists() {
             Err(OpenDirectoryError::DoesNotExist(PathBuf::from(
@@ -303,10 +308,20 @@ impl MmapDirectory {
                 directory_path,
             )))
         } else {
-            Ok(MmapDirectory::new(PathBuf::from(directory_path), None))
+            Ok(MmapDirectory::new(
+                PathBuf::from(directory_path),
+                None,
+                logger,
+            ))
         }
     }
 
+    /// Opens an `MmapDirectory` at the given path, logging via the standard `log` crate.
+    pub fn open<P: AsRef<Path>>(directory_path: P) -> Result<MmapDirectory, OpenDirectoryError> {
+        let logger = Logger::root(StdLog.fuse(), o!());
+        Self::open_with_logger(directory_path, logger)
+    }
+
     /// Joins a relative_path to the directory `root_path`
     /// to create a proper complete `filepath`.
     fn resolve_path(&self, relative_path: &Path) -> PathBuf {
@@ -366,11 +381,12 @@ impl MmapDirectory {
 struct ReleaseLockFile {
     _file: File,
     path: PathBuf,
+    logger: Logger,
 }
 
 impl Drop for ReleaseLockFile {
     fn drop(&mut self) {
-        debug!("Releasing lock {:?}", self.path);
+        debug!(self.logger, "Releasing lock {:?}", self.path);
     }
 }
 
@@ -409,17 +425,16 @@ impl TerminatingWrite for SafeFileWriter {
 
 impl Directory for MmapDirectory {
     fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
-        debug!("Open Read {:?}", path);
         let full_path = self.resolve_path(path);
-
         let mut mmap_cache = self.inner.mmap_cache.write().map_err(|_| {
             let msg = format!(
                 "Failed to acquired write lock \
                  on mmap cache while reading {:?}",
                 path
             );
+            let io_error = io::Error::new(io::ErrorKind::Other, msg);
             OpenReadError::IOError {
-                io_error: make_io_err(msg),
+                io_error,
                 filepath: path.to_owned(),
             }
         })?;
@@ -457,9 +472,7 @@ impl Directory for MmapDirectory {
     }
 
     fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
-        debug!("Open Write {:?}", path);
         let full_path = self.resolve_path(path);
-
         let open_res = OpenOptions::new()
             .write(true)
             .create_new(true)
@@ -519,7 +532,6 @@ impl Directory for MmapDirectory {
     }
 
     fn atomic_write(&mut self, path: &Path, content: &[u8]) -> io::Result<()> {
-        debug!("Atomic Write {:?}", path);
         let mut tempfile = tempfile::Builder::new().tempfile_in(&self.inner.root_path)?;
         tempfile.write_all(content)?;
         tempfile.flush()?;
@@ -541,16 +553,22 @@ impl Directory for MmapDirectory {
         } else {
             file.try_lock_exclusive().map_err(|_| LockError::LockBusy)?
         }
+        let logger = self.inner.logger.clone();
        // dropping the file handle will release the lock.
         Ok(DirectoryLock::from(Box::new(ReleaseLockFile {
             path: lock.filepath.clone(),
             _file: file,
+            logger,
         })))
     }
 
     fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> {
         self.inner.watch(watch_callback)
     }
+
+    fn logger(&self) -> &Logger {
+        &self.inner.logger
+    }
 }
 
 #[cfg(test)]
@@ -660,7 +678,8 @@ mod tests {
         let counter_clone = counter.clone();
         let tmp_dir = tempfile::TempDir::new().unwrap();
         let tmp_dirpath = tmp_dir.path().to_owned();
-        let mut watch_wrapper = WatcherWrapper::new(&tmp_dirpath).unwrap();
+        let logger = Logger::root(slog::Discard, o!());
+        let mut watch_wrapper = WatcherWrapper::new(&tmp_dirpath, logger).unwrap();
         let tmp_file = tmp_dirpath.join(*META_FILEPATH);
         let _handle = watch_wrapper.watch(Box::new(move || {
             counter_clone.fetch_add(1, Ordering::SeqCst);
diff --git a/src/directory/mod.rs b/src/directory/mod.rs
index df5e55d81..4d438944d 100644
--- a/src/directory/mod.rs
+++ b/src/directory/mod.rs
@@ -23,7 +23,8 @@ pub use self::directory::{Directory, DirectoryClone};
 pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
 pub use self::ram_directory::RAMDirectory;
 pub use self::read_only_source::ReadOnlySource;
-pub use self::watch_event_router::{WatchCallback, WatchCallbackList, WatchHandle};
+pub(crate) use self::watch_event_router::WatchCallbackList;
+pub use self::watch_event_router::{WatchCallback, WatchHandle};
 use std::io::{self, BufWriter, Write};
 use std::path::PathBuf;
 
 /// Outcome of the Garbage collection
diff --git a/src/directory/ram_directory.rs b/src/directory/ram_directory.rs
index 3b9a58182..ccc45f09e 100644
--- a/src/directory/ram_directory.rs
+++ b/src/directory/ram_directory.rs
@@ -5,6 +5,8 @@ use crate::directory::WatchCallbackList;
 use crate::directory::{Directory, ReadOnlySource, WatchCallback, WatchHandle};
 use crate::directory::{TerminatingWrite, WritePtr};
 use fail::fail_point;
+use slog::{o, Drain, Logger};
+use slog_stdlog::StdLog;
 use std::collections::HashMap;
 use std::fmt;
 use std::io::{self, BufWriter, Cursor, Seek, SeekFrom, Write};
@@ -66,7 +68,7 @@ impl Write for VecWriter {
 
     fn flush(&mut self) -> io::Result<()> {
         self.is_flushed = true;
-        let mut fs = self.shared_directory.fs.write().unwrap();
+        let mut fs = self.shared_directory.fs.inner_directory.write().unwrap();
         fs.write(self.path.clone(), self.data.get_ref());
         Ok(())
     }
@@ -78,13 +80,19 @@ impl TerminatingWrite for VecWriter {
     }
 }
 
-#[derive(Default)]
 struct InnerDirectory {
     fs: HashMap<PathBuf, ReadOnlySource>,
     watch_router: WatchCallbackList,
 }
 
 impl InnerDirectory {
+    fn with_logger(logger: Logger) -> Self {
+        InnerDirectory {
+            fs: Default::default(),
+            watch_router: WatchCallbackList::with_logger(logger.clone()),
+        }
+    }
+
     fn write(&mut self, path: PathBuf, data: &[u8]) -> bool {
         let data = ReadOnlySource::new(Vec::from(data));
         self.fs.insert(path, data).is_some()
@@ -117,20 +125,32 @@ impl InnerDirectory {
     }
 }
 
+impl Default for RAMDirectory {
+    fn default() -> RAMDirectory {
+        let logger = Logger::root(StdLog.fuse(), o!());
+        Self::with_logger(logger)
+    }
+}
+
 impl fmt::Debug for RAMDirectory {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         write!(f, "RAMDirectory")
     }
 }
 
+struct Inner {
+    inner_directory: RwLock<InnerDirectory>,
+    logger: Logger,
+}
+
 /// A Directory storing everything in anonymous memory.
 ///
 /// It is mainly meant for unit testing.
 /// Writes are only made visible upon flushing.
 ///
-#[derive(Clone, Default)]
+#[derive(Clone)]
 pub struct RAMDirectory {
-    fs: Arc<RwLock<InnerDirectory>>,
+    fs: Arc<Inner>,
 }
 
 impl RAMDirectory {
@@ -139,10 +159,21 @@ impl RAMDirectory {
         Self::default()
     }
 
+    /// Create a `RAMDirectory` with a custom logger.
+    pub fn with_logger(logger: Logger) -> RAMDirectory {
+        let inner_directory = InnerDirectory::with_logger(logger.clone()).into();
+        RAMDirectory {
+            fs: Arc::new(Inner {
+                inner_directory,
+                logger,
+            }),
+        }
+    }
+
     /// Returns the sum of the size of the different files
     /// in the RAMDirectory.
     pub fn total_mem_usage(&self) -> usize {
-        self.fs.read().unwrap().total_mem_usage()
+        self.fs.inner_directory.read().unwrap().total_mem_usage()
    }
 
     /// Write a copy of all of the files saved in the RAMDirectory in the target `Directory`.
@@ -152,7 +183,7 @@ impl RAMDirectory {
     ///
     /// If an error is encounterred, files may be persisted partially.
     pub fn persist(&self, dest: &mut dyn Directory) -> crate::Result<()> {
-        let wlock = self.fs.write().unwrap();
+        let wlock = self.fs.inner_directory.write().unwrap();
         for (path, source) in wlock.fs.iter() {
             let mut dest_wrt = dest.open_write(path)?;
             dest_wrt.write_all(source.as_slice())?;
@@ -164,7 +195,7 @@ impl RAMDirectory {
 
 impl Directory for RAMDirectory {
     fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
-        self.fs.read().unwrap().open_read(path)
+        self.fs.inner_directory.read().unwrap().open_read(path)
     }
 
     fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
@@ -174,15 +205,15 @@ impl Directory for RAMDirectory {
                 filepath: path.to_path_buf(),
             })
         });
-        self.fs.write().unwrap().delete(path)
+        self.fs.inner_directory.write().unwrap().delete(path)
     }
 
     fn exists(&self, path: &Path) -> bool {
-        self.fs.read().unwrap().exists(path)
+        self.fs.inner_directory.read().unwrap().exists(path)
     }
 
     fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
-        let mut fs = self.fs.write().unwrap();
+        let mut fs = self.fs.inner_directory.write().unwrap();
         let path_buf = PathBuf::from(path);
         let vec_writer = VecWriter::new(path_buf.clone(), self.clone());
         let exists = fs.write(path_buf.clone(), &[]);
@@ -206,19 +237,38 @@ impl Directory for RAMDirectory {
         let path_buf = PathBuf::from(path);
 
         // Reserve the path to prevent calls to .write() to succeed.
-        self.fs.write().unwrap().write(path_buf.clone(), &[]);
+        self.fs
+            .inner_directory
+            .write()
+            .unwrap()
+            .write(path_buf.clone(), &[]);
 
         let mut vec_writer = VecWriter::new(path_buf, self.clone());
         vec_writer.write_all(data)?;
         vec_writer.flush()?;
         if path == Path::new(&*META_FILEPATH) {
-            let _ = self.fs.write().unwrap().watch_router.broadcast();
+            let _ = self
+                .fs
+                .inner_directory
+                .write()
+                .unwrap()
+                .watch_router
+                .broadcast();
         }
         Ok(())
     }
 
     fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> {
-        Ok(self.fs.write().unwrap().watch(watch_callback))
+        Ok(self
+            .fs
+            .inner_directory
+            .write()
+            .unwrap()
+            .watch(watch_callback))
+    }
+
+    fn logger(&self) -> &Logger {
+        &self.fs.logger
     }
 }
diff --git a/src/directory/watch_event_router.rs b/src/directory/watch_event_router.rs
index d3dc3b26f..f9dd8bbcf 100644
--- a/src/directory/watch_event_router.rs
+++ b/src/directory/watch_event_router.rs
@@ -1,5 +1,6 @@
 use futures::channel::oneshot;
 use futures::{Future, TryFutureExt};
+use slog::{error, Logger};
 use std::sync::Arc;
 use std::sync::RwLock;
 use std::sync::Weak;
@@ -11,9 +12,9 @@ pub type WatchCallback = Box<dyn Fn() + Sync + Send>;
 ///
 /// It registers callbacks (See `.subscribe(...)`) and
 /// calls them upon calls to `.broadcast(...)`.
-#[derive(Default)]
-pub struct WatchCallbackList {
+pub(crate) struct WatchCallbackList {
     router: RwLock<Vec<Weak<WatchCallback>>>,
+    logger: Logger,
 }
 
 /// Controls how long a directory should watch for a file change.
@@ -32,6 +33,13 @@ impl WatchHandle {
 }
 
 impl WatchCallbackList {
+    pub fn with_logger(logger: Logger) -> Self {
+        WatchCallbackList {
+            logger,
+            router: Default::default(),
+        }
+    }
+
     /// Subscribes a new callback and returns a handle that controls the lifetime of the callback.
     pub fn subscribe(&self, watch_callback: WatchCallback) -> WatchHandle {
         let watch_callback_arc = Arc::new(watch_callback);
@@ -74,8 +82,8 @@ impl WatchCallbackList {
         });
         if let Err(err) = spawn_res {
             error!(
-                "Failed to spawn thread to call watch callbacks. Cause: {:?}",
-                err
+                self.logger,
+                "Failed to spawn thread to call watch callbacks. Cause: {:?}", err
             );
         }
         result
@@ -86,13 +94,18 @@ impl WatchCallbackList {
 mod tests {
     use crate::directory::WatchCallbackList;
     use futures::executor::block_on;
+    use slog::{o, Discard, Logger};
     use std::mem;
     use std::sync::atomic::{AtomicUsize, Ordering};
     use std::sync::Arc;
 
+    fn default_watch_callback_list() -> WatchCallbackList {
+        WatchCallbackList::with_logger(Logger::root(Discard, o!()))
+    }
+
     #[test]
     fn test_watch_event_router_simple() {
-        let watch_event_router = WatchCallbackList::default();
+        let watch_event_router = default_watch_callback_list();
         let counter: Arc<AtomicUsize> = Default::default();
         let counter_clone = counter.clone();
         let inc_callback = Box::new(move || {
@@ -119,7 +132,7 @@ mod tests {
 
     #[test]
     fn test_watch_event_router_multiple_callback_same_key() {
-        let watch_event_router = WatchCallbackList::default();
+        let watch_event_router = default_watch_callback_list();
         let counter: Arc<AtomicUsize> = Default::default();
         let inc_callback = |inc: usize| {
             let counter_clone = counter.clone();
@@ -148,7 +161,7 @@ mod tests {
 
     #[test]
     fn test_watch_event_router_multiple_callback_different_key() {
-        let watch_event_router = WatchCallbackList::default();
+        let watch_event_router = default_watch_callback_list();
         let counter: Arc<AtomicUsize> = Default::default();
         let counter_clone = counter.clone();
         let inc_callback = Box::new(move || {
diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs
index b36ac331c..68fb95367 100644
--- a/src/indexer/index_writer.rs
+++ b/src/indexer/index_writer.rs
@@ -27,6 +27,7 @@ use crate::Opstamp;
 use crossbeam::channel;
 use futures::executor::block_on;
 use futures::future::Future;
+use slog::{error, info, Logger};
 use smallvec::smallvec;
 use smallvec::SmallVec;
 use std::mem;
@@ -195,20 +196,21 @@ fn index_documents(
     grouped_document_iterator: &mut dyn Iterator<Item = OperationGroup>,
     segment_updater: &mut SegmentUpdater,
     mut delete_cursor: DeleteCursor,
+    logger: &Logger,
 ) -> crate::Result<bool> {
     let schema = segment.schema();
+    info!(logger, "segment-index"; "stage"=>"start");
     let mut segment_writer = SegmentWriter::for_segment(memory_budget, segment.clone(), &schema)?;
+    let mut buffer_limit_reached = false;
     for document_group in grouped_document_iterator {
         for doc in document_group {
             segment_writer.add_document(doc, &schema)?;
         }
         let mem_usage = segment_writer.mem_usage();
         if mem_usage >= memory_budget - MARGIN_IN_BYTES {
-            info!(
-                "Buffer limit reached, flushing segment with maxdoc={}.",
-                segment_writer.max_doc()
-            );
+            buffer_limit_reached = true;
+            break;
         }
     }
 
@@ -228,6 +230,14 @@ fn index_documents(
     let segment_with_max_doc = segment.with_max_doc(max_doc);
 
     let last_docstamp: Opstamp = *(doc_opstamps.last().unwrap());
+    info!(
+        logger,
+        "segment-index";
+        "stage" => "serialize",
"cause" => if buffer_limit_reached { "buffer-limit" } else { "commit" }, + "maxdoc" => max_doc, + "last_docstamp" => last_docstamp + ); let delete_bitset_opt = apply_deletes( &segment_with_max_doc, @@ -241,7 +251,18 @@ fn index_documents( delete_cursor, delete_bitset_opt, ); + + info!( + logger, + "segment-index"; + "stage" => "publish", + ); block_on(segment_updater.schedule_add_segment(segment_entry))?; + info!( + logger, + "segment-index"; + "stage" => "end", + ); Ok(true) } @@ -344,6 +365,10 @@ impl IndexWriter { Ok(index_writer) } + pub(crate) fn logger(&self) -> &Logger { + self.index.logger() + } + fn drop_sender(&mut self) { let (sender, _receiver) = channel::bounded(1); self.operation_sender = sender; @@ -352,6 +377,8 @@ impl IndexWriter { /// If there are some merging threads, blocks until they all finish their work and /// then drop the `IndexWriter`. pub fn wait_merging_threads(mut self) -> crate::Result<()> { + info!(self.logger(), "wait-merge-threads"; "stage"=>"start"); + // this will stop the indexing thread, // dropping the last reference to the segment_updater. self.drop_sender(); @@ -372,9 +399,9 @@ impl IndexWriter { .map_err(|_| TantivyError::ErrorInThread("Failed to join merging thread.".into())); if let Err(ref e) = result { - error!("Some merging thread failed {:?}", e); + error!(self.logger(), "some merge thread failed"; "cause"=>e.to_string()); } - + info!(self.logger(), "wait-merge-threads"; "stage"=>"stop"); result } @@ -434,12 +461,16 @@ impl IndexWriter { return Ok(()); } let segment = index.new_segment(); + let segment_id = segment.id(); index_documents( mem_budget, segment, &mut document_iterator, &mut segment_updater, delete_cursor.clone(), + &index + .logger() + .new(slog::o!("segment"=>segment_id.to_string())), )?; } })?; @@ -553,7 +584,10 @@ impl IndexWriter { /// /// The opstamp at the last commit is returned. pub fn rollback(&mut self) -> crate::Result { - info!("Rolling back to opstamp {}", self.committed_opstamp); + info!( + self.logger(), + "Rolling back to opstamp {}", self.committed_opstamp + ); // marks the segment updater as killed. From now on, all // segment updates will be ignored. self.segment_updater.kill(); @@ -610,6 +644,8 @@ impl IndexWriter { /// using this API. /// See [`PreparedCommit::set_payload()`](PreparedCommit.html) pub fn prepare_commit(&mut self) -> crate::Result { + let logger = self.logger().clone(); + // Here, because we join all of the worker threads, // all of the segment update for this commit have been // sent. @@ -620,7 +656,10 @@ impl IndexWriter { // // This will move uncommitted segments to the state of // committed segments. - info!("Preparing commit"); + + let commit_opstamp = self.stamper.stamp(); + + info!(logger, "prepare-commit"; "opstamp" => commit_opstamp); // this will drop the current document channel // and recreate a new one. 
@@ -636,9 +675,8 @@ impl IndexWriter {
             self.add_indexing_worker()?;
         }
 
-        let commit_opstamp = self.stamper.stamp();
         let prepared_commit = PreparedCommit::new(self, commit_opstamp);
-        info!("Prepared commit {}", commit_opstamp);
+        info!(logger, "Prepared commit {}", commit_opstamp);
         Ok(prepared_commit)
     }
 
diff --git a/src/indexer/prepared_commit.rs b/src/indexer/prepared_commit.rs
index dad89710b..3fe7b6286 100644
--- a/src/indexer/prepared_commit.rs
+++ b/src/indexer/prepared_commit.rs
@@ -1,6 +1,7 @@
 use super::IndexWriter;
 use crate::Opstamp;
 use futures::executor::block_on;
+use slog::info;
 
 /// A prepared commit
 pub struct PreparedCommit<'a> {
@@ -31,7 +32,7 @@ impl<'a> PreparedCommit<'a> {
     }
 
     pub fn commit(self) -> crate::Result<Opstamp> {
-        info!("committing {}", self.opstamp);
+        info!(self.index_writer.logger(), "committing {}", self.opstamp);
         let _ = block_on(
             self.index_writer
                 .segment_updater()
diff --git a/src/indexer/segment_manager.rs b/src/indexer/segment_manager.rs
index 5af52313d..6c54486d7 100644
--- a/src/indexer/segment_manager.rs
+++ b/src/indexer/segment_manager.rs
@@ -1,3 +1,5 @@
+use slog::{warn, Logger};
+
 use super::segment_register::SegmentRegister;
 use crate::core::SegmentId;
 use crate::core::SegmentMeta;
@@ -42,9 +44,9 @@ impl SegmentRegisters {
 ///
 /// It guarantees the atomicity of the
 /// changes (merges especially)
-#[derive(Default)]
 pub struct SegmentManager {
     registers: RwLock<SegmentRegisters>,
+    logger: Logger,
 }
 
 impl Debug for SegmentManager {
@@ -77,12 +79,14 @@ impl SegmentManager {
     pub fn from_segments(
         segment_metas: Vec<SegmentMeta>,
         delete_cursor: &DeleteCursor,
+        logger: Logger,
     ) -> SegmentManager {
         SegmentManager {
             registers: RwLock::new(SegmentRegisters {
                 uncommitted: SegmentRegister::default(),
                 committed: SegmentRegister::new(segment_metas, delete_cursor),
             }),
+            logger,
         }
     }
 
@@ -186,7 +190,7 @@ impl SegmentManager {
         let segments_status = registers_lock
             .segments_status(before_merge_segment_ids)
             .ok_or_else(|| {
-                warn!("couldn't find segment in SegmentManager");
+                warn!(self.logger, "couldn't find segment in SegmentManager");
                 crate::TantivyError::InvalidArgument(
                     "The segments that were merged could not be found in the SegmentManager. \
                      This is not necessarily a bug, and can happen after a rollback for instance."
diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs
index 58a38829e..fbfbca158 100644
--- a/src/indexer/segment_updater.rs
+++ b/src/indexer/segment_updater.rs
@@ -23,9 +23,9 @@ use futures::channel::oneshot;
 use futures::executor::{ThreadPool, ThreadPoolBuilder};
 use futures::future::Future;
 use futures::future::TryFutureExt;
+use slog::{debug, error, info, warn};
 use std::borrow::BorrowMut;
 use std::collections::HashSet;
-use std::io::Write;
 use std::ops::Deref;
 use std::path::PathBuf;
 use std::sync::atomic::{AtomicBool, Ordering};
@@ -65,12 +65,11 @@ pub fn save_new_metas(schema: Schema, directory: &mut dyn Directory) -> crate::Result<()> {
 ///
 /// This method is not part of tantivy's public API
 fn save_metas(metas: &IndexMeta, directory: &mut dyn Directory) -> crate::Result<()> {
-    info!("save metas");
-    let mut buffer = serde_json::to_vec_pretty(metas)?;
+    let mut meta_json = serde_json::to_string_pretty(metas)?;
    // Just adding a new line at the end of the buffer.
-    writeln!(&mut buffer)?;
-    directory.atomic_write(&META_FILEPATH, &buffer[..])?;
-    debug!("Saved metas {:?}", serde_json::to_string_pretty(&metas));
+    meta_json.push('\n');
+    debug!(directory.logger(), "save meta"; "content"=>&meta_json);
+    directory.atomic_write(&META_FILEPATH, meta_json.as_bytes())?;
     Ok(())
 }
 
@@ -97,7 +96,6 @@ impl Deref for SegmentUpdater {
 async fn garbage_collect_files(
     segment_updater: SegmentUpdater,
 ) -> crate::Result<GarbageCollectionResult> {
-    info!("Running garbage collection");
     let mut index = segment_updater.index.clone();
     index
         .directory_mut()
@@ -107,14 +105,12 @@ async fn garbage_collect_files(
 /// Merges a list of segments the list of segment givens in the `segment_entries`.
 /// This function happens in the calling thread and is computationally expensive.
 fn merge(
+    merged_segment: Segment,
     index: &Index,
     mut segment_entries: Vec<SegmentEntry>,
     target_opstamp: Opstamp,
 ) -> crate::Result<SegmentEntry> {
-    // first we need to apply deletes to our segment.
-    let merged_segment = index.new_segment();
-
-    // First we apply all of the delet to the merged segment, up to the target opstamp.
+    // First we apply all of the deletes to the merged segment, up to the target opstamp.
     for segment_entry in &mut segment_entries {
         let segment = index.segment(segment_entry.meta().clone());
         advance_deletes(segment, segment_entry, target_opstamp)?;
@@ -167,7 +163,8 @@ impl SegmentUpdater {
         delete_cursor: &DeleteCursor,
     ) -> crate::Result<SegmentUpdater> {
         let segments = index.searchable_segment_metas()?;
-        let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
+        let segment_manager =
+            SegmentManager::from_segments(segments, delete_cursor, index.logger().clone());
         let pool = ThreadPoolBuilder::new()
             .name_prefix("segment_updater")
             .pool_size(1)
@@ -387,7 +384,18 @@ impl SegmentUpdater {
             .segment_manager
             .start_merge(merge_operation.segment_ids())?;
 
-        info!("Starting merge - {:?}", merge_operation.segment_ids());
+        let segment_ids_str: String = merge_operation
+            .segment_ids()
+            .iter()
+            .map(|segment_id| segment_id.to_string())
+            .collect::<Vec<String>>()
+            .join(",");
+
+        let merged_segment = self.index.new_segment();
+        let logger = self.index.logger().new(slog::o!("segments"=>segment_ids_str, "merged-segment"=>merged_segment.id().to_string()));
+
+        let num_merges: usize = self.merge_operations.list().len();
+        slog::info!(&logger, "merge"; "stage"=>"start", "num-merges" => num_merges);
 
         let (merging_future_send, merging_future_recv) =
             oneshot::channel::<crate::Result<SegmentMeta>>();
@@ -398,22 +406,20 @@ impl SegmentUpdater {
             // as well as which segment is currently in merge and therefore should not be
             // candidate for another merge.
             match merge(
+                merged_segment,
                 &segment_updater.index,
                 segment_entries,
                 merge_operation.target_opstamp(),
             ) {
                 Ok(after_merge_segment_entry) => {
+                    info!(&logger, "merge"; "stage" => "end");
                     let segment_meta = segment_updater
                         .end_merge(merge_operation, after_merge_segment_entry)
                         .await;
                     let _send_result = merging_future_send.send(segment_meta);
                 }
                 Err(e) => {
-                    warn!(
-                        "Merge of {:?} was cancelled: {:?}",
-                        merge_operation.segment_ids().to_vec(),
-                        e
-                    );
+                    error!(&logger, "merge"; "stage" => "fail", "cause"=>e.to_string());
                     // ... cancel merge
                     if cfg!(test) {
                         panic!("Merge failed.");
@@ -454,11 +460,12 @@ impl SegmentUpdater {
             .collect::<Vec<_>>();
 
         merge_candidates.extend(committed_merge_candidates.into_iter());
+        let logger = self.index.logger();
         for merge_operation in merge_candidates {
             if let Err(err) = self.start_merge(merge_operation) {
                 warn!(
-                    "Starting the merge failed for the following reason. This is not fatal. {}",
{}", - err + logger, + "merge-start-fail (not fatal, not necessarily a problem)"; "reason" => format!("{}", err), ); } } @@ -471,8 +478,11 @@ impl SegmentUpdater { ) -> impl Future> { let segment_updater = self.clone(); let after_merge_segment_meta = after_merge_segment_entry.meta().clone(); + let logger = self.index.logger().new( + slog::o!("segment"=>after_merge_segment_meta.id().to_string(), + "delete-opstamp"=>after_merge_segment_meta.delete_opstamp()), + ); let end_merge_future = self.schedule_future(async move { - info!("End merge {:?}", after_merge_segment_entry.meta()); { let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone(); if let Some(delete_operation) = delete_cursor.get() { @@ -486,6 +496,7 @@ impl SegmentUpdater { committed_opstamp, ) { error!( + logger, "Merge of {:?} was cancelled (advancing deletes failed): {:?}", merge_operation.segment_ids(), e diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index 5bb979702..36defafde 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -1,5 +1,4 @@ use super::operation::AddOperation; -use crate::core::Segment; use crate::core::SerializableSegment; use crate::fastfield::FastFieldsWriter; use crate::fieldnorm::{FieldNormReaders, FieldNormsWriter}; @@ -15,6 +14,7 @@ use crate::tokenizer::{BoxTokenStream, PreTokenizedStream}; use crate::tokenizer::{FacetTokenizer, TextAnalyzer}; use crate::tokenizer::{TokenStreamChain, Tokenizer}; use crate::Opstamp; +use crate::{core::Segment, tokenizer::MAX_TOKEN_LEN}; use crate::{DocId, SegmentComponent}; use std::io; @@ -146,6 +146,9 @@ impl SegmentWriter { for fake_str in facets { let mut unordered_term_id_opt = None; FacetTokenizer.token_stream(fake_str).process(&mut |token| { + if token.text.len() > MAX_TOKEN_LEN { + return; + } term_buffer.set_text(&token.text); let unordered_term_id = multifield_postings.subscribe(doc_id, &term_buffer); diff --git a/src/lib.rs b/src/lib.rs index 7cbed553f..6ca97d7b9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -101,9 +101,6 @@ #[cfg_attr(test, macro_use)] extern crate serde_json; -#[macro_use] -extern crate log; - #[macro_use] extern crate thiserror; @@ -148,6 +145,7 @@ pub mod schema; pub mod space_usage; pub mod store; pub mod termdict; +pub use slog; mod reader; diff --git a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs index 3fc0b8291..8c1c81ca0 100644 --- a/src/postings/postings_writer.rs +++ b/src/postings/postings_writer.rs @@ -227,23 +227,17 @@ pub trait PostingsWriter { term_buffer.set_field(field); let mut sink = |token: &Token| { // We skip all tokens with a len greater than u16. - if token.text.len() <= MAX_TOKEN_LEN { - term_buffer.set_text(token.text.as_str()); - self.subscribe( - term_index, - doc_id, - token.position as u32, - &term_buffer, - heap, - ); - } else { - info!( - "A token exceeding MAX_TOKEN_LEN ({}>{}) was dropped. 
-                     MAX_TOKEN_LEN in the documentation for more information.",
-                    token.text.len(),
-                    MAX_TOKEN_LEN
-                );
+            if token.text.len() > MAX_TOKEN_LEN {
+                return;
             }
+            term_buffer.set_text(token.text.as_str());
+            self.subscribe(
+                term_index,
+                doc_id,
+                token.position as u32,
+                &term_buffer,
+                heap,
+            );
         };
         token_stream.process(&mut sink)
     }
diff --git a/src/reader/mod.rs b/src/reader/mod.rs
index ff993de83..c7514f594 100644
--- a/src/reader/mod.rs
+++ b/src/reader/mod.rs
@@ -1,5 +1,7 @@
 mod pool;
 
+use slog::error;
+
 pub use self::pool::LeasedItem;
 use self::pool::Pool;
 use crate::core::Segment;
@@ -62,6 +64,7 @@ impl IndexReaderBuilder {
     /// to open different segment readers. It may take hundreds of milliseconds
     /// of time and it may return an error.
     pub fn try_into(self) -> crate::Result<IndexReader> {
+        let logger = self.index.logger().clone();
         let inner_reader = InnerIndexReader {
             index: self.index,
             num_searchers: self.num_searchers,
@@ -80,8 +83,8 @@ impl IndexReaderBuilder {
         let callback = move || {
             if let Err(err) = inner_reader_arc_clone.reload() {
                 error!(
-                    "Error while loading searcher after commit was detected. {:?}",
-                    err
+                    logger,
+                    "Error while loading searcher after commit was detected. {:?}", err
                 );
             }
         };
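
Usage sketch (not part of the diff): the snippet below shows how a consumer would wire a custom `slog` logger through the new `MmapDirectory::open_with_logger` and `RAMDirectory::with_logger` constructors introduced above. The drain choices (`StdLog`, `Discard`) are the same ones this patch uses for its own defaults; the schema and the `/tmp/my-index` path are illustrative assumptions.

```rust
use slog::{o, Discard, Drain, Logger};
use slog_stdlog::StdLog;
use tantivy::directory::{MmapDirectory, RAMDirectory};
use tantivy::schema::{Schema, TEXT};
use tantivy::Index;

fn main() -> tantivy::Result<()> {
    // Same default as `MmapDirectory::open` after this patch: structured
    // records are bridged to the plain `log` crate through `slog-stdlog`.
    let stdlog_logger = Logger::root(StdLog.fuse(), o!("app" => "example"));
    // Assumes /tmp/my-index already exists; `open_with_logger` errors otherwise.
    let _mmap_dir = MmapDirectory::open_with_logger("/tmp/my-index", stdlog_logger);

    // For unit tests: a silent logger, as the new test code in this patch does.
    let quiet_logger = Logger::root(Discard, o!());
    let ram_directory = RAMDirectory::with_logger(quiet_logger);

    // Writers, readers, and segment readers created from this index all
    // inherit the directory's logger.
    let mut schema_builder = Schema::builder();
    schema_builder.add_text_field("body", TEXT);
    let index = Index::create(ram_directory, schema_builder.build())?;
    let _reader = index.reader();
    Ok(())
}
```

Note that `Directory::logger()` is a required trait method, so out-of-tree `Directory` implementations have to store and expose a `Logger` as well; falling back to `Logger::root(StdLog.fuse(), o!())` keeps their behavior identical to the pre-`slog` logging.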