Mirror of https://github.com/quickwit-oss/tantivy.git (synced 2025-12-28 04:52:55 +00:00)

Compare commits: termmap_pe...slog (1 commit)

Commit 1c81b8171f
CHANGELOG.md
@@ -1,7 +1,8 @@
 Tantivy 0.14.0
 =========================
-- Remove dependency to atomicwrites #833 .Implemented by @pmasurel upon suggestion and research from @asafigan).
+- Remove dependency to atomicwrites #833. (Implemented by @pmasurel upon suggestion and research from @asafigan.)
 - Migrated tantivy error from the now deprecated `failure` crate to `thiserror` #760. (@hirevo)
+- Switched to structured logging (via the `slog` crate). (@pmasurel)

 Tantivy 0.13.1
 ===================

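Taken together, the hunks below thread a `slog::Logger` from the `Directory` up through `Index`, `IndexWriter`, and `SegmentReader`. As a minimal caller-side sketch of the resulting API (the path is illustrative; `open_with_logger` and `Index::open` both appear in the hunks that follow):

    use slog::{o, Drain, Logger};
    use slog_stdlog::StdLog;
    use tantivy::directory::MmapDirectory;
    use tantivy::Index;

    fn open_logged_index() -> tantivy::Result<Index> {
        // Any drain works; StdLog bridges records back into the `log` crate.
        let logger = Logger::root(StdLog.fuse(), o!("app" => "my-search"));
        let dir = MmapDirectory::open_with_logger("/tmp/my-index", logger)?;
        Index::open(dir)
    }
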
Cargo.toml
@@ -23,7 +23,8 @@ memmap = {version = "0.7", optional=true}
 lz4 = {version="1", optional=true}
 snap = "1"
 tempfile = {version="3", optional=true}
-log = "0.4"
+slog = "2.5"
+slog-stdlog = "4"
 serde = {version="1", features=["derive"]}
 serde_json = "1"
 num_cpus = "1"

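Why two crates: `slog` provides the structured logging macros, while `slog-stdlog` is the compatibility bridge this commit uses as the default drain. A one-line sketch of that fallback (the same construction the diff uses in its `Default` impls):

    use slog::{o, Drain, Logger};
    use slog_stdlog::StdLog;

    // StdLog forwards each slog record to the `log` facade, so an existing
    // env_logger/log setup keeps receiving tantivy's messages.
    let fallback: Logger = Logger::root(StdLog.fuse(), o!());
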
src/core/executor.rs
@@ -1,5 +1,6 @@
 use crossbeam::channel;
 use rayon::{ThreadPool, ThreadPoolBuilder};
+use slog::{error, Logger};

 /// Search executor, whether search requests are single-threaded or multithreaded.
 ///
@@ -43,6 +44,7 @@ impl Executor {
         &self,
         f: F,
         args: AIterator,
+        logger: Logger,
     ) -> crate::Result<Vec<R>> {
         match self {
             Executor::SingleThread => args.map(f).collect::<crate::Result<_>>(),
@@ -57,7 +59,7 @@ impl Executor {
                         let (idx, arg) = arg_with_idx;
                         let fruit = f(arg);
                         if let Err(err) = fruit_sender.send((idx, fruit)) {
-                            error!("Failed to send search task. It probably means all search threads have panicked. {:?}", err);
+                            error!(logger, "Failed to send search task. It probably means all search threads have panicked. {:?}", err);
                         }
                     });
                 }
@@ -87,17 +89,21 @@ impl Executor {
 #[cfg(test)]
 mod tests {

+    use slog::{o, Discard, Logger};
+
     use super::Executor;

     #[test]
     #[should_panic(expected = "panic should propagate")]
     fn test_panic_propagates_single_thread() {
+        let logger = Logger::root(Discard, o!());
         let _result: Vec<usize> = Executor::single_thread()
             .map(
                 |_| {
                     panic!("panic should propagate");
                 },
                 vec![0].into_iter(),
+                logger,
             )
             .unwrap();
     }
@@ -105,6 +111,7 @@ mod tests {
     #[test]
     #[should_panic] //< unfortunately the panic message is not propagated
     fn test_panic_propagates_multi_thread() {
+        let logger = Logger::root(Discard, o!());
         let _result: Vec<usize> = Executor::multi_thread(1, "search-test")
             .unwrap()
             .map(
@@ -112,14 +119,16 @@ mod tests {
                     panic!("panic should propagate");
                 },
                 vec![0].into_iter(),
+                logger,
             )
             .unwrap();
     }

     #[test]
     fn test_map_singlethread() {
+        let logger = Logger::root(Discard, o!());
         let result: Vec<usize> = Executor::single_thread()
-            .map(|i| Ok(i * 2), 0..1_000)
+            .map(|i| Ok(i * 2), 0..1_000, logger)
             .unwrap();
         assert_eq!(result.len(), 1_000);
         for i in 0..1_000 {
@@ -129,9 +138,10 @@ mod tests {

     #[test]
     fn test_map_multithread() {
+        let logger = Logger::root(Discard, o!());
         let result: Vec<usize> = Executor::multi_thread(3, "search-test")
             .unwrap()
-            .map(|i| Ok(i * 2), 0..10)
+            .map(|i| Ok(i * 2), 0..10, logger)
             .unwrap();
         assert_eq!(result.len(), 10);
         for i in 0..10 {

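The signature change above means every `Executor::map()` call site now supplies a `Logger`; the tests use a `Discard` drain. A usage sketch mirroring the updated tests:

    use slog::{o, Discard, Logger};

    // The logger is only used by worker threads to report send failures.
    let logger = Logger::root(Discard, o!());
    let doubled: Vec<usize> = Executor::single_thread()
        .map(|i| Ok(i * 2), 0..4, logger)
        .unwrap();
    assert_eq!(doubled, vec![0, 2, 4, 6]);
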
src/core/index.rs
@@ -21,6 +21,7 @@ use crate::schema::FieldType;
 use crate::schema::Schema;
 use crate::tokenizer::{TextAnalyzer, TokenizerManager};
 use crate::IndexWriter;
+use slog::Logger;
 use std::borrow::BorrowMut;
 use std::collections::HashSet;
 use std::fmt;
@@ -57,6 +58,11 @@ pub struct Index {
 }

 impl Index {
+
+    pub(crate) fn logger(&self) -> &Logger {
+        self.directory.logger()
+    }
+
     /// Examines the directory to see if it contains an index.
     ///
     /// Effectively, it only checks for the presence of the `meta.json` file.
@@ -147,13 +153,13 @@ impl Index {
     /// If a directory previously existed, it will be erased.
     pub fn create<Dir: Directory>(dir: Dir, schema: Schema) -> crate::Result<Index> {
         let directory = ManagedDirectory::wrap(dir)?;
-        Index::from_directory(directory, schema)
+        Index::new_from_directory(directory, schema)
     }

     /// Create a new index from a directory.
     ///
     /// This will overwrite existing meta.json
-    fn from_directory(mut directory: ManagedDirectory, schema: Schema) -> crate::Result<Index> {
+    fn new_from_directory(mut directory: ManagedDirectory, schema: Schema) -> crate::Result<Index> {
         save_new_metas(schema.clone(), directory.borrow_mut())?;
         let metas = IndexMeta::with_schema(schema);
         Index::create_from_metas(directory, &metas, SegmentMetaInventory::default())
@@ -244,6 +250,8 @@ impl Index {

     /// Open the index using the provided directory
     pub fn open<D: Directory>(directory: D) -> crate::Result<Index> {
+        let logger: &Logger = directory.logger();
+        slog::info!(logger, "index-open"; "directory" => format!("{:?}", directory));
         let directory = ManagedDirectory::wrap(directory)?;
         let inventory = SegmentMetaInventory::default();
         let metas = load_metas(&directory, &inventory)?;

src/core/searcher.rs
@@ -143,6 +143,7 @@ impl Searcher {
                 collector.collect_segment(weight.as_ref(), segment_ord as u32, segment_reader)
             },
             segment_readers.iter().enumerate(),
+            self.index.logger().clone(),
         )?;
         collector.merge_fruits(fruits)
     }

src/core/segment_id.rs
@@ -21,6 +21,12 @@ use std::sync::atomic;
 #[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
 pub struct SegmentId(Uuid);

+impl ToString for SegmentId {
+    fn to_string(&self) -> String {
+        self.short_uuid_string()
+    }
+}
+
 #[cfg(test)]
 static AUTO_INC_COUNTER: Lazy<atomic::AtomicUsize> = Lazy::new(|| atomic::AtomicUsize::default());

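A side note on the `ToString` impl above: implementing `ToString` directly is unusual in Rust; the idiomatic route is `fmt::Display`, which provides `to_string()` for free and also works inside `format!` and the logging macros. An alternative sketch with the same behavior, assuming the existing private `short_uuid_string` helper:

    use std::fmt;

    impl fmt::Display for SegmentId {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "{}", self.short_uuid_string())
        }
    }
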
src/core/segment_reader.rs
@@ -17,6 +17,7 @@ use crate::store::StoreReader;
 use crate::termdict::TermDictionary;
 use crate::DocId;
 use fail::fail_point;
+use slog::{warn, Logger};
 use std::collections::HashMap;
 use std::fmt;
 use std::sync::Arc;
@@ -53,6 +54,7 @@ pub struct SegmentReader {
     store_source: ReadOnlySource,
     delete_bitset_opt: Option<DeleteBitSet>,
     schema: Schema,
+    logger: Logger,
 }

 impl SegmentReader {
@@ -200,6 +202,7 @@ impl SegmentReader {
             positions_composite,
             positions_idx_composite,
             schema,
+            logger: segment.index().logger().clone(),
         })
     }
@@ -229,7 +232,11 @@ impl SegmentReader {
         let record_option_opt = field_type.get_index_record_option();

         if record_option_opt.is_none() {
-            warn!("Field {:?} does not seem indexed.", field_entry.name());
+            warn!(
+                self.logger,
+                "Field {:?} does not seem indexed.",
+                field_entry.name()
+            );
         }

         let postings_source_opt = self.postings_composite.open_read(field);

src/directory/directory.rs
@@ -1,3 +1,5 @@
+use slog::{error, Logger};
+
 use crate::directory::directory_lock::Lock;
 use crate::directory::error::LockError;
 use crate::directory::error::{DeleteError, OpenReadError, OpenWriteError};
@@ -64,7 +66,10 @@ impl<T: Send + Sync + 'static> From<Box<T>> for DirectoryLock {
 impl Drop for DirectoryLockGuard {
     fn drop(&mut self) {
         if let Err(e) = self.directory.delete(&*self.path) {
-            error!("Failed to remove the lock file. {:?}", e);
+            error!(
+                self.directory.logger(),
+                "Failed to remove the lock file. {:?}", e
+            );
         }
     }
 }
@@ -209,6 +214,9 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
     /// `OnCommit` `ReloadPolicy`. Not implementing watch in a `Directory` only prevents the
     /// `OnCommit` `ReloadPolicy` to work properly.
     fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle>;
+
+    /// Returns the `slog::Logger` configured for the `Directory`.
+    fn logger(&self) -> &Logger;
 }

 /// DirectoryClone

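Adding `logger()` as a required trait method is a breaking change for any out-of-tree `Directory` implementation, which must now own (or borrow from somewhere) a `Logger`. A small sketch of generic code relying on the new method (using `RAMDirectory::with_logger` from a later hunk; the "directory-ready" event name is this sketch's own):

    use slog::{o, Discard, Logger};
    use tantivy::directory::{Directory, RAMDirectory};

    // Any Directory now exposes a Logger, so generic code can log through it.
    fn log_open(dir: &dyn Directory) {
        slog::info!(dir.logger(), "directory-ready"; "dir" => format!("{:?}", dir));
    }

    let dir = RAMDirectory::with_logger(Logger::root(Discard, o!()));
    log_open(&dir);
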
src/directory/managed_directory.rs
@@ -11,9 +11,9 @@ use crate::error::DataCorruption;
 use crate::Directory;

 use crc32fast::Hasher;
+use slog::{debug, error, info};
 use std::collections::HashSet;
 use std::io;
-use std::io::Write;
 use std::path::{Path, PathBuf};
 use std::result;
 use std::sync::RwLockWriteGuard;
@@ -56,9 +56,9 @@ fn save_managed_paths(
     directory: &mut dyn Directory,
     wlock: &RwLockWriteGuard<'_, MetaInformation>,
 ) -> io::Result<()> {
-    let mut w = serde_json::to_vec(&wlock.managed_paths)?;
-    writeln!(&mut w)?;
-    directory.atomic_write(&MANAGED_FILEPATH, &w[..])?;
+    let mut managed_json = serde_json::to_string_pretty(&wlock.managed_paths)?;
+    managed_json.push_str("\n");
+    directory.atomic_write(&MANAGED_FILEPATH, managed_json.as_bytes())?;
     Ok(())
 }
@@ -118,7 +118,7 @@ impl ManagedDirectory {
         &mut self,
         get_living_files: L,
     ) -> crate::Result<GarbageCollectionResult> {
-        info!("Garbage collect");
+        info!(self.directory.logger(), "gc"; "stage"=>"start");
         let mut files_to_delete = vec![];

         // It is crucial to get the living files after acquiring the
@@ -153,7 +153,7 @@ impl ManagedDirectory {
                 }
             }
             Err(err) => {
-                error!("Failed to acquire lock for GC");
+                error!(self.logger(), "Failed to acquire lock for GC");
                 return Err(crate::TantivyError::from(err));
             }
         }
@@ -165,7 +165,7 @@ impl ManagedDirectory {
         for file_to_delete in files_to_delete {
             match self.delete(&file_to_delete) {
                 Ok(_) => {
-                    info!("Deleted {:?}", file_to_delete);
+                    debug!(self.logger(), "deleted-success"; "file"=>format!("{:?}", file_to_delete));
                     deleted_files.push(file_to_delete);
                 }
                 Err(file_error) => {
@@ -178,7 +178,7 @@ impl ManagedDirectory {
                     if !cfg!(target_os = "windows") {
                         // On windows, delete is expected to fail if the file
                         // is mmapped.
-                        error!("Failed to delete {:?}", file_to_delete);
+                        error!(self.logger(), "delete-file-fail"; "path"=>file_to_delete.to_str().unwrap_or("<invalid-utf8>"));
                     }
                 }
             }
@@ -200,6 +200,10 @@ impl ManagedDirectory {
             save_managed_paths(self.directory.as_mut(), &meta_informations_wlock)?;
         }

+        info!(self.directory.logger(), "gc"; "stage"=>"end",
+              "num-success-file-deletes"=>deleted_files.len(),
+              "num-failed-file-deletes"=>failed_to_delete_files.len());
+
         Ok(GarbageCollectionResult {
             deleted_files,
             failed_to_delete_files,
@@ -274,6 +278,7 @@ impl ManagedDirectory {

 impl Directory for ManagedDirectory {
     fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
+        slog::debug!(self.logger(), "open-read"; "path" => path.to_str().unwrap_or("<invalid-utf8>"));
         let read_only_source = self.directory.open_read(path)?;
         let (footer, reader) = Footer::extract_footer(read_only_source).map_err(|io_error| {
             OpenReadError::IOError {
@@ -286,6 +291,7 @@ impl Directory for ManagedDirectory {
     }

     fn open_write(&mut self, path: &Path) -> result::Result<WritePtr, OpenWriteError> {
+        slog::debug!(self.logger(), "open-write"; "path" => path.to_str().unwrap_or("<invalid-utf8>"));
         self.register_file_as_managed(path)
             .map_err(|io_error| OpenWriteError::IOError {
                 io_error,
@@ -300,9 +306,11 @@ impl Directory for ManagedDirectory {
         ))))
     }

-    fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> {
+    fn atomic_write(&mut self, path: &Path, content: &[u8]) -> io::Result<()> {
+        let content_str = std::str::from_utf8(content).unwrap_or("<content-not-utf-8>");
+        slog::debug!(self.logger(), "Atomic write"; "path" => format!("{:?}", path), "content"=>content_str);
         self.register_file_as_managed(path)?;
-        self.directory.atomic_write(path, data)
+        self.directory.atomic_write(path, content)
     }

     fn atomic_read(&self, path: &Path) -> result::Result<Vec<u8>, OpenReadError> {
@@ -324,6 +332,10 @@ impl Directory for ManagedDirectory {
     fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> {
         self.directory.watch(watch_callback)
     }
+
+    fn logger(&self) -> &slog::Logger {
+        self.directory.logger()
+    }
 }

 impl Clone for ManagedDirectory {

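The GC logging above is now event-and-stage structured ("gc" with "stage"=>"start"/"end" plus delete counters). To get machine-readable output from it, the root logger needs a structured drain; a sketch assuming the external slog-json crate, which is not part of this diff:

    use slog::{o, Drain, Logger};
    use std::sync::Mutex;

    // Each record becomes one JSON line, with "stage" and the
    // num-*-file-deletes counters emitted as plain fields.
    let json_drain = slog_json::Json::new(std::io::stdout())
        .add_default_keys()
        .build();
    let logger = Logger::root(Mutex::new(json_drain).fuse(), o!());
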
src/directory/mmap_directory.rs
@@ -17,6 +17,8 @@ use notify::RawEvent;
 use notify::RecursiveMode;
 use notify::Watcher;
 use serde::{Deserialize, Serialize};
+use slog::{debug, o, Drain, Logger};
+use slog_stdlog::StdLog;
 use std::collections::HashMap;
 use std::convert::From;
 use std::fmt;
@@ -34,11 +36,6 @@ use std::sync::Weak;
 use std::thread;
 use tempfile::TempDir;

-/// Create a default io error given a string.
-pub(crate) fn make_io_err(msg: String) -> io::Error {
-    io::Error::new(io::ErrorKind::Other, msg)
-}
-
 /// Returns None iff the file exists, can be read, but is empty (and hence
 /// cannot be mmapped)
 fn open_mmap(full_path: &Path) -> result::Result<Option<Mmap>, OpenReadError> {
@@ -149,7 +146,7 @@ struct WatcherWrapper {
 }

 impl WatcherWrapper {
-    pub fn new(path: &Path) -> Result<Self, OpenDirectoryError> {
+    pub(crate) fn new(path: &Path, logger: Logger) -> Result<Self, OpenDirectoryError> {
         let (tx, watcher_recv): (Sender<RawEvent>, Receiver<RawEvent>) = channel();
         // We need to initialize the
         let watcher = notify::raw_watcher(tx)
@@ -163,7 +160,8 @@ impl WatcherWrapper {
                 panic!("Unknown error while starting watching directory {:?}", path);
             }
         })?;
-        let watcher_router: Arc<WatchCallbackList> = Default::default();
+        let watcher_router: Arc<WatchCallbackList> =
+            Arc::new(WatchCallbackList::with_logger(logger));
         let watcher_router_clone = watcher_router.clone();
         thread::Builder::new()
             .name("meta-file-watch-thread".to_string())
@@ -226,15 +224,21 @@ struct MmapDirectoryInner {
     mmap_cache: RwLock<MmapCache>,
     _temp_directory: Option<TempDir>,
     watcher: RwLock<Option<WatcherWrapper>>,
+    logger: Logger,
 }

 impl MmapDirectoryInner {
-    fn new(root_path: PathBuf, temp_directory: Option<TempDir>) -> MmapDirectoryInner {
+    fn new(
+        root_path: PathBuf,
+        temp_directory: Option<TempDir>,
+        logger: Logger,
+    ) -> MmapDirectoryInner {
         MmapDirectoryInner {
             root_path,
             mmap_cache: Default::default(),
             _temp_directory: temp_directory,
             watcher: RwLock::new(None),
+            logger,
         }
     }
@@ -246,7 +250,7 @@ impl MmapDirectoryInner {
         // The downside is that we might create a watch wrapper that is not useful.
         let need_initialization = self.watcher.read().unwrap().is_none();
         if need_initialization {
-            let watch_wrapper = WatcherWrapper::new(&self.root_path)?;
+            let watch_wrapper = WatcherWrapper::new(&self.root_path, self.logger.clone())?;
             let mut watch_wlock = self.watcher.write().unwrap();
             // the watcher could have been initialized when we released the lock, and
             // we do not want to lose the watched files that were set.
@@ -269,8 +273,8 @@ impl fmt::Debug for MmapDirectory {
 }

 impl MmapDirectory {
-    fn new(root_path: PathBuf, temp_directory: Option<TempDir>) -> MmapDirectory {
-        let inner = MmapDirectoryInner::new(root_path, temp_directory);
+    fn new(root_path: PathBuf, temp_directory: Option<TempDir>, logger: Logger) -> MmapDirectory {
+        let inner = MmapDirectoryInner::new(root_path, temp_directory, logger);
         MmapDirectory {
             inner: Arc::new(inner),
         }
@@ -282,17 +286,18 @@ impl MmapDirectory {
     /// For your unit tests, prefer the RAMDirectory.
     pub fn create_from_tempdir() -> Result<MmapDirectory, OpenDirectoryError> {
         let tempdir = TempDir::new().map_err(OpenDirectoryError::FailedToCreateTempDir)?;
-        Ok(MmapDirectory::new(
-            tempdir.path().to_path_buf(),
-            Some(tempdir),
-        ))
+        let logger = Logger::root(StdLog.fuse(), o!());
+        Ok(MmapDirectory::new(tempdir.path().to_owned(), Some(tempdir), logger))
     }

     /// Opens a MmapDirectory in a directory.
     ///
     /// Returns an error if the `directory_path` does not
     /// exist or if it is not a directory.
-    pub fn open<P: AsRef<Path>>(directory_path: P) -> Result<MmapDirectory, OpenDirectoryError> {
+    pub fn open_with_logger<P: AsRef<Path>>(
+        directory_path: P,
+        logger: Logger,
+    ) -> Result<MmapDirectory, OpenDirectoryError> {
         let directory_path: &Path = directory_path.as_ref();
         if !directory_path.exists() {
             Err(OpenDirectoryError::DoesNotExist(PathBuf::from(
@@ -303,10 +308,20 @@ impl MmapDirectory {
                 directory_path,
             )))
         } else {
-            Ok(MmapDirectory::new(PathBuf::from(directory_path), None))
+            Ok(MmapDirectory::new(
+                PathBuf::from(directory_path),
+                None,
+                logger,
+            ))
         }
     }

+    /// Creates an `MmapDirectory` at the given path.
+    pub fn open<P: AsRef<Path>>(directory_path: P) -> Result<MmapDirectory, OpenDirectoryError> {
+        let logger = Logger::root(StdLog.fuse(), o!());
+        Self::open_with_logger(directory_path, logger)
+    }
+
     /// Joins a relative_path to the directory `root_path`
     /// to create a proper complete `filepath`.
     fn resolve_path(&self, relative_path: &Path) -> PathBuf {
@@ -366,11 +381,12 @@ impl MmapDirectory {
 struct ReleaseLockFile {
     _file: File,
     path: PathBuf,
+    logger: Logger,
 }

 impl Drop for ReleaseLockFile {
     fn drop(&mut self) {
-        debug!("Releasing lock {:?}", self.path);
+        debug!(self.logger, "Releasing lock {:?}", self.path);
     }
 }
@@ -409,17 +425,16 @@ impl TerminatingWrite for SafeFileWriter {

 impl Directory for MmapDirectory {
     fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
-        debug!("Open Read {:?}", path);
         let full_path = self.resolve_path(path);

         let mut mmap_cache = self.inner.mmap_cache.write().map_err(|_| {
             let msg = format!(
                 "Failed to acquire write lock \
                  on mmap cache while reading {:?}",
                 path
             );
+            let io_error = io::Error::new(io::ErrorKind::Other, msg);
             OpenReadError::IOError {
-                io_error: make_io_err(msg),
+                io_error,
                 filepath: path.to_owned(),
             }
         })?;
@@ -457,9 +472,7 @@ impl Directory for MmapDirectory {
     }

     fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
-        debug!("Open Write {:?}", path);
         let full_path = self.resolve_path(path);

         let open_res = OpenOptions::new()
             .write(true)
             .create_new(true)
@@ -519,7 +532,6 @@ impl Directory for MmapDirectory {
     }

     fn atomic_write(&mut self, path: &Path, content: &[u8]) -> io::Result<()> {
-        debug!("Atomic Write {:?}", path);
         let mut tempfile = tempfile::Builder::new().tempfile_in(&self.inner.root_path)?;
         tempfile.write_all(content)?;
         tempfile.flush()?;
@@ -541,16 +553,22 @@ impl Directory for MmapDirectory {
         } else {
             file.try_lock_exclusive().map_err(|_| LockError::LockBusy)?
         }
+        let logger = self.inner.logger.clone();
         // dropping the file handle will release the lock.
         Ok(DirectoryLock::from(Box::new(ReleaseLockFile {
             path: lock.filepath.clone(),
             _file: file,
+            logger,
         })))
     }

     fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> {
         self.inner.watch(watch_callback)
     }
+
+    fn logger(&self) -> &Logger {
+        &self.inner.logger
+    }
 }

 #[cfg(test)]
@@ -660,7 +678,8 @@ mod tests {
         let counter_clone = counter.clone();
         let tmp_dir = tempfile::TempDir::new().unwrap();
         let tmp_dirpath = tmp_dir.path().to_owned();
-        let mut watch_wrapper = WatcherWrapper::new(&tmp_dirpath).unwrap();
+        let logger = Logger::root(slog::Discard, o!());
+        let mut watch_wrapper = WatcherWrapper::new(&tmp_dirpath, logger).unwrap();
         let tmp_file = tmp_dirpath.join(*META_FILEPATH);
         let _handle = watch_wrapper.watch(Box::new(move || {
             counter_clone.fetch_add(1, Ordering::SeqCst);

src/directory/mod.rs
@@ -23,7 +23,8 @@ pub use self::directory::{Directory, DirectoryClone};
 pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
 pub use self::ram_directory::RAMDirectory;
 pub use self::read_only_source::ReadOnlySource;
-pub use self::watch_event_router::{WatchCallback, WatchCallbackList, WatchHandle};
+pub(crate) use self::watch_event_router::WatchCallbackList;
+pub use self::watch_event_router::{WatchCallback, WatchHandle};
 use std::io::{self, BufWriter, Write};
 use std::path::PathBuf;

 /// Outcome of the Garbage collection

src/directory/ram_directory.rs
@@ -5,6 +5,8 @@ use crate::directory::WatchCallbackList;
 use crate::directory::{Directory, ReadOnlySource, WatchCallback, WatchHandle};
 use crate::directory::{TerminatingWrite, WritePtr};
 use fail::fail_point;
+use slog::{o, Drain, Logger};
+use slog_stdlog::StdLog;
 use std::collections::HashMap;
 use std::fmt;
 use std::io::{self, BufWriter, Cursor, Seek, SeekFrom, Write};
@@ -66,7 +68,7 @@ impl Write for VecWriter {

     fn flush(&mut self) -> io::Result<()> {
         self.is_flushed = true;
-        let mut fs = self.shared_directory.fs.write().unwrap();
+        let mut fs = self.shared_directory.fs.inner_directory.write().unwrap();
         fs.write(self.path.clone(), self.data.get_ref());
         Ok(())
     }
@@ -78,13 +80,19 @@ impl TerminatingWrite for VecWriter {
     }
 }

-#[derive(Default)]
 struct InnerDirectory {
     fs: HashMap<PathBuf, ReadOnlySource>,
     watch_router: WatchCallbackList,
 }

 impl InnerDirectory {
+    fn with_logger(logger: Logger) -> Self {
+        InnerDirectory {
+            fs: Default::default(),
+            watch_router: WatchCallbackList::with_logger(logger.clone()),
+        }
+    }
+
     fn write(&mut self, path: PathBuf, data: &[u8]) -> bool {
         let data = ReadOnlySource::new(Vec::from(data));
         self.fs.insert(path, data).is_some()
@@ -117,20 +125,32 @@ impl InnerDirectory {
     }
 }

+impl Default for RAMDirectory {
+    fn default() -> RAMDirectory {
+        let logger = Logger::root(StdLog.fuse(), o!());
+        Self::with_logger(logger)
+    }
+}
+
 impl fmt::Debug for RAMDirectory {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         write!(f, "RAMDirectory")
     }
 }

+struct Inner {
+    inner_directory: RwLock<InnerDirectory>,
+    logger: Logger,
+}
+
 /// A Directory storing everything in anonymous memory.
 ///
 /// It is mainly meant for unit testing.
 /// Writes are only made visible upon flushing.
 ///
-#[derive(Clone, Default)]
+#[derive(Clone)]
 pub struct RAMDirectory {
-    fs: Arc<RwLock<InnerDirectory>>,
+    fs: Arc<Inner>,
 }

 impl RAMDirectory {
@@ -139,10 +159,21 @@ impl RAMDirectory {
         Self::default()
     }

+    /// Create a `RAMDirectory` with a custom logger.
+    pub fn with_logger(logger: Logger) -> RAMDirectory {
+        let inner_directory = InnerDirectory::with_logger(logger.clone()).into();
+        RAMDirectory {
+            fs: Arc::new(Inner {
+                inner_directory,
+                logger,
+            }),
+        }
+    }
+
     /// Returns the sum of the size of the different files
     /// in the RAMDirectory.
     pub fn total_mem_usage(&self) -> usize {
-        self.fs.read().unwrap().total_mem_usage()
+        self.fs.inner_directory.read().unwrap().total_mem_usage()
     }

     /// Write a copy of all of the files saved in the RAMDirectory in the target `Directory`.
@@ -152,7 +183,7 @@ impl RAMDirectory {
     ///
     /// If an error is encountered, files may be persisted partially.
     pub fn persist(&self, dest: &mut dyn Directory) -> crate::Result<()> {
-        let wlock = self.fs.write().unwrap();
+        let wlock = self.fs.inner_directory.write().unwrap();
         for (path, source) in wlock.fs.iter() {
             let mut dest_wrt = dest.open_write(path)?;
             dest_wrt.write_all(source.as_slice())?;
@@ -164,7 +195,7 @@ impl RAMDirectory {

 impl Directory for RAMDirectory {
     fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
-        self.fs.read().unwrap().open_read(path)
+        self.fs.inner_directory.read().unwrap().open_read(path)
     }

     fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
@@ -174,15 +205,15 @@ impl Directory for RAMDirectory {
                 filepath: path.to_path_buf(),
             })
         });
-        self.fs.write().unwrap().delete(path)
+        self.fs.inner_directory.write().unwrap().delete(path)
     }

     fn exists(&self, path: &Path) -> bool {
-        self.fs.read().unwrap().exists(path)
+        self.fs.inner_directory.read().unwrap().exists(path)
     }

     fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
-        let mut fs = self.fs.write().unwrap();
+        let mut fs = self.fs.inner_directory.write().unwrap();
         let path_buf = PathBuf::from(path);
         let vec_writer = VecWriter::new(path_buf.clone(), self.clone());
         let exists = fs.write(path_buf.clone(), &[]);
@@ -206,19 +237,38 @@ impl Directory for RAMDirectory {
         let path_buf = PathBuf::from(path);

         // Reserve the path to prevent calls to .write() to succeed.
-        self.fs.write().unwrap().write(path_buf.clone(), &[]);
+        self.fs
+            .inner_directory
+            .write()
+            .unwrap()
+            .write(path_buf.clone(), &[]);

         let mut vec_writer = VecWriter::new(path_buf, self.clone());
         vec_writer.write_all(data)?;
         vec_writer.flush()?;
         if path == Path::new(&*META_FILEPATH) {
-            let _ = self.fs.write().unwrap().watch_router.broadcast();
+            let _ = self
+                .fs
+                .inner_directory
+                .write()
+                .unwrap()
+                .watch_router
+                .broadcast();
         }
         Ok(())
     }

     fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> {
-        Ok(self.fs.write().unwrap().watch(watch_callback))
+        Ok(self
+            .fs
+            .inner_directory
+            .write()
+            .unwrap()
+            .watch(watch_callback))
     }
+
+    fn logger(&self) -> &Logger {
+        &self.fs.logger
+    }
 }

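`RAMDirectory::default()` keeps working (it falls back to the stdlog bridge), while tests can now inject a silent logger. Sketch:

    use slog::{o, Discard, Logger};
    use tantivy::directory::RAMDirectory;

    // A unit test that wants no log output at all:
    let quiet_dir = RAMDirectory::with_logger(Logger::root(Discard, o!()));
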
src/directory/watch_event_router.rs
@@ -1,5 +1,6 @@
 use futures::channel::oneshot;
 use futures::{Future, TryFutureExt};
+use slog::{error, Logger};
 use std::sync::Arc;
 use std::sync::RwLock;
 use std::sync::Weak;
@@ -11,9 +12,9 @@ pub type WatchCallback = Box<dyn Fn() + Sync + Send>;
 ///
 /// It registers callbacks (See `.subscribe(...)`) and
 /// calls them upon calls to `.broadcast(...)`.
-#[derive(Default)]
-pub struct WatchCallbackList {
+pub(crate) struct WatchCallbackList {
     router: RwLock<Vec<Weak<WatchCallback>>>,
+    logger: Logger,
 }

 /// Controls how long a directory should watch for a file change.
@@ -32,6 +33,13 @@ impl WatchHandle {
 }

 impl WatchCallbackList {
+    pub fn with_logger(logger: Logger) -> Self {
+        WatchCallbackList {
+            logger,
+            router: Default::default(),
+        }
+    }
+
     /// Subscribes a new callback and returns a handle that controls the lifetime of the callback.
     pub fn subscribe(&self, watch_callback: WatchCallback) -> WatchHandle {
         let watch_callback_arc = Arc::new(watch_callback);
@@ -74,8 +82,8 @@ impl WatchCallbackList {
         });
         if let Err(err) = spawn_res {
             error!(
-                "Failed to spawn thread to call watch callbacks. Cause: {:?}",
-                err
+                self.logger,
+                "Failed to spawn thread to call watch callbacks. Cause: {:?}", err
             );
         }
         result
@@ -86,13 +94,18 @@ impl WatchCallbackList {
 mod tests {
     use crate::directory::WatchCallbackList;
     use futures::executor::block_on;
+    use slog::{o, Discard, Logger};
     use std::mem;
     use std::sync::atomic::{AtomicUsize, Ordering};
     use std::sync::Arc;

+    fn default_watch_callback_list() -> WatchCallbackList {
+        WatchCallbackList::with_logger(Logger::root(Discard, o!()))
+    }
+
     #[test]
     fn test_watch_event_router_simple() {
-        let watch_event_router = WatchCallbackList::default();
+        let watch_event_router = default_watch_callback_list();
         let counter: Arc<AtomicUsize> = Default::default();
         let counter_clone = counter.clone();
         let inc_callback = Box::new(move || {
@@ -119,7 +132,7 @@ mod tests {

     #[test]
     fn test_watch_event_router_multiple_callback_same_key() {
-        let watch_event_router = WatchCallbackList::default();
+        let watch_event_router = default_watch_callback_list();
         let counter: Arc<AtomicUsize> = Default::default();
         let inc_callback = |inc: usize| {
             let counter_clone = counter.clone();
@@ -148,7 +161,7 @@ mod tests {

     #[test]
     fn test_watch_event_router_multiple_callback_different_key() {
-        let watch_event_router = WatchCallbackList::default();
+        let watch_event_router = default_watch_callback_list();
         let counter: Arc<AtomicUsize> = Default::default();
         let counter_clone = counter.clone();
         let inc_callback = Box::new(move || {

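Since `WatchCallbackList` is now `pub(crate)` and carries a `Logger` (so it can no longer derive `Default`), construction inside the crate goes through `with_logger`, as the rewritten tests show. A crate-internal usage sketch:

    use slog::{o, Discard, Logger};

    let router = WatchCallbackList::with_logger(Logger::root(Discard, o!()));
    let _handle = router.subscribe(Box::new(|| {
        // react to a meta.json change
    }));
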
src/indexer/index_writer.rs
@@ -27,6 +27,7 @@ use crate::Opstamp;
 use crossbeam::channel;
 use futures::executor::block_on;
 use futures::future::Future;
+use slog::{error, info, Logger};
 use smallvec::smallvec;
 use smallvec::SmallVec;
 use std::mem;
@@ -195,20 +196,21 @@ fn index_documents(
     grouped_document_iterator: &mut dyn Iterator<Item = OperationGroup>,
     segment_updater: &mut SegmentUpdater,
     mut delete_cursor: DeleteCursor,
+    logger: &Logger,
 ) -> crate::Result<bool> {
     let schema = segment.schema();

+    info!(logger, "segment-index"; "stage"=>"start");
     let mut segment_writer = SegmentWriter::for_segment(memory_budget, segment.clone(), &schema)?;
     let mut buffer_limit_reached = false;
     for document_group in grouped_document_iterator {
         for doc in document_group {
             segment_writer.add_document(doc, &schema)?;
         }
         let mem_usage = segment_writer.mem_usage();
         if mem_usage >= memory_budget - MARGIN_IN_BYTES {
-            info!(
-                "Buffer limit reached, flushing segment with maxdoc={}.",
-                segment_writer.max_doc()
-            );
             buffer_limit_reached = true;

             break;
         }
     }
@@ -228,6 +230,14 @@ fn index_documents(
     let segment_with_max_doc = segment.with_max_doc(max_doc);

     let last_docstamp: Opstamp = *(doc_opstamps.last().unwrap());
+    info!(
+        logger,
+        "segment-index";
+        "stage" => "serialize",
+        "cause" => if buffer_limit_reached { "buffer-limit" } else { "commit" },
+        "maxdoc" => max_doc,
+        "last_docstamp" => last_docstamp
+    );

     let delete_bitset_opt = apply_deletes(
         &segment_with_max_doc,
@@ -241,7 +251,18 @@ fn index_documents(
         delete_cursor,
         delete_bitset_opt,
     );

+    info!(
+        logger,
+        "segment-index";
+        "stage" => "publish",
+    );
     block_on(segment_updater.schedule_add_segment(segment_entry))?;
+    info!(
+        logger,
+        "segment-index";
+        "stage" => "end",
+    );
     Ok(true)
 }
@@ -344,6 +365,10 @@ impl IndexWriter {
         Ok(index_writer)
     }

+    pub(crate) fn logger(&self) -> &Logger {
+        self.index.logger()
+    }
+
     fn drop_sender(&mut self) {
         let (sender, _receiver) = channel::bounded(1);
         self.operation_sender = sender;
@@ -352,6 +377,8 @@ impl IndexWriter {
     /// If there are some merging threads, blocks until they all finish their work and
     /// then drop the `IndexWriter`.
     pub fn wait_merging_threads(mut self) -> crate::Result<()> {
+        info!(self.logger(), "wait-merge-threads"; "stage"=>"start");
+
         // this will stop the indexing thread,
         // dropping the last reference to the segment_updater.
         self.drop_sender();
@@ -372,9 +399,9 @@ impl IndexWriter {
             .map_err(|_| TantivyError::ErrorInThread("Failed to join merging thread.".into()));

         if let Err(ref e) = result {
-            error!("Some merging thread failed {:?}", e);
+            error!(self.logger(), "some merge thread failed"; "cause"=>e.to_string());
         }
-
+        info!(self.logger(), "wait-merge-threads"; "stage"=>"stop");
         result
     }
@@ -434,12 +461,16 @@ impl IndexWriter {
                 return Ok(());
             }
             let segment = index.new_segment();
+            let segment_id = segment.id();
             index_documents(
                 mem_budget,
                 segment,
                 &mut document_iterator,
                 &mut segment_updater,
                 delete_cursor.clone(),
+                &index
+                    .logger()
+                    .new(slog::o!("segment"=>segment_id.to_string())),
             )?;
         }
     })?;
@@ -553,7 +584,10 @@ impl IndexWriter {
     ///
     /// The opstamp at the last commit is returned.
     pub fn rollback(&mut self) -> crate::Result<Opstamp> {
-        info!("Rolling back to opstamp {}", self.committed_opstamp);
+        info!(
+            self.logger(),
+            "Rolling back to opstamp {}", self.committed_opstamp
+        );
         // marks the segment updater as killed. From now on, all
         // segment updates will be ignored.
         self.segment_updater.kill();
@@ -610,6 +644,8 @@ impl IndexWriter {
     /// using this API.
     /// See [`PreparedCommit::set_payload()`](PreparedCommit.html)
     pub fn prepare_commit(&mut self) -> crate::Result<PreparedCommit> {
+        let logger = self.logger().clone();
+
         // Here, because we join all of the worker threads,
         // all of the segment update for this commit have been
         // sent.
@@ -620,7 +656,10 @@ impl IndexWriter {
         //
         // This will move uncommitted segments to the state of
         // committed segments.
-        info!("Preparing commit");

+        let commit_opstamp = self.stamper.stamp();
+
+        info!(logger, "prepare-commit"; "opstamp" => commit_opstamp);
+
         // this will drop the current document channel
         // and recreate a new one.
@@ -636,9 +675,8 @@ impl IndexWriter {
             self.add_indexing_worker()?;
         }

-        let commit_opstamp = self.stamper.stamp();
         let prepared_commit = PreparedCommit::new(self, commit_opstamp);
-        info!("Prepared commit {}", commit_opstamp);
+        info!(logger, "Prepared commit {}", commit_opstamp);
         Ok(prepared_commit)
     }

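Worth calling out in the hunks above: `slog::Logger::new` derives a child logger, so `index.logger().new(o!("segment" => ...))` stamps every later record with the segment id. A crate-internal sketch of the pattern (names are illustrative; `index` is an `Index` and `segment_id` a `SegmentId`):

    use slog::o;

    let segment_logger = index
        .logger()
        .new(o!("segment" => segment_id.to_string()));
    slog::info!(segment_logger, "segment-index"; "stage" => "start");
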
src/indexer/prepared_commit.rs
@@ -1,6 +1,7 @@
 use super::IndexWriter;
 use crate::Opstamp;
 use futures::executor::block_on;
+use slog::info;

 /// A prepared commit
 pub struct PreparedCommit<'a> {
@@ -31,7 +32,7 @@ impl<'a> PreparedCommit<'a> {
     }

     pub fn commit(self) -> crate::Result<Opstamp> {
-        info!("committing {}", self.opstamp);
+        info!(self.index_writer.logger(), "committing {}", self.opstamp);
         let _ = block_on(
             self.index_writer
                 .segment_updater()

src/indexer/segment_manager.rs
@@ -1,3 +1,5 @@
+use slog::{warn, Logger};
+
 use super::segment_register::SegmentRegister;
 use crate::core::SegmentId;
 use crate::core::SegmentMeta;
@@ -42,9 +44,9 @@ impl SegmentRegisters {
 ///
 /// It guarantees the atomicity of the
 /// changes (merges especially)
-#[derive(Default)]
 pub struct SegmentManager {
     registers: RwLock<SegmentRegisters>,
+    logger: Logger,
 }

 impl Debug for SegmentManager {
@@ -77,12 +79,14 @@ impl SegmentManager {
     pub fn from_segments(
         segment_metas: Vec<SegmentMeta>,
         delete_cursor: &DeleteCursor,
+        logger: Logger,
     ) -> SegmentManager {
         SegmentManager {
             registers: RwLock::new(SegmentRegisters {
                 uncommitted: SegmentRegister::default(),
                 committed: SegmentRegister::new(segment_metas, delete_cursor),
             }),
+            logger,
         }
     }
@@ -186,7 +190,7 @@ impl SegmentManager {
         let segments_status = registers_lock
             .segments_status(before_merge_segment_ids)
             .ok_or_else(|| {
-                warn!("couldn't find segment in SegmentManager");
+                warn!(self.logger, "couldn't find segment in SegmentManager");
                 crate::TantivyError::InvalidArgument(
                     "The segments that were merged could not be found in the SegmentManager. \
                      This is not necessarily a bug, and can happen after a rollback for instance."

src/indexer/segment_updater.rs
@@ -23,9 +23,9 @@ use futures::channel::oneshot;
 use futures::executor::{ThreadPool, ThreadPoolBuilder};
 use futures::future::Future;
 use futures::future::TryFutureExt;
+use slog::{debug, error, info, warn};
 use std::borrow::BorrowMut;
 use std::collections::HashSet;
-use std::io::Write;
 use std::ops::Deref;
 use std::path::PathBuf;
 use std::sync::atomic::{AtomicBool, Ordering};
@@ -65,12 +65,11 @@ pub fn save_new_metas(schema: Schema, directory: &mut dyn Directory) -> crate::R
 ///
 /// This method is not part of tantivy's public API
 fn save_metas(metas: &IndexMeta, directory: &mut dyn Directory) -> crate::Result<()> {
-    info!("save metas");
-    let mut buffer = serde_json::to_vec_pretty(metas)?;
+    let mut meta_json = serde_json::to_string_pretty(metas)?;
     // Just adding a new line at the end of the buffer.
-    writeln!(&mut buffer)?;
-    directory.atomic_write(&META_FILEPATH, &buffer[..])?;
-    debug!("Saved metas {:?}", serde_json::to_string_pretty(&metas));
+    meta_json.push_str("\n");
+    debug!(directory.logger(), "save meta"; "content"=>&meta_json);
+    directory.atomic_write(&META_FILEPATH, meta_json.as_bytes())?;
     Ok(())
 }
@@ -97,7 +96,6 @@ impl Deref for SegmentUpdater {
 async fn garbage_collect_files(
     segment_updater: SegmentUpdater,
 ) -> crate::Result<GarbageCollectionResult> {
-    info!("Running garbage collection");
     let mut index = segment_updater.index.clone();
     index
         .directory_mut()
@@ -107,14 +105,12 @@ async fn garbage_collect_files(
 /// Merges the list of segments given in `segment_entries`.
 /// This function happens in the calling thread and is computationally expensive.
 fn merge(
+    merged_segment: Segment,
     index: &Index,
     mut segment_entries: Vec<SegmentEntry>,
     target_opstamp: Opstamp,
 ) -> crate::Result<SegmentEntry> {
-    // first we need to apply deletes to our segment.
-    let merged_segment = index.new_segment();
-
-    // First we apply all of the delet to the merged segment, up to the target opstamp.
+    // First we apply all of the deletes to the merged segment, up to the target opstamp.
     for segment_entry in &mut segment_entries {
         let segment = index.segment(segment_entry.meta().clone());
         advance_deletes(segment, segment_entry, target_opstamp)?;
@@ -167,7 +163,8 @@ impl SegmentUpdater {
         delete_cursor: &DeleteCursor,
     ) -> crate::Result<SegmentUpdater> {
         let segments = index.searchable_segment_metas()?;
-        let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
+        let segment_manager =
+            SegmentManager::from_segments(segments, delete_cursor, index.logger().clone());
         let pool = ThreadPoolBuilder::new()
             .name_prefix("segment_updater")
             .pool_size(1)
@@ -387,7 +384,18 @@ impl SegmentUpdater {
             .segment_manager
             .start_merge(merge_operation.segment_ids())?;

-        info!("Starting merge - {:?}", merge_operation.segment_ids());
+        let segment_ids_str: String = merge_operation
+            .segment_ids()
+            .iter()
+            .map(|segment_id| segment_id.to_string())
+            .collect::<Vec<String>>()
+            .join(",");
+
+        let merged_segment = self.index.new_segment();
+        let logger = self.index.logger().new(slog::o!("segments"=>segment_ids_str, "merged-segment"=>merged_segment.id().to_string()));
+
+        let num_merges: usize = self.merge_operations.list().len();
+        slog::info!(&logger, "merge"; "stage"=>"start", "num-merges" => num_merges);

         let (merging_future_send, merging_future_recv) =
             oneshot::channel::<crate::Result<SegmentMeta>>();
@@ -398,22 +406,20 @@ impl SegmentUpdater {
             // as well as which segment is currently in merge and therefore should not be
             // candidate for another merge.
             match merge(
+                merged_segment,
                 &segment_updater.index,
                 segment_entries,
                 merge_operation.target_opstamp(),
             ) {
                 Ok(after_merge_segment_entry) => {
+                    info!(&logger, "merge"; "stage" => "end");
                     let segment_meta = segment_updater
                         .end_merge(merge_operation, after_merge_segment_entry)
                         .await;
                     let _send_result = merging_future_send.send(segment_meta);
                 }
                 Err(e) => {
-                    warn!(
-                        "Merge of {:?} was cancelled: {:?}",
-                        merge_operation.segment_ids().to_vec(),
-                        e
-                    );
+                    error!(&logger, "merge"; "stage" => "fail", "cause"=>e.to_string());
                     // ... cancel merge
                     if cfg!(test) {
                         panic!("Merge failed.");
@@ -454,11 +460,12 @@ impl SegmentUpdater {
             .collect::<Vec<_>>();
         merge_candidates.extend(committed_merge_candidates.into_iter());

+        let logger = self.index.logger();
         for merge_operation in merge_candidates {
             if let Err(err) = self.start_merge(merge_operation) {
                 warn!(
-                    "Starting the merge failed for the following reason. This is not fatal. {}",
-                    err
+                    logger,
+                    "merge-start-fail (not fatal, not necessarily a problem)"; "reason" => format!("{}", err),
                 );
             }
         }
@@ -471,8 +478,11 @@ impl SegmentUpdater {
     ) -> impl Future<Output = crate::Result<SegmentMeta>> {
         let segment_updater = self.clone();
         let after_merge_segment_meta = after_merge_segment_entry.meta().clone();
+        let logger = self.index.logger().new(
+            slog::o!("segment"=>after_merge_segment_meta.id().to_string(),
+            "delete-opstamp"=>after_merge_segment_meta.delete_opstamp()),
+        );
         let end_merge_future = self.schedule_future(async move {
-            info!("End merge {:?}", after_merge_segment_entry.meta());
             {
                 let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
                 if let Some(delete_operation) = delete_cursor.get() {
@@ -486,6 +496,7 @@ impl SegmentUpdater {
                         committed_opstamp,
                     ) {
                         error!(
+                            logger,
                             "Merge of {:?} was cancelled (advancing deletes failed): {:?}",
                             merge_operation.segment_ids(),
                             e

src/indexer/segment_writer.rs
@@ -1,5 +1,4 @@
 use super::operation::AddOperation;
-use crate::core::Segment;
 use crate::core::SerializableSegment;
 use crate::fastfield::FastFieldsWriter;
 use crate::fieldnorm::{FieldNormReaders, FieldNormsWriter};
@@ -15,6 +14,7 @@ use crate::tokenizer::{BoxTokenStream, PreTokenizedStream};
 use crate::tokenizer::{FacetTokenizer, TextAnalyzer};
 use crate::tokenizer::{TokenStreamChain, Tokenizer};
 use crate::Opstamp;
+use crate::{core::Segment, tokenizer::MAX_TOKEN_LEN};
 use crate::{DocId, SegmentComponent};
 use std::io;
@@ -146,6 +146,9 @@ impl SegmentWriter {
         for fake_str in facets {
             let mut unordered_term_id_opt = None;
             FacetTokenizer.token_stream(fake_str).process(&mut |token| {
+                if token.text.len() > MAX_TOKEN_LEN {
+                    return;
+                }
                 term_buffer.set_text(&token.text);
                 let unordered_term_id =
                     multifield_postings.subscribe(doc_id, &term_buffer);

src/lib.rs
@@ -101,9 +101,6 @@
 #[cfg_attr(test, macro_use)]
 extern crate serde_json;

-#[macro_use]
-extern crate log;
-
 #[macro_use]
 extern crate thiserror;
@@ -148,6 +145,7 @@ pub mod schema;
 pub mod space_usage;
 pub mod store;
 pub mod termdict;
+pub use slog;

 mod reader;

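The `pub use slog;` re-export means a downstream crate can name slog types through tantivy without declaring slog in its own Cargo.toml. Sketch:

    use tantivy::slog::{o, Discard, Logger};

    let logger = Logger::root(Discard, o!());
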
src/postings/postings_writer.rs
@@ -227,23 +227,17 @@ pub trait PostingsWriter {
         term_buffer.set_field(field);
         let mut sink = |token: &Token| {
             // We skip all tokens with a len greater than u16.
-            if token.text.len() <= MAX_TOKEN_LEN {
-                term_buffer.set_text(token.text.as_str());
-                self.subscribe(
-                    term_index,
-                    doc_id,
-                    token.position as u32,
-                    &term_buffer,
-                    heap,
-                );
-            } else {
-                info!(
-                    "A token exceeding MAX_TOKEN_LEN ({}>{}) was dropped. Search for \
-                     MAX_TOKEN_LEN in the documentation for more information.",
-                    token.text.len(),
-                    MAX_TOKEN_LEN
-                );
-            }
+            if token.text.len() > MAX_TOKEN_LEN {
+                return;
+            }
+            term_buffer.set_text(token.text.as_str());
+            self.subscribe(
+                term_index,
+                doc_id,
+                token.position as u32,
+                &term_buffer,
+                heap,
+            );
         };
         token_stream.process(&mut sink)
     }

src/reader/mod.rs
@@ -1,5 +1,7 @@
 mod pool;

+use slog::error;
+
 pub use self::pool::LeasedItem;
 use self::pool::Pool;
 use crate::core::Segment;
@@ -62,6 +64,7 @@ impl IndexReaderBuilder {
     /// to open different segment readers. It may take hundreds of milliseconds
     /// of time and it may return an error.
     pub fn try_into(self) -> crate::Result<IndexReader> {
+        let logger = self.index.logger().clone();
         let inner_reader = InnerIndexReader {
             index: self.index,
             num_searchers: self.num_searchers,
@@ -80,8 +83,8 @@ impl IndexReaderBuilder {
         let callback = move || {
             if let Err(err) = inner_reader_arc_clone.reload() {
                 error!(
-                    "Error while loading searcher after commit was detected. {:?}",
-                    err
+                    logger,
+                    "Error while loading searcher after commit was detected. {:?}", err
                 );
             }
         };