Mirror of https://github.com/quickwit-oss/tantivy.git
Compare commits: slog...limit-rand (2 commits: 9eb87e91cc, 36f43da4d8)
CHANGELOG.md:
@@ -1,8 +1,6 @@
 Tantivy 0.14.0
 =========================
-- Remove dependency to atomicwrites #833. Implemented by @pmasurel upon suggestion and research from @asafigan.
-- Migrated tantivy error from the now deprecated `failure` crate to `thiserror` #760. (@hirevo)
-- Switched to structured logging (via the `slog` crate). (@pmasurel)
+- Remove dependency to atomicwrites #833. Implemented by @pmasurel upon suggestion and research from @asafigan.
 
 Tantivy 0.13.1
 ===================
Cargo.toml:
@@ -23,8 +23,7 @@ memmap = {version = "0.7", optional=true}
 lz4 = {version="1", optional=true}
 snap = "1"
 tempfile = {version="3", optional=true}
-slog = "2.5"
-slog-stdlog = "4"
+log = "0.4"
 serde = {version="1", features=["derive"]}
 serde_json = "1"
 num_cpus = "1"
@@ -43,7 +42,7 @@ bitpacking = {version="0.8", default-features = false, features=["bitpacker4x"]}
 census = "0.4"
 fnv = "1"
 owned-read = "0.4"
-thiserror = "1.0"
+failure = "0.1"
 htmlescape = "0.3"
 fail = "0.4"
 murmurhash32 = "0.2"
doc/src/index-format.md (new file, 50 lines):
@@ -0,0 +1,50 @@

# Managed files

+----------+-----------+-------------------+
| content  | footer    | footer_len: u32   |
+----------+-----------+-------------------+
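The trailing `footer_len: u32` means a reader never needs an external index to separate content from footer: read the last four bytes, then count back that many bytes. A minimal sketch of that split, assuming the little-endian integer encoding used by tantivy's `BinarySerializable` (the helper below is illustrative, not tantivy's API):

```rust
use std::convert::TryInto;

/// Split a managed file into (content, footer), assuming a trailing
/// little-endian `footer_len: u32` as in the layout above.
fn split_footer(data: &[u8]) -> Option<(&[u8], &[u8])> {
    if data.len() < 4 {
        return None;
    }
    // The last 4 bytes encode the footer length.
    let (body, len_bytes) = data.split_at(data.len() - 4);
    let footer_len = u32::from_le_bytes(len_bytes.try_into().ok()?) as usize;
    if footer_len > body.len() {
        return None; // corrupted or truncated file
    }
    Some(body.split_at(body.len() - footer_len))
}
```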

# Term Dictionary (Composite File)

+---------+---------------------------+------------------------+
| fst     | term_info_store           | footer_len: u64        |
+---------+---------------------------+------------------------+

During a merge, the term info store needs to fit in memory.
It has a cost of n bytes per term.

# term_info_store

+-------------------+---------------------------+------------------------+
| len_block_meta    | block_meta                | term_infos             |
+-------------------+---------------------------+------------------------+

# inverted_index

+------------------------+---------------------------+------------------------+
| total_num_tokens: u64  | posting_lists..           | term_infos             |
+------------------------+---------------------------+------------------------+

# postings lists

+------------------------+---------------------------+------------------------+

# composite file

+----------------+-----+----------------+----------------------+----------------+
| field file 1   | ... | field file n   |composite file footer | footer len: u32|
+----------------+-----+----------------+----------------------+----------------+

# composite file footer

+-----------------+---------------------------------------+
|num fields: vint | (file_addr, offset_delta: vint) []... |
+-----------------+---------------------------------------+

# FileAddr

+--------------+--------------+
| field: u32   | idx: VInt    |
+--------------+--------------+
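Offsets in the footer are stored as deltas, so per-field byte ranges are recovered with a running sum plus a lookahead for the end bound. A sketch of that bookkeeping — the exact delta semantics and types here are assumptions for illustration, not tantivy's actual API:

```rust
use std::ops::Range;

/// Recover per-field byte ranges from (field, offset_delta) footer entries.
/// Each delta is assumed relative to the previous field file's start; the
/// last field file ends where the composite footer begins (`total_len`).
fn field_ranges(entries: &[(u32, u64)], total_len: u64) -> Vec<(u32, Range<u64>)> {
    // Running sum turns deltas back into absolute start offsets.
    let mut starts = Vec::with_capacity(entries.len());
    let mut acc = 0u64;
    for &(field, delta) in entries {
        acc += delta;
        starts.push((field, acc));
    }
    // Each field file ends where the next one starts.
    starts
        .iter()
        .enumerate()
        .map(|(i, &(field, start))| {
            let end = starts.get(i + 1).map(|&(_, s)| s).unwrap_or(total_len);
            (field, start..end)
        })
        .collect()
}
```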

# Posting lists

+-----------------------------------------+
| skip_reader
+-----------------------------------------+
src/core/executor.rs:
@@ -1,6 +1,5 @@
 use crossbeam::channel;
 use rayon::{ThreadPool, ThreadPoolBuilder};
-use slog::{error, Logger};
 
 /// Search executor whether search request are single thread or multithread.
 ///
@@ -44,7 +43,6 @@ impl Executor {
         &self,
         f: F,
         args: AIterator,
-        logger: Logger,
     ) -> crate::Result<Vec<R>> {
         match self {
             Executor::SingleThread => args.map(f).collect::<crate::Result<_>>(),
@@ -59,7 +57,7 @@ impl Executor {
                     let (idx, arg) = arg_with_idx;
                     let fruit = f(arg);
                     if let Err(err) = fruit_sender.send((idx, fruit)) {
-                        error!(logger, "Failed to send search task. It probably means all search threads have panicked. {:?}", err);
+                        error!("Failed to send search task. It probably means all search threads have panicked. {:?}", err);
                    }
                });
            }
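The pattern in the hunk above — tagging each task with its index before sending it over a channel — is what lets the multithreaded executor return results in input order even though tasks finish out of order. A self-contained sketch of the idea, with plain `std` threads standing in for the rayon pool:

```rust
use std::sync::mpsc;
use std::thread;

/// Run a job per input and return results in input order.
/// e.g. assert_eq!(indexed_map(vec![1, 2, 3]), vec![2, 4, 6]);
fn indexed_map(inputs: Vec<u64>) -> Vec<u64> {
    let n = inputs.len();
    let (tx, rx) = mpsc::channel();
    let mut handles = Vec::new();
    for (idx, x) in inputs.into_iter().enumerate() {
        let tx = tx.clone();
        handles.push(thread::spawn(move || {
            let fruit = x * 2; // stand-in for the real per-task work
            // Sending fails only if the receiver is gone, e.g. a sibling panicked.
            let _ = tx.send((idx, fruit));
        }));
    }
    drop(tx); // close the channel so the receive loop terminates
    let mut out = vec![0u64; n];
    for (idx, fruit) in rx {
        out[idx] = fruit; // the index tag restores input order
    }
    for handle in handles {
        let _ = handle.join();
    }
    out
}
```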
@@ -89,21 +87,17 @@ impl Executor {
 #[cfg(test)]
 mod tests {
 
-    use slog::{o, Discard, Logger};
-
     use super::Executor;
 
     #[test]
     #[should_panic(expected = "panic should propagate")]
     fn test_panic_propagates_single_thread() {
-        let logger = Logger::root(Discard, o!());
         let _result: Vec<usize> = Executor::single_thread()
             .map(
                 |_| {
                     panic!("panic should propagate");
                 },
                 vec![0].into_iter(),
-                logger,
             )
             .unwrap();
     }
@@ -111,7 +105,6 @@ mod tests {
     #[test]
     #[should_panic] //< unfortunately the panic message is not propagated
     fn test_panic_propagates_multi_thread() {
-        let logger = Logger::root(Discard, o!());
         let _result: Vec<usize> = Executor::multi_thread(1, "search-test")
             .unwrap()
             .map(
@@ -119,16 +112,14 @@ mod tests {
                     panic!("panic should propagate");
                 },
                 vec![0].into_iter(),
-                logger,
             )
             .unwrap();
     }
 
     #[test]
     fn test_map_singlethread() {
-        let logger = Logger::root(Discard, o!());
         let result: Vec<usize> = Executor::single_thread()
-            .map(|i| Ok(i * 2), 0..1_000, logger)
+            .map(|i| Ok(i * 2), 0..1_000)
             .unwrap();
         assert_eq!(result.len(), 1_000);
         for i in 0..1_000 {
@@ -138,10 +129,9 @@ mod tests {
 
     #[test]
     fn test_map_multithread() {
-        let logger = Logger::root(Discard, o!());
         let result: Vec<usize> = Executor::multi_thread(3, "search-test")
             .unwrap()
-            .map(|i| Ok(i * 2), 0..10, logger)
+            .map(|i| Ok(i * 2), 0..10)
             .unwrap();
         assert_eq!(result.len(), 10);
         for i in 0..10 {
src/core/index.rs:
@@ -21,7 +21,6 @@ use crate::schema::FieldType;
 use crate::schema::Schema;
 use crate::tokenizer::{TextAnalyzer, TokenizerManager};
 use crate::IndexWriter;
-use slog::Logger;
 use std::borrow::BorrowMut;
 use std::collections::HashSet;
 use std::fmt;
@@ -58,14 +57,7 @@ pub struct Index {
 }
 
 impl Index {
-    pub(crate) fn logger(&self) -> &Logger {
-        self.directory.logger()
-    }
-
-    /// Examines the directory to see if it contains an index.
-    ///
-    /// Effectively, it only checks for the presence of the `meta.json` file.
+    /// Examines the director to see if it contains an index
     pub fn exists<Dir: Directory>(dir: &Dir) -> bool {
         dir.exists(&META_FILEPATH)
     }
@@ -148,18 +140,16 @@ impl Index {
         Index::create(mmap_directory, schema)
     }
 
-    /// Creates a new index given an implementation of the trait `Directory`.
-    ///
-    /// If a directory previously existed, it will be erased.
+    /// Creates a new index given an implementation of the trait `Directory`
     pub fn create<Dir: Directory>(dir: Dir, schema: Schema) -> crate::Result<Index> {
         let directory = ManagedDirectory::wrap(dir)?;
-        Index::new_from_directory(directory, schema)
+        Index::from_directory(directory, schema)
     }
 
     /// Create a new index from a directory.
     ///
     /// This will overwrite existing meta.json
-    fn new_from_directory(mut directory: ManagedDirectory, schema: Schema) -> crate::Result<Index> {
+    fn from_directory(mut directory: ManagedDirectory, schema: Schema) -> crate::Result<Index> {
         save_new_metas(schema.clone(), directory.borrow_mut())?;
         let metas = IndexMeta::with_schema(schema);
         Index::create_from_metas(directory, &metas, SegmentMetaInventory::default())
@@ -250,8 +240,6 @@ impl Index {
 
     /// Open the index using the provided directory
     pub fn open<D: Directory>(directory: D) -> crate::Result<Index> {
-        let logger: &Logger = directory.logger();
-        slog::info!(logger, "index-open"; "directory" => format!("{:?}", directory));
         let directory = ManagedDirectory::wrap(directory)?;
         let inventory = SegmentMetaInventory::default();
         let metas = load_metas(&directory, &inventory)?;
@@ -116,6 +116,7 @@ impl SegmentMeta {
             SegmentComponent::FASTFIELDS => ".fast".to_string(),
             SegmentComponent::FIELDNORMS => ".fieldnorm".to_string(),
             SegmentComponent::DELETE => format!(".{}.del", self.delete_opstamp().unwrap_or(0)),
+            SegmentComponent::FIELDSTATS => ".fieldstats".to_string(),
         });
         PathBuf::from(path)
     }
src/core/inverted_index_reader.rs:
@@ -1,4 +1,3 @@
-use crate::common::BinarySerializable;
 use crate::directory::ReadOnlySource;
 use crate::positions::PositionReader;
 use crate::postings::TermInfo;
@@ -36,14 +35,12 @@ impl InvertedIndexReader {
         postings_source: ReadOnlySource,
         positions_source: ReadOnlySource,
         positions_idx_source: ReadOnlySource,
+        total_num_tokens: u64,
         record_option: IndexRecordOption,
     ) -> InvertedIndexReader {
-        let total_num_tokens_data = postings_source.slice(0, 8);
-        let mut total_num_tokens_cursor = total_num_tokens_data.as_slice();
-        let total_num_tokens = u64::deserialize(&mut total_num_tokens_cursor).unwrap_or(0u64);
         InvertedIndexReader {
             termdict,
-            postings_source: postings_source.slice_from(8),
+            postings_source,
             positions_source,
             positions_idx_source,
             record_option,
@@ -89,7 +86,7 @@ impl InvertedIndexReader {
         term_info: &TermInfo,
         block_postings: &mut BlockSegmentPostings,
     ) {
-        let offset = term_info.postings_offset as usize;
+        let offset = term_info.postings_start_offset as usize;
         let end_source = self.postings_source.len();
         let postings_slice = self.postings_source.slice(offset, end_source);
         block_postings.reset(term_info.doc_freq, postings_slice);
@@ -117,8 +114,10 @@ impl InvertedIndexReader {
         term_info: &TermInfo,
         requested_option: IndexRecordOption,
     ) -> BlockSegmentPostings {
-        let offset = term_info.postings_offset as usize;
-        let postings_data = self.postings_source.slice_from(offset);
+        let postings_data = self.postings_source.slice(
+            term_info.postings_start_offset as usize,
+            term_info.postings_end_offset as usize,
+        );
         BlockSegmentPostings::from_data(
             term_info.doc_freq,
             postings_data,
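The switch from a single `postings_offset` to a `postings_start_offset`/`postings_end_offset` pair lets the reader hand out an exact slice per term instead of "everything from the start offset to the end of the postings file". A sketch with hypothetical types mirroring the fields above:

```rust
/// Hypothetical TermInfo mirroring the fields introduced in this hunk.
#[allow(dead_code)]
struct TermInfo {
    doc_freq: u32,
    postings_start_offset: u64,
    postings_end_offset: u64,
}

/// With an end offset, each term's postings become an exact sub-slice; the
/// old code could only slice from the start offset to the end of the file.
fn postings_bytes<'a>(postings_file: &'a [u8], info: &TermInfo) -> &'a [u8] {
    &postings_file[info.postings_start_offset as usize..info.postings_end_offset as usize]
}
```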
src/core/searcher.rs:
@@ -143,7 +143,6 @@ impl Searcher {
                 collector.collect_segment(weight.as_ref(), segment_ord as u32, segment_reader)
             },
             segment_readers.iter().enumerate(),
-            self.index.logger().clone(),
         )?;
         collector.merge_fruits(fruits)
     }
src/core/segment_component.rs:
@@ -24,14 +24,17 @@ pub enum SegmentComponent {
     /// Accessing a document from the store is relatively slow, as it
     /// requires to decompress the entire block it belongs to.
     STORE,
 
     /// Bitset describing which document of the segment is deleted.
     DELETE,
+
+    FIELDSTATS,
 }
 
 impl SegmentComponent {
     /// Iterates through the components.
     pub fn iterator() -> slice::Iter<'static, SegmentComponent> {
-        static SEGMENT_COMPONENTS: [SegmentComponent; 8] = [
+        static SEGMENT_COMPONENTS: [SegmentComponent; 9] = [
             SegmentComponent::POSTINGS,
             SegmentComponent::POSITIONS,
             SegmentComponent::POSITIONSSKIP,
@@ -40,6 +43,7 @@ impl SegmentComponent {
             SegmentComponent::TERMS,
             SegmentComponent::STORE,
             SegmentComponent::DELETE,
+            SegmentComponent::FIELDSTATS,
         ];
         SEGMENT_COMPONENTS.iter()
     }
src/core/segment_id.rs:
@@ -21,12 +21,6 @@ use std::sync::atomic;
 #[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
 pub struct SegmentId(Uuid);
 
-impl ToString for SegmentId {
-    fn to_string(&self) -> String {
-        self.short_uuid_string()
-    }
-}
-
 #[cfg(test)]
 static AUTO_INC_COUNTER: Lazy<atomic::AtomicUsize> = Lazy::new(|| atomic::AtomicUsize::default());
 
src/core/segment_reader.rs:
@@ -1,4 +1,3 @@
-use crate::common::CompositeFile;
 use crate::common::HasLen;
 use crate::core::InvertedIndexReader;
 use crate::core::Segment;
@@ -16,8 +15,8 @@ use crate::space_usage::SegmentSpaceUsage;
 use crate::store::StoreReader;
 use crate::termdict::TermDictionary;
 use crate::DocId;
+use crate::{common::CompositeFile, postings::FieldStats};
 use fail::fail_point;
-use slog::{warn, Logger};
 use std::collections::HashMap;
 use std::fmt;
 use std::sync::Arc;
@@ -50,11 +49,11 @@ pub struct SegmentReader {
     positions_idx_composite: CompositeFile,
     fast_fields_readers: Arc<FastFieldReaders>,
     fieldnorm_readers: FieldNormReaders,
+    field_stats: FieldStats,
 
     store_source: ReadOnlySource,
     delete_bitset_opt: Option<DeleteBitSet>,
     schema: Schema,
-    logger: Logger,
 }
 
 impl SegmentReader {
@@ -181,6 +180,9 @@ impl SegmentReader {
         let fieldnorm_data = segment.open_read(SegmentComponent::FIELDNORMS)?;
         let fieldnorm_readers = FieldNormReaders::open(fieldnorm_data)?;
 
+        let field_stats_data = segment.open_read(SegmentComponent::FIELDSTATS)?;
+        let field_stats = FieldStats::from_source(field_stats_data.as_slice())?;
+
         let delete_bitset_opt = if segment.meta().has_deletes() {
             let delete_data = segment.open_read(SegmentComponent::DELETE)?;
             Some(DeleteBitSet::open(delete_data))
@@ -196,13 +198,13 @@ impl SegmentReader {
             postings_composite,
             fast_fields_readers: fast_field_readers,
             fieldnorm_readers,
+            field_stats,
             segment_id: segment.id(),
             store_source,
             delete_bitset_opt,
             positions_composite,
             positions_idx_composite,
             schema,
-            logger: segment.index().logger().clone(),
         })
     }
@@ -232,11 +234,7 @@ impl SegmentReader {
         let record_option_opt = field_type.get_index_record_option();
 
         if record_option_opt.is_none() {
-            warn!(
-                self.logger,
-                "Field {:?} does not seem indexed.",
-                field_entry.name()
-            );
+            warn!("Field {:?} does not seem indexed.", field_entry.name());
         }
 
         let postings_source_opt = self.postings_composite.open_read(field);
@@ -267,11 +265,17 @@ impl SegmentReader {
             .open_read(field)
             .expect("Index corrupted. Failed to open field positions in composite file.");
 
+        let total_num_tokens = self
+            .field_stats
+            .get(field)
+            .map(|field_stat| field_stat.num_tokens())
+            .unwrap_or(0u64);
         let inv_idx_reader = Arc::new(InvertedIndexReader::new(
             TermDictionary::from_source(&termdict_source),
             postings_source,
             positions_source,
             positions_idx_source,
+            total_num_tokens,
             record_option,
         ));
 
src/directory/directory.rs:
@@ -1,5 +1,3 @@
-use slog::{error, Logger};
-
 use crate::directory::directory_lock::Lock;
 use crate::directory::error::LockError;
 use crate::directory::error::{DeleteError, OpenReadError, OpenWriteError};
@@ -66,10 +64,7 @@ impl<T: Send + Sync + 'static> From<Box<T>> for DirectoryLock {
 impl Drop for DirectoryLockGuard {
     fn drop(&mut self) {
         if let Err(e) = self.directory.delete(&*self.path) {
-            error!(
-                self.directory.logger(),
-                "Failed to remove the lock file. {:?}", e
-            );
+            error!("Failed to remove the lock file. {:?}", e);
         }
     }
 }
@@ -85,7 +80,7 @@ fn try_acquire_lock(
 ) -> Result<DirectoryLock, TryAcquireLockError> {
     let mut write = directory.open_write(filepath).map_err(|e| match e {
         OpenWriteError::FileAlreadyExists(_) => TryAcquireLockError::FileExists,
-        OpenWriteError::IOError { io_error, .. } => TryAcquireLockError::IOError(io_error),
+        OpenWriteError::IOError(io_error) => TryAcquireLockError::IOError(io_error.into()),
     })?;
     write.flush().map_err(TryAcquireLockError::IOError)?;
     Ok(DirectoryLock::from(Box::new(DirectoryLockGuard {
@@ -214,9 +209,6 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
     /// `OnCommit` `ReloadPolicy`. Not implementing watch in a `Directory` only prevents the
     /// `OnCommit` `ReloadPolicy` to work properly.
     fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle>;
-
-    /// Returns the `slog::Logger` configured for the `Directory`.
-    fn logger(&self) -> &Logger;
 }
 
 /// DirectoryClone
src/directory/error.rs:
@@ -1,60 +1,160 @@
 use crate::Version;
+use std::error::Error as StdError;
 use std::fmt;
 use std::io;
 use std::path::PathBuf;
 
 /// Error while trying to acquire a directory lock.
-#[derive(Debug, Error)]
+#[derive(Debug, Fail)]
 pub enum LockError {
     /// Failed to acquired a lock as it is already held by another
     /// client.
     /// - In the context of a blocking lock, this means the lock was not released within some `timeout` period.
     /// - In the context of a non-blocking lock, this means the lock was busy at the moment of the call.
-    #[error("Could not acquire lock as it is already held, possibly by a different process.")]
+    #[fail(
+        display = "Could not acquire lock as it is already held, possibly by a different process."
+    )]
     LockBusy,
     /// Trying to acquire a lock failed with an `IOError`
-    #[error("Failed to acquire the lock due to an io:Error.")]
+    #[fail(display = "Failed to acquire the lock due to an io:Error.")]
     IOError(io::Error),
 }
 
+/// General IO error with an optional path to the offending file.
+#[derive(Debug)]
+pub struct IOError {
+    path: Option<PathBuf>,
+    err: io::Error,
+}
+
+impl Into<io::Error> for IOError {
+    fn into(self) -> io::Error {
+        self.err
+    }
+}
+
+impl fmt::Display for IOError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self.path {
+            Some(ref path) => write!(f, "io error occurred on path '{:?}': '{}'", path, self.err),
+            None => write!(f, "io error occurred: '{}'", self.err),
+        }
+    }
+}
+
+impl StdError for IOError {
+    fn description(&self) -> &str {
+        "io error occurred"
+    }
+
+    fn cause(&self) -> Option<&dyn StdError> {
+        Some(&self.err)
+    }
+}
+
+impl IOError {
+    pub(crate) fn with_path(path: PathBuf, err: io::Error) -> Self {
+        IOError {
+            path: Some(path),
+            err,
+        }
+    }
+}
+
+impl From<io::Error> for IOError {
+    fn from(err: io::Error) -> IOError {
+        IOError { path: None, err }
+    }
+}
+
 /// Error that may occur when opening a directory
-#[derive(Debug, Error)]
+#[derive(Debug)]
 pub enum OpenDirectoryError {
     /// The underlying directory does not exists.
-    #[error("Directory does not exist: '{0}'.")]
     DoesNotExist(PathBuf),
     /// The path exists but is not a directory.
-    #[error("Path exists but is not a directory: '{0}'.")]
     NotADirectory(PathBuf),
-    /// Failed to create a temp directory.
-    #[error("Failed to create a temporary directory: '{0}'.")]
-    FailedToCreateTempDir(io::Error),
     /// IoError
-    #[error("IOError '{io_error:?}' while create directory in: '{directory_path:?}'.")]
-    IoError {
-        /// underlying io Error.
-        io_error: io::Error,
-        /// directory we tried to open.
-        directory_path: PathBuf,
-    },
+    IoError(io::Error),
 }
 
+impl From<io::Error> for OpenDirectoryError {
+    fn from(io_err: io::Error) -> Self {
+        OpenDirectoryError::IoError(io_err)
+    }
+}
+
+impl fmt::Display for OpenDirectoryError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match *self {
+            OpenDirectoryError::DoesNotExist(ref path) => {
+                write!(f, "the underlying directory '{:?}' does not exist", path)
+            }
+            OpenDirectoryError::NotADirectory(ref path) => {
+                write!(f, "the path '{:?}' exists but is not a directory", path)
+            }
+            OpenDirectoryError::IoError(ref err) => write!(
+                f,
+                "IOError while trying to open/create the directory. {:?}",
+                err
+            ),
+        }
+    }
+}
+
+impl StdError for OpenDirectoryError {
+    fn description(&self) -> &str {
+        "error occurred while opening a directory"
+    }
+
+    fn cause(&self) -> Option<&dyn StdError> {
+        None
+    }
+}
+
 /// Error that may occur when starting to write in a file
-#[derive(Debug, Error)]
+#[derive(Debug)]
 pub enum OpenWriteError {
     /// Our directory is WORM, writing an existing file is forbidden.
     /// Checkout the `Directory` documentation.
-    #[error("File already exists: '{0}'")]
     FileAlreadyExists(PathBuf),
     /// Any kind of IO error that happens when
     /// writing in the underlying IO device.
-    #[error("IOError '{io_error:?}' while opening file for write: '{filepath}'.")]
-    IOError {
-        /// The underlying `io::Error`.
-        io_error: io::Error,
-        /// File path of the file that tantivy failed to open for write.
-        filepath: PathBuf,
-    },
+    IOError(IOError),
 }
 
+impl From<IOError> for OpenWriteError {
+    fn from(err: IOError) -> OpenWriteError {
+        OpenWriteError::IOError(err)
+    }
+}
+
+impl fmt::Display for OpenWriteError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match *self {
+            OpenWriteError::FileAlreadyExists(ref path) => {
+                write!(f, "the file '{:?}' already exists", path)
+            }
+            OpenWriteError::IOError(ref err) => write!(
+                f,
+                "an io error occurred while opening a file for writing: '{}'",
+                err
+            ),
+        }
+    }
+}
+
+impl StdError for OpenWriteError {
+    fn description(&self) -> &str {
+        "error occurred while opening a file for writing"
+    }
+
+    fn cause(&self) -> Option<&dyn StdError> {
+        match *self {
+            OpenWriteError::FileAlreadyExists(_) => None,
+            OpenWriteError::IOError(ref err) => Some(err),
+        }
+    }
+}
+
 /// Type of index incompatibility between the library and the index found on disk
@@ -117,41 +217,55 @@ impl fmt::Debug for Incompatibility {
 }
 
 /// Error that may occur when accessing a file read
-#[derive(Debug, Error)]
+#[derive(Debug)]
 pub enum OpenReadError {
     /// The file does not exists.
-    #[error("Files does not exists: {0:?}")]
     FileDoesNotExist(PathBuf),
     /// Any kind of io::Error.
-    #[error(
-        "IOError: '{io_error:?}' happened while opening the following file for Read: {filepath}."
-    )]
-    IOError {
-        /// The underlying `io::Error`.
-        io_error: io::Error,
-        /// File path of the file that tantivy failed to open for read.
-        filepath: PathBuf,
-    },
+    IOError(IOError),
     /// This library does not support the index version found in file footer.
-    #[error("Index version unsupported: {0:?}")]
     IncompatibleIndex(Incompatibility),
 }
 
-/// Error that may occur when trying to delete a file
-#[derive(Debug, Error)]
-pub enum DeleteError {
-    /// The file does not exists.
-    #[error("File does not exists: '{0}'.")]
-    FileDoesNotExist(PathBuf),
-    /// Any kind of IO error that happens when
-    /// interacting with the underlying IO device.
-    #[error("The following IO error happened while deleting file '{filepath}': '{io_error:?}'.")]
-    IOError {
-        /// The underlying `io::Error`.
-        io_error: io::Error,
-        /// File path of the file that tantivy failed to delete.
-        filepath: PathBuf,
-    },
-    /// This library doesn't support the index version found on disk
-    IncompatibleIndex(Incompatibility),
-}
-
 impl From<IOError> for OpenReadError {
     fn from(err: IOError) -> OpenReadError {
         OpenReadError::IOError(err)
     }
 }
+
+impl fmt::Display for OpenReadError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match *self {
+            OpenReadError::FileDoesNotExist(ref path) => {
+                write!(f, "the file '{:?}' does not exist", path)
+            }
+            OpenReadError::IOError(ref err) => write!(
+                f,
+                "an io error occurred while opening a file for reading: '{}'",
+                err
+            ),
+            OpenReadError::IncompatibleIndex(ref footer) => {
+                write!(f, "Incompatible index format: {:?}", footer)
+            }
+        }
+    }
+}
+
+/// Error that may occur when trying to delete a file
+#[derive(Debug)]
+pub enum DeleteError {
+    /// The file does not exists.
+    FileDoesNotExist(PathBuf),
+    /// Any kind of IO error that happens when
+    /// interacting with the underlying IO device.
+    IOError(IOError),
+}
+
+impl From<IOError> for DeleteError {
+    fn from(err: IOError) -> DeleteError {
+        DeleteError::IOError(err)
+    }
+}
 
 impl From<Incompatibility> for OpenReadError {
@@ -159,3 +273,29 @@ impl From<Incompatibility> for OpenReadError {
         OpenReadError::IncompatibleIndex(incompatibility)
     }
 }
+
+impl fmt::Display for DeleteError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match *self {
+            DeleteError::FileDoesNotExist(ref path) => {
+                write!(f, "the file '{:?}' does not exist", path)
+            }
+            DeleteError::IOError(ref err) => {
+                write!(f, "an io error occurred while deleting a file: '{}'", err)
+            }
+        }
+    }
+}
+
+impl StdError for DeleteError {
+    fn description(&self) -> &str {
+        "error occurred while deleting a file"
+    }
+
+    fn cause(&self) -> Option<&dyn StdError> {
+        match *self {
+            DeleteError::FileDoesNotExist(_) => None,
+            DeleteError::IOError(ref err) => Some(err),
+        }
+    }
+}
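The recurring pattern in this file: wrap the `io::Error` together with the offending path via `IOError::with_path`, then let the `From<IOError>` impls lift it into the specific error enum. A hedged sketch of a typical call site (hypothetical caller; `IOError` and `OpenReadError` are the types defined in the hunks above):

```rust
use std::path::Path;

// Attach the path to the io::Error, then `.into()` picks the right enum
// variant through the From<IOError> impls above.
fn read_file(path: &Path) -> Result<Vec<u8>, OpenReadError> {
    std::fs::read(path).map_err(|e| IOError::with_path(path.to_path_buf(), e).into())
}
```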
src/directory/managed_directory.rs:
@@ -1,5 +1,5 @@
 use crate::core::{MANAGED_FILEPATH, META_FILEPATH};
-use crate::directory::error::{DeleteError, LockError, OpenReadError, OpenWriteError};
+use crate::directory::error::{DeleteError, IOError, LockError, OpenReadError, OpenWriteError};
 use crate::directory::footer::{Footer, FooterProxy};
 use crate::directory::DirectoryLock;
 use crate::directory::GarbageCollectionResult;
@@ -11,9 +11,9 @@ use crate::error::DataCorruption;
 use crate::Directory;
 
 use crc32fast::Hasher;
-use slog::{debug, error, info};
 use std::collections::HashSet;
 use std::io;
+use std::io::Write;
 use std::path::{Path, PathBuf};
 use std::result;
 use std::sync::RwLockWriteGuard;
@@ -56,9 +56,9 @@ fn save_managed_paths(
     directory: &mut dyn Directory,
     wlock: &RwLockWriteGuard<'_, MetaInformation>,
 ) -> io::Result<()> {
-    let mut managed_json = serde_json::to_string_pretty(&wlock.managed_paths)?;
-    managed_json.push_str("\n");
-    directory.atomic_write(&MANAGED_FILEPATH, managed_json.as_bytes())?;
+    let mut w = serde_json::to_vec(&wlock.managed_paths)?;
+    writeln!(&mut w)?;
+    directory.atomic_write(&MANAGED_FILEPATH, &w[..])?;
     Ok(())
 }
@@ -86,12 +86,7 @@ impl ManagedDirectory {
                 directory: Box::new(directory),
                 meta_informations: Arc::default(),
             }),
-            Err(OpenReadError::IOError { io_error, filepath }) => {
-                Err(crate::TantivyError::OpenReadError(OpenReadError::IOError {
-                    io_error,
-                    filepath,
-                }))
-            }
+            Err(OpenReadError::IOError(e)) => Err(From::from(e)),
             Err(OpenReadError::IncompatibleIndex(incompatibility)) => {
                 // For the moment, this should never happen `meta.json`
                 // do not have any footer and cannot detect incompatibility.
@@ -118,7 +113,7 @@ impl ManagedDirectory {
         &mut self,
         get_living_files: L,
     ) -> crate::Result<GarbageCollectionResult> {
-        info!(self.directory.logger(), "gc"; "stage"=>"start");
+        info!("Garbage collect");
         let mut files_to_delete = vec![];
 
         // It is crucial to get the living files after acquiring the
@@ -153,7 +148,7 @@ impl ManagedDirectory {
                 }
             }
             Err(err) => {
-                error!(self.logger(), "Failed to acquire lock for GC");
+                error!("Failed to acquire lock for GC");
                 return Err(crate::TantivyError::from(err));
             }
         }
@@ -165,7 +160,7 @@ impl ManagedDirectory {
         for file_to_delete in files_to_delete {
             match self.delete(&file_to_delete) {
                 Ok(_) => {
-                    debug!(self.logger(), "deleted-success"; "file"=>format!("{:?}", file_to_delete));
+                    info!("Deleted {:?}", file_to_delete);
                     deleted_files.push(file_to_delete);
                 }
                 Err(file_error) => {
@@ -173,12 +168,12 @@ impl ManagedDirectory {
                         DeleteError::FileDoesNotExist(_) => {
                             deleted_files.push(file_to_delete.clone());
                         }
-                        DeleteError::IOError { .. } => {
+                        DeleteError::IOError(_) => {
                             failed_to_delete_files.push(file_to_delete.clone());
                             if !cfg!(target_os = "windows") {
                                 // On windows, delete is expected to fail if the file
                                 // is mmapped.
-                                error!(self.logger(), "delete-file-fail"; "path"=>file_to_delete.to_str().unwrap_or("<invalid-utf8>"));
+                                error!("Failed to delete {:?}", file_to_delete);
                             }
                         }
                     }
@@ -200,10 +195,6 @@ impl ManagedDirectory {
             save_managed_paths(self.directory.as_mut(), &meta_informations_wlock)?;
         }
 
-        info!(self.directory.logger(), "gc"; "stage"=>"end",
-              "num-sucess-file-deletes"=>deleted_files.len(),
-              "num-failed-file-deletes"=>failed_to_delete_files.len());
-
         Ok(GarbageCollectionResult {
             deleted_files,
             failed_to_delete_files,
@@ -240,11 +231,8 @@ impl ManagedDirectory {
     /// Verify checksum of a managed file
     pub fn validate_checksum(&self, path: &Path) -> result::Result<bool, OpenReadError> {
         let reader = self.directory.open_read(path)?;
-        let (footer, data) =
-            Footer::extract_footer(reader).map_err(|io_error| OpenReadError::IOError {
-                io_error,
-                filepath: path.to_path_buf(),
-            })?;
+        let (footer, data) = Footer::extract_footer(reader)
+            .map_err(|err| IOError::with_path(path.to_path_buf(), err))?;
         let mut hasher = Hasher::new();
         hasher.update(data.as_slice());
         let crc = hasher.finalize();
@@ -257,6 +245,7 @@ impl ManagedDirectory {
 
     /// List files for which checksum does not match content
     pub fn list_damaged(&self) -> result::Result<HashSet<PathBuf>, OpenReadError> {
+        let mut hashset = HashSet::new();
         let mut managed_paths = self
             .meta_informations
             .read()
@@ -266,37 +255,27 @@ impl ManagedDirectory {
 
         managed_paths.remove(*META_FILEPATH);
 
-        let mut damaged_files = HashSet::new();
-        for path in managed_paths {
+        for path in managed_paths.into_iter() {
             if !self.validate_checksum(&path)? {
-                damaged_files.insert(path);
+                hashset.insert(path);
             }
         }
-        Ok(damaged_files)
+        Ok(hashset)
     }
 }
 
 impl Directory for ManagedDirectory {
     fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
-        slog::debug!(self.logger(), "open-read"; "path" => path.to_str().unwrap_or("<invalid-utf8>"));
         let read_only_source = self.directory.open_read(path)?;
-        let (footer, reader) = Footer::extract_footer(read_only_source).map_err(|io_error| {
-            OpenReadError::IOError {
-                io_error,
-                filepath: path.to_path_buf(),
-            }
-        })?;
+        let (footer, reader) = Footer::extract_footer(read_only_source)
+            .map_err(|err| IOError::with_path(path.to_path_buf(), err))?;
         footer.is_compatible()?;
         Ok(reader)
     }
 
     fn open_write(&mut self, path: &Path) -> result::Result<WritePtr, OpenWriteError> {
-        slog::debug!(self.logger(), "open-write"; "path" => path.to_str().unwrap_or("<invalid-utf8>"));
         self.register_file_as_managed(path)
-            .map_err(|io_error| OpenWriteError::IOError {
-                io_error,
-                filepath: path.to_path_buf(),
-            })?;
+            .map_err(|e| IOError::with_path(path.to_owned(), e))?;
         Ok(io::BufWriter::new(Box::new(FooterProxy::new(
             self.directory
                 .open_write(path)?
@@ -306,11 +285,9 @@ impl Directory for ManagedDirectory {
         ))))
     }
 
-    fn atomic_write(&mut self, path: &Path, content: &[u8]) -> io::Result<()> {
-        let content_str = std::str::from_utf8(content).unwrap_or("<content-not-utf-8>");
-        slog::debug!(self.logger(), "Atomic write"; "path" => format!("{:?}", path), "content_length"=>content_str);
+    fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> {
        self.register_file_as_managed(path)?;
-        self.directory.atomic_write(path, content)
+        self.directory.atomic_write(path, data)
    }

    fn atomic_read(&self, path: &Path) -> result::Result<Vec<u8>, OpenReadError> {
@@ -332,10 +309,6 @@ impl Directory for ManagedDirectory {
     fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> {
         self.directory.watch(watch_callback)
     }
-
-    fn logger(&self) -> &slog::Logger {
-        self.directory.logger()
-    }
 }
 
 impl Clone for ManagedDirectory {
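`validate_checksum` boils down to hashing the content region of the file and comparing it against the CRC recorded in the footer. A minimal sketch of that comparison using `crc32fast` (the footer field name is an assumption; the footer layout itself is not shown in this diff):

```rust
/// Sketch of the CRC check behind validate_checksum: `footer_crc` stands for
/// the checksum recorded in the file footer.
fn crc_matches(content: &[u8], footer_crc: u32) -> bool {
    let mut hasher = crc32fast::Hasher::new();
    hasher.update(content);
    hasher.finalize() == footer_crc
}
```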
src/directory/mmap_directory.rs:
@@ -1,6 +1,8 @@
 use crate::core::META_FILEPATH;
 use crate::directory::error::LockError;
-use crate::directory::error::{DeleteError, OpenDirectoryError, OpenReadError, OpenWriteError};
+use crate::directory::error::{
+    DeleteError, IOError, OpenDirectoryError, OpenReadError, OpenWriteError,
+};
 use crate::directory::read_only_source::BoxedData;
 use crate::directory::AntiCallToken;
 use crate::directory::Directory;
@@ -17,8 +19,6 @@ use notify::RawEvent;
 use notify::RecursiveMode;
 use notify::Watcher;
 use serde::{Deserialize, Serialize};
-use slog::{debug, o, Drain, Logger};
-use slog_stdlog::StdLog;
 use std::collections::HashMap;
 use std::convert::From;
 use std::fmt;
@@ -36,6 +36,11 @@ use std::sync::Weak;
 use std::thread;
 use tempfile::TempDir;
 
+/// Create a default io error given a string.
+pub(crate) fn make_io_err(msg: String) -> io::Error {
+    io::Error::new(io::ErrorKind::Other, msg)
+}
+
 /// Returns None iff the file exists, can be read, but is empty (and hence
 /// cannot be mmapped)
 fn open_mmap(full_path: &Path) -> result::Result<Option<Mmap>, OpenReadError> {
@@ -43,17 +48,13 @@ fn open_mmap(full_path: &Path) -> result::Result<Option<Mmap>, OpenReadError> {
         if e.kind() == io::ErrorKind::NotFound {
             OpenReadError::FileDoesNotExist(full_path.to_owned())
         } else {
-            OpenReadError::IOError {
-                io_error: e,
-                filepath: full_path.to_owned(),
-            }
+            OpenReadError::IOError(IOError::with_path(full_path.to_owned(), e))
         }
     })?;
 
-    let meta_data = file.metadata().map_err(|e| OpenReadError::IOError {
-        io_error: e,
-        filepath: full_path.to_owned(),
-    })?;
+    let meta_data = file
+        .metadata()
+        .map_err(|e| IOError::with_path(full_path.to_owned(), e))?;
     if meta_data.len() == 0 {
         // if the file size is 0, it will not be possible
         // to mmap the file, so we return None
@@ -63,10 +64,7 @@ fn open_mmap(full_path: &Path) -> result::Result<Option<Mmap>, OpenReadError> {
     unsafe {
         memmap::Mmap::map(&file)
             .map(Some)
-            .map_err(|e| OpenReadError::IOError {
-                io_error: e,
-                filepath: full_path.to_owned(),
-            })
+            .map_err(|e| From::from(IOError::with_path(full_path.to_owned(), e)))
     }
 }
@@ -146,7 +144,7 @@ struct WatcherWrapper {
 }
 
 impl WatcherWrapper {
-    pub(crate) fn new(path: &Path, logger: Logger) -> Result<Self, OpenDirectoryError> {
+    pub fn new(path: &Path) -> Result<Self, OpenDirectoryError> {
         let (tx, watcher_recv): (Sender<RawEvent>, Receiver<RawEvent>) = channel();
         // We need to initialize the
         let watcher = notify::raw_watcher(tx)
@@ -160,8 +158,7 @@ impl WatcherWrapper {
                 panic!("Unknown error while starting watching directory {:?}", path);
             }
         })?;
-        let watcher_router: Arc<WatchCallbackList> =
-            Arc::new(WatchCallbackList::with_logger(logger));
+        let watcher_router: Arc<WatchCallbackList> = Default::default();
         let watcher_router_clone = watcher_router.clone();
         thread::Builder::new()
             .name("meta-file-watch-thread".to_string())
@@ -186,10 +183,6 @@ impl WatcherWrapper {
                     }
                 }
             })
-            .map_err(|io_error| OpenDirectoryError::IoError {
-                io_error,
-                directory_path: path.to_path_buf(),
-            })?;
         Ok(WatcherWrapper {
             _watcher: Mutex::new(watcher),
@@ -224,21 +217,15 @@ struct MmapDirectoryInner {
     mmap_cache: RwLock<MmapCache>,
     _temp_directory: Option<TempDir>,
     watcher: RwLock<Option<WatcherWrapper>>,
-    logger: Logger,
 }
 
 impl MmapDirectoryInner {
-    fn new(
-        root_path: PathBuf,
-        temp_directory: Option<TempDir>,
-        logger: Logger,
-    ) -> MmapDirectoryInner {
+    fn new(root_path: PathBuf, temp_directory: Option<TempDir>) -> MmapDirectoryInner {
         MmapDirectoryInner {
             root_path,
             mmap_cache: Default::default(),
             _temp_directory: temp_directory,
             watcher: RwLock::new(None),
-            logger,
         }
     }
@@ -250,7 +237,7 @@ impl MmapDirectoryInner {
         // The downside is that we might create a watch wrapper that is not useful.
         let need_initialization = self.watcher.read().unwrap().is_none();
         if need_initialization {
-            let watch_wrapper = WatcherWrapper::new(&self.root_path, self.logger.clone())?;
+            let watch_wrapper = WatcherWrapper::new(&self.root_path)?;
             let mut watch_wlock = self.watcher.write().unwrap();
             // the watcher could have been initialized when we released the lock, and
             // we do not want to lose the watched files that were set.
@@ -273,8 +260,8 @@ impl fmt::Debug for MmapDirectory {
 }
 
 impl MmapDirectory {
-    fn new(root_path: PathBuf, temp_directory: Option<TempDir>, logger: Logger) -> MmapDirectory {
-        let inner = MmapDirectoryInner::new(root_path, temp_directory, logger);
+    fn new(root_path: PathBuf, temp_directory: Option<TempDir>) -> MmapDirectory {
+        let inner = MmapDirectoryInner::new(root_path, temp_directory);
         MmapDirectory {
             inner: Arc::new(inner),
         }
@@ -285,19 +272,16 @@ impl MmapDirectory {
     /// This is mostly useful to test the MmapDirectory itself.
     /// For your unit tests, prefer the RAMDirectory.
     pub fn create_from_tempdir() -> Result<MmapDirectory, OpenDirectoryError> {
-        let tempdir = TempDir::new().map_err(OpenDirectoryError::FailedToCreateTempDir)?;
-        let logger = Logger::root(StdLog.fuse(), o!());
-        Ok(MmapDirectory::new(tempdir.path().to_owned(), Some(tempdir), logger))
+        let tempdir = TempDir::new().map_err(OpenDirectoryError::IoError)?;
+        let tempdir_path = PathBuf::from(tempdir.path());
+        Ok(MmapDirectory::new(tempdir_path, Some(tempdir)))
     }
 
     /// Opens a MmapDirectory in a directory.
     ///
     /// Returns an error if the `directory_path` does not
     /// exist or if it is not a directory.
-    pub fn open_with_logger<P: AsRef<Path>>(
-        directory_path: P,
-        logger: Logger,
-    ) -> Result<MmapDirectory, OpenDirectoryError> {
+    pub fn open<P: AsRef<Path>>(directory_path: P) -> Result<MmapDirectory, OpenDirectoryError> {
         let directory_path: &Path = directory_path.as_ref();
         if !directory_path.exists() {
             Err(OpenDirectoryError::DoesNotExist(PathBuf::from(
@@ -308,20 +292,10 @@ impl MmapDirectory {
                 directory_path,
             )))
         } else {
-            Ok(MmapDirectory::new(
-                PathBuf::from(directory_path),
-                None,
-                logger,
-            ))
+            Ok(MmapDirectory::new(PathBuf::from(directory_path), None))
         }
     }
 
-    /// Creates an `MmapDirectory` at the given path.
-    pub fn open<P: AsRef<Path>>(directory_path: P) -> Result<MmapDirectory, OpenDirectoryError> {
-        let logger = Logger::root(StdLog.fuse(), o!());
-        Self::open_with_logger(directory_path, logger)
-    }
-
     /// Joins a relative_path to the directory `root_path`
     /// to create a proper complete `filepath`.
     fn resolve_path(&self, relative_path: &Path) -> PathBuf {
@@ -381,12 +355,11 @@ struct ReleaseLockFile {
     _file: File,
     path: PathBuf,
-    logger: Logger,
 }
 
 impl Drop for ReleaseLockFile {
     fn drop(&mut self) {
-        debug!(self.logger, "Releasing lock {:?}", self.path);
+        debug!("Releasing lock {:?}", self.path);
     }
 }
 
@@ -425,18 +398,16 @@ impl TerminatingWrite for SafeFileWriter {
 
 impl Directory for MmapDirectory {
     fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
+        debug!("Open Read {:?}", path);
         let full_path = self.resolve_path(path);
 
         let mut mmap_cache = self.inner.mmap_cache.write().map_err(|_| {
             let msg = format!(
                 "Failed to acquired write lock \
                  on mmap cache while reading {:?}",
                 path
             );
-            let io_error = io::Error::new(io::ErrorKind::Other, msg);
-            OpenReadError::IOError {
-                io_error,
-                filepath: path.to_owned(),
-            }
+            IOError::with_path(path.to_owned(), make_io_err(msg))
         })?;
         Ok(mmap_cache
             .get_mmap(&full_path)?
@@ -449,18 +420,14 @@ impl Directory for MmapDirectory {
     fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
         let full_path = self.resolve_path(path);
         match fs::remove_file(&full_path) {
-            Ok(_) => self.sync_directory().map_err(|e| DeleteError::IOError {
-                io_error: e,
-                filepath: path.to_path_buf(),
-            }),
+            Ok(_) => self
+                .sync_directory()
+                .map_err(|e| IOError::with_path(path.to_owned(), e).into()),
             Err(e) => {
                 if e.kind() == io::ErrorKind::NotFound {
                     Err(DeleteError::FileDoesNotExist(path.to_owned()))
                 } else {
-                    Err(DeleteError::IOError {
-                        io_error: e,
-                        filepath: path.to_path_buf(),
-                    })
+                    Err(IOError::with_path(path.to_owned(), e).into())
                 }
            }
        }
@@ -472,7 +439,9 @@ impl Directory for MmapDirectory {
     }
 
     fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
+        debug!("Open Write {:?}", path);
         let full_path = self.resolve_path(path);
 
         let open_res = OpenOptions::new()
             .write(true)
             .create_new(true)
@@ -482,25 +451,18 @@ impl Directory for MmapDirectory {
             if err.kind() == io::ErrorKind::AlreadyExists {
                 OpenWriteError::FileAlreadyExists(path.to_owned())
             } else {
-                OpenWriteError::IOError {
-                    io_error: err,
-                    filepath: path.to_owned(),
-                }
+                IOError::with_path(path.to_owned(), err).into()
             }
         })?;
 
         // making sure the file is created.
-        file.flush().map_err(|io_error| OpenWriteError::IOError {
-            io_error,
-            filepath: path.to_owned(),
-        })?;
+        file.flush()
+            .map_err(|e| IOError::with_path(path.to_owned(), e))?;
 
         // Apparetntly, on some filesystem syncing the parent
         // directory is required.
-        self.sync_directory().map_err(|e| OpenWriteError::IOError {
-            io_error: e,
-            filepath: path.to_owned(),
-        })?;
+        self.sync_directory()
+            .map_err(|e| IOError::with_path(path.to_owned(), e))?;
 
         let writer = SafeFileWriter::new(file);
         Ok(BufWriter::new(Box::new(writer)))
@@ -512,26 +474,21 @@ impl Directory for MmapDirectory {
         match File::open(&full_path) {
             Ok(mut file) => {
                 file.read_to_end(&mut buffer)
-                    .map_err(|io_error| OpenReadError::IOError {
-                        io_error,
-                        filepath: path.to_owned(),
-                    })?;
+                    .map_err(|e| IOError::with_path(path.to_owned(), e))?;
                 Ok(buffer)
             }
-            Err(io_error) => {
-                if io_error.kind() == io::ErrorKind::NotFound {
+            Err(e) => {
+                if e.kind() == io::ErrorKind::NotFound {
                     Err(OpenReadError::FileDoesNotExist(path.to_owned()))
                 } else {
-                    Err(OpenReadError::IOError {
-                        io_error,
-                        filepath: path.to_owned(),
-                    })
+                    Err(IOError::with_path(path.to_owned(), e).into())
                 }
            }
        }
    }

    fn atomic_write(&mut self, path: &Path, content: &[u8]) -> io::Result<()> {
+        debug!("Atomic Write {:?}", path);
        let mut tempfile = tempfile::Builder::new().tempfile_in(&self.inner.root_path)?;
        tempfile.write_all(content)?;
        tempfile.flush()?;
@@ -553,22 +510,16 @@ impl Directory for MmapDirectory {
         } else {
             file.try_lock_exclusive().map_err(|_| LockError::LockBusy)?
         }
-        let logger = self.inner.logger.clone();
         // dropping the file handle will release the lock.
         Ok(DirectoryLock::from(Box::new(ReleaseLockFile {
             path: lock.filepath.clone(),
             _file: file,
-            logger,
         })))
     }
 
     fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> {
         self.inner.watch(watch_callback)
     }
-
-    fn logger(&self) -> &Logger {
-        &self.inner.logger
-    }
 }
 
 #[cfg(test)]
@@ -678,8 +629,7 @@ mod tests {
         let counter_clone = counter.clone();
         let tmp_dir = tempfile::TempDir::new().unwrap();
         let tmp_dirpath = tmp_dir.path().to_owned();
-        let logger = Logger::root(slog::Discard, o!());
-        let mut watch_wrapper = WatcherWrapper::new(&tmp_dirpath, logger).unwrap();
+        let mut watch_wrapper = WatcherWrapper::new(&tmp_dirpath).unwrap();
         let tmp_file = tmp_dirpath.join(*META_FILEPATH);
         let _handle = watch_wrapper.watch(Box::new(move || {
             counter_clone.fetch_add(1, Ordering::SeqCst);
src/directory/mod.rs:
@@ -23,8 +23,7 @@ pub use self::directory::{Directory, DirectoryClone};
 pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
 pub use self::ram_directory::RAMDirectory;
 pub use self::read_only_source::ReadOnlySource;
-pub(crate) use self::watch_event_router::WatchCallbackList;
-pub use self::watch_event_router::{WatchCallback, WatchHandle};
+pub use self::watch_event_router::{WatchCallback, WatchCallbackList, WatchHandle};
 use std::io::{self, BufWriter, Write};
 use std::path::PathBuf;
 /// Outcome of the Garbage collection
src/directory/ram_directory.rs:
@@ -5,8 +5,6 @@ use crate::directory::WatchCallbackList;
 use crate::directory::{Directory, ReadOnlySource, WatchCallback, WatchHandle};
 use crate::directory::{TerminatingWrite, WritePtr};
 use fail::fail_point;
-use slog::{o, Drain, Logger};
-use slog_stdlog::StdLog;
 use std::collections::HashMap;
 use std::fmt;
 use std::io::{self, BufWriter, Cursor, Seek, SeekFrom, Write};
@@ -68,7 +66,7 @@ impl Write for VecWriter {
 
     fn flush(&mut self) -> io::Result<()> {
         self.is_flushed = true;
-        let mut fs = self.shared_directory.fs.inner_directory.write().unwrap();
+        let mut fs = self.shared_directory.fs.write().unwrap();
         fs.write(self.path.clone(), self.data.get_ref());
         Ok(())
     }
@@ -80,19 +78,13 @@ impl TerminatingWrite for VecWriter {
     }
 }
 
+#[derive(Default)]
 struct InnerDirectory {
     fs: HashMap<PathBuf, ReadOnlySource>,
     watch_router: WatchCallbackList,
 }
 
 impl InnerDirectory {
-    fn with_logger(logger: Logger) -> Self {
-        InnerDirectory {
-            fs: Default::default(),
-            watch_router: WatchCallbackList::with_logger(logger.clone()),
-        }
-    }
-
     fn write(&mut self, path: PathBuf, data: &[u8]) -> bool {
         let data = ReadOnlySource::new(Vec::from(data));
         self.fs.insert(path, data).is_some()
@@ -125,32 +117,20 @@ impl InnerDirectory {
     }
 }
 
-impl Default for RAMDirectory {
-    fn default() -> RAMDirectory {
-        let logger = Logger::root(StdLog.fuse(), o!());
-        Self::with_logger(logger)
-    }
-}
-
 impl fmt::Debug for RAMDirectory {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         write!(f, "RAMDirectory")
     }
 }
 
-struct Inner {
-    inner_directory: RwLock<InnerDirectory>,
-    logger: Logger,
-}
-
 /// A Directory storing everything in anonymous memory.
 ///
 /// It is mainly meant for unit testing.
 /// Writes are only made visible upon flushing.
 ///
-#[derive(Clone)]
+#[derive(Clone, Default)]
 pub struct RAMDirectory {
-    fs: Arc<Inner>,
+    fs: Arc<RwLock<InnerDirectory>>,
 }
 
 impl RAMDirectory {
@@ -159,21 +139,10 @@ impl RAMDirectory {
         Self::default()
     }
 
-    /// Create a `RAMDirectory` with a custom logger.
-    pub fn with_logger(logger: Logger) -> RAMDirectory {
-        let inner_directory = InnerDirectory::with_logger(logger.clone()).into();
-        RAMDirectory {
-            fs: Arc::new(Inner {
-                inner_directory,
-                logger,
-            }),
-        }
-    }
-
     /// Returns the sum of the size of the different files
     /// in the RAMDirectory.
     pub fn total_mem_usage(&self) -> usize {
-        self.fs.inner_directory.read().unwrap().total_mem_usage()
+        self.fs.read().unwrap().total_mem_usage()
     }
 
     /// Write a copy of all of the files saved in the RAMDirectory in the target `Directory`.
@@ -183,7 +152,7 @@ impl RAMDirectory {
     ///
     /// If an error is encounterred, files may be persisted partially.
     pub fn persist(&self, dest: &mut dyn Directory) -> crate::Result<()> {
-        let wlock = self.fs.inner_directory.write().unwrap();
+        let wlock = self.fs.write().unwrap();
         for (path, source) in wlock.fs.iter() {
             let mut dest_wrt = dest.open_write(path)?;
             dest_wrt.write_all(source.as_slice())?;
@@ -195,25 +164,24 @@ impl RAMDirectory {
 
 impl Directory for RAMDirectory {
     fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
-        self.fs.inner_directory.read().unwrap().open_read(path)
+        self.fs.read().unwrap().open_read(path)
     }
 
     fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
         fail_point!("RAMDirectory::delete", |_| {
-            Err(DeleteError::IOError {
-                io_error: io::Error::from(io::ErrorKind::Other),
-                filepath: path.to_path_buf(),
-            })
+            use crate::directory::error::IOError;
+            let io_error = IOError::from(io::Error::from(io::ErrorKind::Other));
+            Err(DeleteError::from(io_error))
         });
-        self.fs.inner_directory.write().unwrap().delete(path)
+        self.fs.write().unwrap().delete(path)
     }
 
     fn exists(&self, path: &Path) -> bool {
-        self.fs.inner_directory.read().unwrap().exists(path)
+        self.fs.read().unwrap().exists(path)
     }
 
     fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
-        let mut fs = self.fs.inner_directory.write().unwrap();
+        let mut fs = self.fs.write().unwrap();
         let path_buf = PathBuf::from(path);
         let vec_writer = VecWriter::new(path_buf.clone(), self.clone());
         let exists = fs.write(path_buf.clone(), &[]);
@@ -237,38 +205,19 @@ impl Directory for RAMDirectory {
         let path_buf = PathBuf::from(path);
 
         // Reserve the path to prevent calls to .write() to succeed.
-        self.fs
-            .inner_directory
-            .write()
-            .unwrap()
-            .write(path_buf.clone(), &[]);
+        self.fs.write().unwrap().write(path_buf.clone(), &[]);
 
         let mut vec_writer = VecWriter::new(path_buf, self.clone());
         vec_writer.write_all(data)?;
         vec_writer.flush()?;
         if path == Path::new(&*META_FILEPATH) {
-            let _ = self
-                .fs
-                .inner_directory
-                .write()
-                .unwrap()
-                .watch_router
-                .broadcast();
+            let _ = self.fs.write().unwrap().watch_router.broadcast();
         }
         Ok(())
     }
 
     fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> {
-        Ok(self
-            .fs
-            .inner_directory
-            .write()
-            .unwrap()
-            .watch(watch_callback))
-    }
-
-    fn logger(&self) -> &Logger {
-        &self.fs.logger
+        Ok(self.fs.write().unwrap().watch(watch_callback))
     }
 }
@@ -1,6 +1,5 @@
use futures::channel::oneshot;
use futures::{Future, TryFutureExt};
use slog::{error, Logger};
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::Weak;
@@ -12,9 +11,9 @@ pub type WatchCallback = Box<dyn Fn() + Sync + Send>;
///
/// It registers callbacks (See `.subscribe(...)`) and
/// calls them upon calls to `.broadcast(...)`.
pub(crate) struct WatchCallbackList {
#[derive(Default)]
pub struct WatchCallbackList {
    router: RwLock<Vec<Weak<WatchCallback>>>,
    logger: Logger,
}

/// Controls how long a directory should watch for a file change.
@@ -33,13 +32,6 @@ impl WatchHandle {
}

impl WatchCallbackList {
    pub fn with_logger(logger: Logger) -> Self {
        WatchCallbackList {
            logger,
            router: Default::default(),
        }
    }

    /// Subscribes a new callback and returns a handle that controls the lifetime of the callback.
    pub fn subscribe(&self, watch_callback: WatchCallback) -> WatchHandle {
        let watch_callback_arc = Arc::new(watch_callback);
@@ -82,8 +74,8 @@ impl WatchCallbackList {
        });
        if let Err(err) = spawn_res {
            error!(
                self.logger,
                "Failed to spawn thread to call watch callbacks. Cause: {:?}", err
                "Failed to spawn thread to call watch callbacks. Cause: {:?}",
                err
            );
        }
        result
@@ -94,18 +86,13 @@ impl WatchCallbackList {
mod tests {
    use crate::directory::WatchCallbackList;
    use futures::executor::block_on;
    use slog::{o, Discard, Logger};
    use std::mem;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    fn default_watch_callback_list() -> WatchCallbackList {
        WatchCallbackList::with_logger(Logger::root(Discard, o!()))
    }

    #[test]
    fn test_watch_event_router_simple() {
        let watch_event_router = default_watch_callback_list();
        let watch_event_router = WatchCallbackList::default();
        let counter: Arc<AtomicUsize> = Default::default();
        let counter_clone = counter.clone();
        let inc_callback = Box::new(move || {
@@ -132,7 +119,7 @@ mod tests {

    #[test]
    fn test_watch_event_router_multiple_callback_same_key() {
        let watch_event_router = default_watch_callback_list();
        let watch_event_router = WatchCallbackList::default();
        let counter: Arc<AtomicUsize> = Default::default();
        let inc_callback = |inc: usize| {
            let counter_clone = counter.clone();
@@ -161,7 +148,7 @@ mod tests {

    #[test]
    fn test_watch_event_router_multiple_callback_different_key() {
        let watch_event_router = default_watch_callback_list();
        let watch_event_router = WatchCallbackList::default();
        let counter: Arc<AtomicUsize> = Default::default();
        let counter_clone = counter.clone();
        let inc_callback = Box::new(move || {

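For orientation between these hunks, here is a minimal sketch of the subscribe/broadcast contract the tests above exercise. It assumes, as the tests' use of `block_on` suggests, that `broadcast()` returns a future that resolves once all callbacks have run; the counter pattern is the same as in the tests.

```rust
use crate::directory::WatchCallbackList;
use futures::executor::block_on;
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

fn watch_contract_demo() {
    let router = WatchCallbackList::default();
    let counter: Arc<AtomicUsize> = Default::default();
    let counter_clone = counter.clone();
    // The router only holds a Weak reference to the callback; the returned
    // WatchHandle owns the Arc and thus controls the callback's lifetime.
    let handle = router.subscribe(Box::new(move || {
        counter_clone.fetch_add(1, Ordering::SeqCst);
    }));
    block_on(router.broadcast());
    assert_eq!(counter.load(Ordering::SeqCst), 1);
    // Dropping the handle unsubscribes: the next broadcast is a no-op.
    mem::drop(handle);
    block_on(router.broadcast());
    assert_eq!(counter.load(Ordering::SeqCst), 1);
}
```
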
98
src/error.rs
@@ -2,13 +2,11 @@

use std::io;

use crate::directory::error::{IOError, OpenDirectoryError, OpenReadError, OpenWriteError};
use crate::directory::error::{Incompatibility, LockError};
use crate::fastfield::FastFieldNotAvailableError;
use crate::query;
use crate::{
    directory::error::{OpenDirectoryError, OpenReadError, OpenWriteError},
    schema,
};
use crate::schema;
use std::fmt;
use std::path::PathBuf;
use std::sync::PoisonError;
@@ -45,47 +43,44 @@ impl fmt::Debug for DataCorruption {
    }
}

/// The library's error enum
#[derive(Debug, Error)]
/// The library's failure-based error enum
#[derive(Debug, Fail)]
pub enum TantivyError {
    /// Failed to open the directory.
    #[error("Failed to open the directory: '{0:?}'")]
    OpenDirectoryError(#[from] OpenDirectoryError),
    /// Failed to open a file for read.
    #[error("Failed to open file for read: '{0:?}'")]
    OpenReadError(#[from] OpenReadError),
    /// Failed to open a file for write.
    #[error("Failed to open file for write: '{0:?}'")]
    OpenWriteError(#[from] OpenWriteError),
    /// Path does not exist.
    #[fail(display = "Path does not exist: '{:?}'", _0)]
    PathDoesNotExist(PathBuf),
    /// File already exists, this is a problem when we try to write into a new file.
    #[fail(display = "File already exists: '{:?}'", _0)]
    FileAlreadyExists(PathBuf),
    /// Index already exists in this directory.
    #[error("Index already exists")]
    #[fail(display = "Index already exists")]
    IndexAlreadyExists,
    /// Failed to acquire the file lock.
    #[error("Failed to acquire Lockfile: {0:?}. {1:?}")]
    #[fail(display = "Failed to acquire Lockfile: {:?}. {:?}", _0, _1)]
    LockFailure(LockError, Option<String>),
    /// IO error.
    #[error("An IO error occurred: '{0}'")]
    IOError(#[from] io::Error),
    #[fail(display = "An IO error occurred: '{}'", _0)]
    IOError(#[cause] IOError),
    /// Data corruption.
    #[error("Data corrupted: '{0:?}'")]
    #[fail(display = "{:?}", _0)]
    DataCorruption(DataCorruption),
    /// A thread holding the lock panicked and poisoned the lock.
    #[error("A thread holding the lock panicked and poisoned the lock")]
    #[fail(display = "A thread holding the lock panicked and poisoned the lock")]
    Poisoned,
    /// An invalid argument was passed by the user.
    #[error("An invalid argument was passed: '{0}'")]
    #[fail(display = "An invalid argument was passed: '{}'", _0)]
    InvalidArgument(String),
    /// An error happened in one of the threads.
    #[error("An error occurred in a thread: '{0}'")]
    #[fail(display = "An error occurred in a thread: '{}'", _0)]
    ErrorInThread(String),
    /// An error related to the schema.
    #[error("Schema error: '{0}'")]
    #[fail(display = "Schema error: '{}'", _0)]
    SchemaError(String),
    /// System error (e.g. we failed to spawn a new thread).
    #[error("System error. '{0}'")]
    #[fail(display = "System error. '{}'", _0)]
    SystemError(String),
    /// Index incompatible with the current version of tantivy.
    #[error("{0:?}")]
    #[fail(display = "{:?}", _0)]
    IncompatibleIndex(Incompatibility),
}

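As a side-by-side reference for this migration, here is a minimal sketch of the same pattern under `thiserror` alone (hypothetical `DemoError`, not a tantivy type): `#[error(...)]` generates the `Display` impl that `#[fail(display = ...)]` used to, and `#[from]` generates the `From` conversion that previously needed a manual impl or `#[cause]`.

```rust
use thiserror::Error;

#[derive(Debug, Error)]
pub enum DemoError {
    // `{0}` interpolates the first tuple field, replacing failure's `_0`.
    #[error("An IO error occurred: '{0}'")]
    Io(#[from] std::io::Error),
    #[error("An invalid argument was passed: '{0}'")]
    InvalidArgument(String),
}

fn open_config() -> Result<Vec<u8>, DemoError> {
    // `?` uses the generated `From<std::io::Error> for DemoError`.
    Ok(std::fs::read("config.json")?)
}
```
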
@@ -94,17 +89,31 @@ impl From<DataCorruption> for TantivyError {
        TantivyError::DataCorruption(data_corruption)
    }
}

impl From<FastFieldNotAvailableError> for TantivyError {
    fn from(fastfield_error: FastFieldNotAvailableError) -> TantivyError {
        TantivyError::SchemaError(format!("{}", fastfield_error))
    }
}

impl From<LockError> for TantivyError {
    fn from(lock_error: LockError) -> TantivyError {
        TantivyError::LockFailure(lock_error, None)
    }
}

impl From<IOError> for TantivyError {
    fn from(io_error: IOError) -> TantivyError {
        TantivyError::IOError(io_error)
    }
}

impl From<io::Error> for TantivyError {
    fn from(io_error: io::Error) -> TantivyError {
        TantivyError::IOError(io_error.into())
    }
}

impl From<query::QueryParserError> for TantivyError {
    fn from(parsing_error: query::QueryParserError) -> TantivyError {
        TantivyError::InvalidArgument(format!("Query is invalid. {:?}", parsing_error))
@@ -117,12 +126,49 @@ impl<Guard> From<PoisonError<Guard>> for TantivyError {
    }
}

impl From<OpenReadError> for TantivyError {
    fn from(error: OpenReadError) -> TantivyError {
        match error {
            OpenReadError::FileDoesNotExist(filepath) => TantivyError::PathDoesNotExist(filepath),
            OpenReadError::IOError(io_error) => TantivyError::IOError(io_error),
            OpenReadError::IncompatibleIndex(incompatibility) => {
                TantivyError::IncompatibleIndex(incompatibility)
            }
        }
    }
}

impl From<schema::DocParsingError> for TantivyError {
    fn from(error: schema::DocParsingError) -> TantivyError {
        TantivyError::InvalidArgument(format!("Failed to parse document {:?}", error))
    }
}

impl From<OpenWriteError> for TantivyError {
    fn from(error: OpenWriteError) -> TantivyError {
        match error {
            OpenWriteError::FileAlreadyExists(filepath) => {
                TantivyError::FileAlreadyExists(filepath)
            }
            OpenWriteError::IOError(io_error) => TantivyError::IOError(io_error),
        }
    }
}

impl From<OpenDirectoryError> for TantivyError {
    fn from(error: OpenDirectoryError) -> TantivyError {
        match error {
            OpenDirectoryError::DoesNotExist(directory_path) => {
                TantivyError::PathDoesNotExist(directory_path)
            }
            OpenDirectoryError::NotADirectory(directory_path) => {
                TantivyError::InvalidArgument(format!("{:?} is not a directory", directory_path))
            }
            OpenDirectoryError::IoError(err) => TantivyError::IOError(IOError::from(err)),
        }
    }
}

impl From<serde_json::Error> for TantivyError {
    fn from(error: serde_json::Error) -> TantivyError {
        let io_err = io::Error::from(error);

@@ -4,8 +4,8 @@ use std::result;
/// `FastFieldNotAvailableError` is returned when the
/// user requested a fast field reader, and the field was not
/// defined in the schema as a fast field.
#[derive(Debug, Error)]
#[error("Fast field not available: '{field_name:?}'")]
#[derive(Debug, Fail)]
#[fail(display = "Fast field not available: '{:?}'", field_name)]
pub struct FastFieldNotAvailableError {
    field_name: String,
}

@@ -27,7 +27,6 @@ use crate::Opstamp;
use crossbeam::channel;
use futures::executor::block_on;
use futures::future::Future;
use slog::{error, info, Logger};
use smallvec::smallvec;
use smallvec::SmallVec;
use std::mem;
@@ -196,21 +195,20 @@ fn index_documents(
    grouped_document_iterator: &mut dyn Iterator<Item = OperationGroup>,
    segment_updater: &mut SegmentUpdater,
    mut delete_cursor: DeleteCursor,
    logger: &Logger,
) -> crate::Result<bool> {
    let schema = segment.schema();

    info!(logger, "segment-index"; "stage"=>"start");
    let mut segment_writer = SegmentWriter::for_segment(memory_budget, segment.clone(), &schema)?;
    let mut buffer_limit_reached = false;
    for document_group in grouped_document_iterator {
        for doc in document_group {
            segment_writer.add_document(doc, &schema)?;
        }
        let mem_usage = segment_writer.mem_usage();
        if mem_usage >= memory_budget - MARGIN_IN_BYTES {
            buffer_limit_reached = true;

            info!(
                "Buffer limit reached, flushing segment with maxdoc={}.",
                segment_writer.max_doc()
            );
            break;
        }
    }
@@ -230,14 +228,6 @@ fn index_documents(
    let segment_with_max_doc = segment.with_max_doc(max_doc);

    let last_docstamp: Opstamp = *(doc_opstamps.last().unwrap());
    info!(
        logger,
        "segment-index";
        "stage" => "serialize",
        "cause" => if buffer_limit_reached { "buffer-limit" } else { "commit" },
        "maxdoc" => max_doc,
        "last_docstamp" => last_docstamp
    );

    let delete_bitset_opt = apply_deletes(
        &segment_with_max_doc,
@@ -251,18 +241,7 @@ fn index_documents(
        delete_cursor,
        delete_bitset_opt,
    );

    info!(
        logger,
        "segment-index";
        "stage" => "publish",
    );
    block_on(segment_updater.schedule_add_segment(segment_entry))?;
    info!(
        logger,
        "segment-index";
        "stage" => "end",
    );
    Ok(true)
}

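The structured calls above are the slog side of this diff; here is a minimal self-contained sketch of the key-value style (Discard drain, so it runs without configuring an output):

```rust
use slog::{info, o, Discard, Logger};

fn main() {
    // A root logger with a Discard drain: records are dropped, which keeps
    // the sketch dependency-free. Tantivy wires a real drain here instead.
    let root = Logger::root(Discard, o!());
    // Child loggers accumulate context, e.g. the segment being indexed.
    let logger = root.new(o!("segment" => "000000a1"));
    // Everything after the `;` is a structured key-value pair, not a
    // format argument, which is what `"stage"=>"start"` above relies on.
    info!(logger, "segment-index"; "stage" => "start", "maxdoc" => 12345u32);
}
```
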
@@ -365,10 +344,6 @@ impl IndexWriter {
        Ok(index_writer)
    }

    pub(crate) fn logger(&self) -> &Logger {
        self.index.logger()
    }

    fn drop_sender(&mut self) {
        let (sender, _receiver) = channel::bounded(1);
        self.operation_sender = sender;
@@ -377,8 +352,6 @@ impl IndexWriter {
    /// If there are some merging threads, blocks until they all finish their work and
    /// then drop the `IndexWriter`.
    pub fn wait_merging_threads(mut self) -> crate::Result<()> {
        info!(self.logger(), "wait-merge-threads"; "stage"=>"start");

        // this will stop the indexing thread,
        // dropping the last reference to the segment_updater.
        self.drop_sender();
@@ -399,9 +372,9 @@ impl IndexWriter {
            .map_err(|_| TantivyError::ErrorInThread("Failed to join merging thread.".into()));

        if let Err(ref e) = result {
            error!(self.logger(), "some merge thread failed"; "cause"=>e.to_string());
            error!("Some merging thread failed {:?}", e);
        }
        info!(self.logger(), "wait-merge-threads"; "stage"=>"stop");

        result
    }

@@ -461,16 +434,12 @@ impl IndexWriter {
                return Ok(());
            }
            let segment = index.new_segment();
            let segment_id = segment.id();
            index_documents(
                mem_budget,
                segment,
                &mut document_iterator,
                &mut segment_updater,
                delete_cursor.clone(),
                &index
                    .logger()
                    .new(slog::o!("segment"=>segment_id.to_string())),
            )?;
        }
    })?;
@@ -584,10 +553,7 @@ impl IndexWriter {
    ///
    /// The opstamp at the last commit is returned.
    pub fn rollback(&mut self) -> crate::Result<Opstamp> {
        info!(
            self.logger(),
            "Rolling back to opstamp {}", self.committed_opstamp
        );
        info!("Rolling back to opstamp {}", self.committed_opstamp);
        // marks the segment updater as killed. From now on, all
        // segment updates will be ignored.
        self.segment_updater.kill();
@@ -644,8 +610,6 @@ impl IndexWriter {
    /// using this API.
    /// See [`PreparedCommit::set_payload()`](PreparedCommit.html)
    pub fn prepare_commit(&mut self) -> crate::Result<PreparedCommit> {
        let logger = self.logger().clone();

        // Here, because we join all of the worker threads,
        // all of the segment updates for this commit have been
        // sent.
@@ -656,10 +620,7 @@ impl IndexWriter {
        //
        // This will move uncommitted segments to the state of
        // committed segments.

        let commit_opstamp = self.stamper.stamp();

        info!(logger, "prepare-commit"; "opstamp" => commit_opstamp);
        info!("Preparing commit");

        // this will drop the current document channel
        // and recreate a new one.
@@ -675,8 +636,9 @@ impl IndexWriter {
            self.add_indexing_worker()?;
        }

        let commit_opstamp = self.stamper.stamp();
        let prepared_commit = PreparedCommit::new(self, commit_opstamp);
        info!(logger, "Prepared commit {}", commit_opstamp);
        info!("Prepared commit {}", commit_opstamp);
        Ok(prepared_commit)
    }

@@ -1,7 +1,6 @@
use super::IndexWriter;
use crate::Opstamp;
use futures::executor::block_on;
use slog::info;

/// A prepared commit
pub struct PreparedCommit<'a> {
@@ -32,7 +31,7 @@ impl<'a> PreparedCommit<'a> {
    }

    pub fn commit(self) -> crate::Result<Opstamp> {
        info!(self.index_writer.logger(), "committing {}", self.opstamp);
        info!("committing {}", self.opstamp);
        let _ = block_on(
            self.index_writer
                .segment_updater()

@@ -1,5 +1,3 @@
use slog::{warn, Logger};

use super::segment_register::SegmentRegister;
use crate::core::SegmentId;
use crate::core::SegmentMeta;
@@ -44,9 +42,9 @@ impl SegmentRegisters {
///
/// It guarantees the atomicity of the
/// changes (merges especially)
#[derive(Default)]
pub struct SegmentManager {
    registers: RwLock<SegmentRegisters>,
    logger: Logger,
}

impl Debug for SegmentManager {
@@ -79,14 +77,12 @@ impl SegmentManager {
    pub fn from_segments(
        segment_metas: Vec<SegmentMeta>,
        delete_cursor: &DeleteCursor,
        logger: Logger,
    ) -> SegmentManager {
        SegmentManager {
            registers: RwLock::new(SegmentRegisters {
                uncommitted: SegmentRegister::default(),
                committed: SegmentRegister::new(segment_metas, delete_cursor),
            }),
            logger,
        }
    }

@@ -190,7 +186,7 @@ impl SegmentManager {
        let segments_status = registers_lock
            .segments_status(before_merge_segment_ids)
            .ok_or_else(|| {
                warn!(self.logger, "couldn't find segment in SegmentManager");
                warn!("couldn't find segment in SegmentManager");
                crate::TantivyError::InvalidArgument(
                    "The segments that were merged could not be found in the SegmentManager. \
                     This is not necessarily a bug, and can happen after a rollback for instance."

@@ -23,9 +23,9 @@ use futures::channel::oneshot;
use futures::executor::{ThreadPool, ThreadPoolBuilder};
use futures::future::Future;
use futures::future::TryFutureExt;
use slog::{debug, error, info, warn};
use std::borrow::BorrowMut;
use std::collections::HashSet;
use std::io::Write;
use std::ops::Deref;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
@@ -65,11 +65,12 @@ pub fn save_new_metas(schema: Schema, directory: &mut dyn Directory) -> crate::R
///
/// This method is not part of tantivy's public API
fn save_metas(metas: &IndexMeta, directory: &mut dyn Directory) -> crate::Result<()> {
    let mut meta_json = serde_json::to_string_pretty(metas)?;
    info!("save metas");
    let mut buffer = serde_json::to_vec_pretty(metas)?;
    // Just adding a new line at the end of the buffer.
    meta_json.push_str("\n");
    debug!(directory.logger(), "save meta"; "content"=>&meta_json);
    directory.atomic_write(&META_FILEPATH, meta_json.as_bytes())?;
    writeln!(&mut buffer)?;
    directory.atomic_write(&META_FILEPATH, &buffer[..])?;
    debug!("Saved metas {:?}", serde_json::to_string_pretty(&metas));
    Ok(())
}

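A standalone sketch of the encoding step in the new `save_metas` above (a `serde_json::Value` stands in for tantivy's `IndexMeta`):

```rust
use std::io::{self, Write};

// Pretty-print the meta as JSON and terminate the buffer with a newline,
// producing exactly the bytes handed to Directory::atomic_write.
fn encode_meta(meta: &serde_json::Value) -> io::Result<Vec<u8>> {
    let mut buffer = serde_json::to_vec_pretty(meta)?;
    writeln!(&mut buffer)?;
    Ok(buffer)
}
```
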
@@ -96,6 +97,7 @@ impl Deref for SegmentUpdater {
async fn garbage_collect_files(
    segment_updater: SegmentUpdater,
) -> crate::Result<GarbageCollectionResult> {
    info!("Running garbage collection");
    let mut index = segment_updater.index.clone();
    index
        .directory_mut()
@@ -105,12 +107,14 @@ async fn garbage_collect_files(
/// Merges the list of segments given in `segment_entries`.
/// This function runs in the calling thread and is computationally expensive.
fn merge(
    merged_segment: Segment,
    index: &Index,
    mut segment_entries: Vec<SegmentEntry>,
    target_opstamp: Opstamp,
) -> crate::Result<SegmentEntry> {
    // First we apply all of the deletes to the merged segment, up to the target opstamp.
    // first we need to apply deletes to our segment.
    let merged_segment = index.new_segment();

    // First we apply all of the deletes to the merged segment, up to the target opstamp.
    for segment_entry in &mut segment_entries {
        let segment = index.segment(segment_entry.meta().clone());
        advance_deletes(segment, segment_entry, target_opstamp)?;
@@ -163,8 +167,7 @@ impl SegmentUpdater {
        delete_cursor: &DeleteCursor,
    ) -> crate::Result<SegmentUpdater> {
        let segments = index.searchable_segment_metas()?;
        let segment_manager =
            SegmentManager::from_segments(segments, delete_cursor, index.logger().clone());
        let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
        let pool = ThreadPoolBuilder::new()
            .name_prefix("segment_updater")
            .pool_size(1)
@@ -384,18 +387,7 @@ impl SegmentUpdater {
            .segment_manager
            .start_merge(merge_operation.segment_ids())?;

        let segment_ids_str: String = merge_operation
            .segment_ids()
            .iter()
            .map(|segment_id| segment_id.to_string())
            .collect::<Vec<String>>()
            .join(",");

        let merged_segment = self.index.new_segment();
        let logger = self.index.logger().new(slog::o!("segments"=>segment_ids_str, "merged-segment"=>merged_segment.id().to_string()));

        let num_merges: usize = self.merge_operations.list().len();
        slog::info!(&logger, "merge"; "stage"=>"start", "num-merges" => num_merges);
        info!("Starting merge - {:?}", merge_operation.segment_ids());

        let (merging_future_send, merging_future_recv) =
            oneshot::channel::<crate::Result<SegmentMeta>>();
@@ -406,20 +398,22 @@ impl SegmentUpdater {
            // as well as which segment is currently in merge and therefore should not be
            // candidate for another merge.
            match merge(
                merged_segment,
                &segment_updater.index,
                segment_entries,
                merge_operation.target_opstamp(),
            ) {
                Ok(after_merge_segment_entry) => {
                    info!(&logger, "merge"; "stage" => "end");
                    let segment_meta = segment_updater
                        .end_merge(merge_operation, after_merge_segment_entry)
                        .await;
                    let _send_result = merging_future_send.send(segment_meta);
                }
                Err(e) => {
                    error!(&logger, "merge"; "stage" => "fail", "cause"=>e.to_string());
                    warn!(
                        "Merge of {:?} was cancelled: {:?}",
                        merge_operation.segment_ids().to_vec(),
                        e
                    );
                    // ... cancel merge
                    if cfg!(test) {
                        panic!("Merge failed.");
@@ -460,12 +454,11 @@ impl SegmentUpdater {
            .collect::<Vec<_>>();
        merge_candidates.extend(committed_merge_candidates.into_iter());

        let logger = self.index.logger();
        for merge_operation in merge_candidates {
            if let Err(err) = self.start_merge(merge_operation) {
                warn!(
                    logger,
                    "merge-start-fail (not fatal, not necessarily a problem)"; "reason" => format!("{}", err),
                    "Starting the merge failed for the following reason. This is not fatal. {}",
                    err
                );
            }
        }
@@ -478,11 +471,8 @@ impl SegmentUpdater {
    ) -> impl Future<Output = crate::Result<SegmentMeta>> {
        let segment_updater = self.clone();
        let after_merge_segment_meta = after_merge_segment_entry.meta().clone();
        let logger = self.index.logger().new(
            slog::o!("segment"=>after_merge_segment_meta.id().to_string(),
            "delete-opstamp"=>after_merge_segment_meta.delete_opstamp()),
        );
        let end_merge_future = self.schedule_future(async move {
            info!("End merge {:?}", after_merge_segment_entry.meta());
            {
                let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
                if let Some(delete_operation) = delete_cursor.get() {
@@ -496,7 +486,6 @@ impl SegmentUpdater {
                        committed_opstamp,
                    ) {
                        error!(
                            logger,
                            "Merge of {:?} was cancelled (advancing deletes failed): {:?}",
                            merge_operation.segment_ids(),
                            e

@@ -1,4 +1,5 @@
use super::operation::AddOperation;
use crate::core::Segment;
use crate::core::SerializableSegment;
use crate::fastfield::FastFieldsWriter;
use crate::fieldnorm::{FieldNormReaders, FieldNormsWriter};
@@ -14,7 +15,6 @@ use crate::tokenizer::{BoxTokenStream, PreTokenizedStream};
use crate::tokenizer::{FacetTokenizer, TextAnalyzer};
use crate::tokenizer::{TokenStreamChain, Tokenizer};
use crate::Opstamp;
use crate::{core::Segment, tokenizer::MAX_TOKEN_LEN};
use crate::{DocId, SegmentComponent};
use std::io;

@@ -146,9 +146,6 @@ impl SegmentWriter {
        for fake_str in facets {
            let mut unordered_term_id_opt = None;
            FacetTokenizer.token_stream(fake_str).process(&mut |token| {
                if token.text.len() > MAX_TOKEN_LEN {
                    return;
                }
                term_buffer.set_text(&token.text);
                let unordered_term_id =
                    multifield_postings.subscribe(doc_id, &term_buffer);

@@ -102,7 +102,10 @@
extern crate serde_json;

#[macro_use]
extern crate thiserror;
extern crate log;

#[macro_use]
extern crate failure;

#[cfg(all(test, feature = "unstable"))]
extern crate test;
@@ -145,7 +148,6 @@ pub mod schema;
pub mod space_usage;
pub mod store;
pub mod termdict;
pub use slog;

mod reader;

@@ -5,6 +5,7 @@ Postings module (also called inverted index)
mod block_search;
mod block_segment_postings;
pub(crate) mod compression;
mod field_stats;
mod postings;
mod postings_writer;
mod recorder;
@@ -15,6 +16,7 @@ mod stacker;
mod term_info;

pub(crate) use self::block_search::BlockSearcher;
pub(crate) use self::field_stats::{FieldStat, FieldStats};

pub(crate) use self::postings_writer::MultiFieldPostingsWriter;
pub use self::serializer::{FieldSerializer, InvertedIndexSerializer};

@@ -227,17 +227,23 @@ pub trait PostingsWriter {
        term_buffer.set_field(field);
        let mut sink = |token: &Token| {
            // We skip all tokens with a length greater than MAX_TOKEN_LEN.
            if token.text.len() > MAX_TOKEN_LEN {
                return;
            if token.text.len() <= MAX_TOKEN_LEN {
                term_buffer.set_text(token.text.as_str());
                self.subscribe(
                    term_index,
                    doc_id,
                    token.position as u32,
                    &term_buffer,
                    heap,
                );
            } else {
                info!(
                    "A token exceeding MAX_TOKEN_LEN ({}>{}) was dropped. Search for \
                     MAX_TOKEN_LEN in the documentation for more information.",
                    token.text.len(),
                    MAX_TOKEN_LEN
                );
            }
            term_buffer.set_text(token.text.as_str());
            self.subscribe(
                term_index,
                doc_id,
                token.position as u32,
                &term_buffer,
                heap,
            );
        };
        token_stream.process(&mut sink)
    }

@@ -1,5 +1,4 @@
use super::TermInfo;
use crate::common::{BinarySerializable, VInt};
use super::{FieldStat, FieldStats, TermInfo};
use crate::common::{CompositeWrite, CountingWriter};
use crate::core::Segment;
use crate::directory::WritePtr;
@@ -11,6 +10,10 @@ use crate::query::BM25Weight;
use crate::schema::Schema;
use crate::schema::{Field, FieldEntry, FieldType};
use crate::termdict::{TermDictionaryBuilder, TermOrdinal};
use crate::{
    common::{BinarySerializable, VInt},
    directory::TerminatingWrite,
};
use crate::{DocId, Score};
use std::cmp::Ordering;
use std::io::{self, Write};
@@ -51,6 +54,8 @@ pub struct InvertedIndexSerializer {
    postings_write: CompositeWrite<WritePtr>,
    positions_write: CompositeWrite<WritePtr>,
    positionsidx_write: CompositeWrite<WritePtr>,
    field_stats: FieldStats,
    field_stats_write: WritePtr,
    schema: Schema,
}

@@ -61,6 +66,7 @@ impl InvertedIndexSerializer {
        postings_write: CompositeWrite<WritePtr>,
        positions_write: CompositeWrite<WritePtr>,
        positionsidx_write: CompositeWrite<WritePtr>,
        field_stats_write: WritePtr,
        schema: Schema,
    ) -> crate::Result<InvertedIndexSerializer> {
        Ok(InvertedIndexSerializer {
@@ -68,18 +74,21 @@ impl InvertedIndexSerializer {
            postings_write,
            positions_write,
            positionsidx_write,
            field_stats: FieldStats::default(),
            field_stats_write,
            schema,
        })
    }

    /// Open a new `PostingsSerializer` for the given segment
    pub fn open(segment: &mut Segment) -> crate::Result<InvertedIndexSerializer> {
        use crate::SegmentComponent::{POSITIONS, POSITIONSSKIP, POSTINGS, TERMS};
        use crate::SegmentComponent::{FIELDSTATS, POSITIONS, POSITIONSSKIP, POSTINGS, TERMS};
        InvertedIndexSerializer::create(
            CompositeWrite::wrap(segment.open_write(TERMS)?),
            CompositeWrite::wrap(segment.open_write(POSTINGS)?),
            CompositeWrite::wrap(segment.open_write(POSITIONS)?),
            CompositeWrite::wrap(segment.open_write(POSITIONSSKIP)?),
            segment.open_write(FIELDSTATS)?,
            segment.schema(),
        )
    }
@@ -94,6 +103,8 @@ impl InvertedIndexSerializer {
        total_num_tokens: u64,
        fieldnorm_reader: Option<FieldNormReader>,
    ) -> io::Result<FieldSerializer<'_>> {
        self.field_stats
            .insert(field, FieldStat::new(total_num_tokens));
        let field_entry: &FieldEntry = self.schema.get_field_entry(field);
        let term_dictionary_write = self.terms_write.for_field(field);
        let postings_write = self.postings_write.for_field(field);
@@ -112,7 +123,10 @@ impl InvertedIndexSerializer {
    }

    /// Closes the serializer.
    pub fn close(self) -> io::Result<()> {
    pub fn close(mut self) -> io::Result<()> {
        self.field_stats
            .serialize(self.field_stats_write.get_mut())?;
        self.field_stats_write.terminate()?;
        self.terms_write.close()?;
        self.postings_write.close()?;
        self.positions_write.close()?;
@@ -142,7 +156,6 @@ impl<'a> FieldSerializer<'a> {
        positionsidx_write: &'a mut CountingWriter<WritePtr>,
        fieldnorm_reader: Option<FieldNormReader>,
    ) -> io::Result<FieldSerializer<'a>> {
        total_num_tokens.serialize(postings_write)?;
        let (term_freq_enabled, position_enabled): (bool, bool) = match field_type {
            FieldType::Str(ref text_options) => {
                if let Some(text_indexing_options) = text_options.get_indexing_options() {
@@ -190,7 +203,8 @@ impl<'a> FieldSerializer<'a> {
            .unwrap_or(0u64);
        TermInfo {
            doc_freq: 0,
            postings_offset: self.postings_serializer.addr(),
            postings_start_offset: self.postings_serializer.addr(),
            postings_end_offset: 0u64,
            positions_idx,
        }
    }
@@ -244,10 +258,12 @@ impl<'a> FieldSerializer<'a> {
    /// using `VInt` encoding.
    pub fn close_term(&mut self) -> io::Result<()> {
        if self.term_open {
            self.term_dictionary_builder
                .insert_value(&self.current_term_info)?;
            self.postings_serializer
                .close_term(self.current_term_info.doc_freq)?;
            let end_offset = self.postings_serializer.addr();
            self.current_term_info.postings_end_offset = end_offset;
            self.term_dictionary_builder
                .insert_value(&self.current_term_info)?;
            self.term_open = false;
        }
        Ok(())

@@ -7,35 +7,49 @@ use std::io;
pub struct TermInfo {
    /// Number of documents in the segment containing the term
    pub doc_freq: u32,
    /// Start offset within the postings (`.idx`) file.
    pub postings_offset: u64,
    /// Start offset of the posting list within the postings (`.idx`) file.
    pub postings_start_offset: u64,
    /// End offset of the posting list within the postings (`.idx`) file.
    pub postings_end_offset: u64,
    /// Start offset of the first block within the position (`.pos`) file.
    pub positions_idx: u64,
}

impl TermInfo {
    pub(crate) fn posting_num_bytes(&self) -> u32 {
        let num_bytes = self.postings_end_offset - self.postings_start_offset;
        assert!(num_bytes <= std::u32::MAX as u64);
        num_bytes as u32
    }
}

impl FixedSize for TermInfo {
    /// Size required for the binary serialization of a `TermInfo` object.
    /// This is large, but in practice, `TermInfo`s are encoded in blocks and
    /// only the first `TermInfo` of a block is serialized uncompressed.
    /// The subsequent `TermInfo`s are delta encoded and bitpacked.
    const SIZE_IN_BYTES: usize = u32::SIZE_IN_BYTES + 2 * u64::SIZE_IN_BYTES;
    const SIZE_IN_BYTES: usize = 2 * u32::SIZE_IN_BYTES + 2 * u64::SIZE_IN_BYTES;
}

impl BinarySerializable for TermInfo {
    fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
        self.doc_freq.serialize(writer)?;
        self.postings_offset.serialize(writer)?;
        self.postings_start_offset.serialize(writer)?;
        self.posting_num_bytes().serialize(writer)?;
        self.positions_idx.serialize(writer)?;
        Ok(())
    }

    fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
        let doc_freq = u32::deserialize(reader)?;
        let postings_offset = u64::deserialize(reader)?;
        let postings_start_offset = u64::deserialize(reader)?;
        let postings_num_bytes = u32::deserialize(reader)?;
        let postings_end_offset = postings_start_offset + u64::from(postings_num_bytes);
        let positions_idx = u64::deserialize(reader)?;
        Ok(TermInfo {
            doc_freq,
            postings_offset,
            postings_start_offset,
            postings_end_offset,
            positions_idx,
        })
    }

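For reference, the fixed sizes work out as follows. The old uncompressed layout stored doc_freq (u32) plus postings_offset and positions_idx (two u64s): 4 + 16 = 20 bytes. The new layout stores doc_freq (u32), postings_start_offset (u64), posting_num_bytes (u32) and positions_idx (u64): 8 + 16 = 24 bytes, matching the updated SIZE_IN_BYTES of 2 * u32 + 2 * u64. Note that postings_end_offset itself is never written; deserialize reconstructs it as postings_start_offset + posting_num_bytes.
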
@@ -21,48 +21,51 @@ use std::str::FromStr;
use tantivy_query_grammar::{UserInputAST, UserInputBound, UserInputLeaf};

/// Possible error that may happen when parsing a query.
#[derive(Debug, PartialEq, Eq, Error)]
#[derive(Debug, PartialEq, Eq, Fail)]
pub enum QueryParserError {
    /// Error in the query syntax
    #[error("Syntax Error")]
    #[fail(display = "Syntax Error")]
    SyntaxError,
    /// `FieldDoesNotExist(field_name: String)`
    /// The query references a field that is not in the schema
    #[error("Field does not exist: '{0:?}'")]
    #[fail(display = "Field does not exist: '{:?}'", _0)]
    FieldDoesNotExist(String),
    /// The query contains a term for a `u64` or `i64`-field, but the value
    /// is neither.
    #[error("Expected a valid integer: '{0:?}'")]
    #[fail(display = "Expected a valid integer: '{:?}'", _0)]
    ExpectedInt(ParseIntError),
    /// The query contains a term for a `f64`-field, but the value
    /// is not a f64.
    #[error("Expected a valid float: '{0:?}'")]
    #[fail(display = "Expected a valid float: '{:?}'", _0)]
    ExpectedFloat(ParseFloatError),
    /// Queries that are only "excluding" (e.g. -title:pop) are forbidden.
    #[error("Invalid query: Only excluding terms given")]
    #[fail(display = "Invalid query: Only excluding terms given")]
    AllButQueryForbidden,
    /// If no default field is declared, running a query without any
    /// field specified is forbidden.
    #[error("No default field declared and no field specified in query")]
    #[fail(display = "No default field declared and no field specified in query")]
    NoDefaultFieldDeclared,
    /// The field searched for is not declared
    /// as indexed in the schema.
    #[error("The field '{0:?}' is not declared as indexed")]
    #[fail(display = "The field '{:?}' is not declared as indexed", _0)]
    FieldNotIndexed(String),
    /// A phrase query was requested for a field that does not
    /// have any positions indexed.
    #[error("The field '{0:?}' does not have positions indexed")]
    #[fail(display = "The field '{:?}' does not have positions indexed", _0)]
    FieldDoesNotHavePositionsIndexed(String),
    /// The tokenizer for the given field is unknown.
    /// The two argument strings are the name of the field and the name of the tokenizer.
    #[error("The tokenizer '{0:?}' for the field '{1:?}' is unknown")]
    #[fail(
        display = "The tokenizer '{:?}' for the field '{:?}' is unknown",
        _0, _1
    )]
    UnknownTokenizer(String, String),
    /// The query contains a range query with a phrase as one of the bounds.
    /// Only terms can be used as bounds.
    #[error("A range query cannot have a phrase as one of the bounds")]
    #[fail(display = "A range query cannot have a phrase as one of the bounds")]
    RangeMustNotHavePhrase,
    /// The format for the date field is not RFC 3339 compliant.
    #[error("The date field has an invalid format")]
    #[fail(display = "The date field has an invalid format")]
    DateFormatError(chrono::ParseError),
}

@@ -1,7 +1,5 @@
mod pool;

use slog::error;

pub use self::pool::LeasedItem;
use self::pool::Pool;
use crate::core::Segment;
@@ -64,7 +62,6 @@ impl IndexReaderBuilder {
    /// to open different segment readers. It may take hundreds of milliseconds
    /// of time and it may return an error.
    pub fn try_into(self) -> crate::Result<IndexReader> {
        let logger = self.index.logger().clone();
        let inner_reader = InnerIndexReader {
            index: self.index,
            num_searchers: self.num_searchers,
@@ -83,8 +80,8 @@ impl IndexReaderBuilder {
        let callback = move || {
            if let Err(err) = inner_reader_arc_clone.reload() {
                error!(
                    logger,
                    "Error while loading searcher after commit was detected. {:?}", err
                    "Error while loading searcher after commit was detected. {:?}",
                    err
                );
            }
        };

@@ -381,16 +381,19 @@ impl<'de> Deserialize<'de> for Schema {

/// Error that may happen when deserializing
/// a document from JSON.
#[derive(Debug, Error, PartialEq)]
#[derive(Debug, Fail, PartialEq)]
pub enum DocParsingError {
    /// The payload given is not valid JSON.
    #[error("The provided string is not valid JSON")]
    #[fail(display = "The provided string is not valid JSON")]
    NotJSON(String),
    /// One of the value nodes could not be parsed.
    #[error("The field '{0:?}' could not be parsed: {1:?}")]
    #[fail(display = "The field '{:?}' could not be parsed: {:?}", _0, _1)]
    ValueError(String, ValueParsingError),
    /// The json-document contains a field that is not declared in the schema.
    #[error("The document contains a field that is not declared in the schema: {0:?}")]
    #[fail(
        display = "The document contains a field that is not declared in the schema: {:?}",
        _0
    )]
    NoSuchFieldInSchema(String),
}

@@ -25,6 +25,8 @@ pub enum ComponentSpaceUsage {
    Store(StoreSpaceUsage),
    /// Some sort of raw byte count
    Basic(ByteCount),
    ///
    Unimplemented,
}

/// Represents combined space usage of an entire searcher and its component segments.
@@ -119,7 +121,7 @@ impl SegmentSpaceUsage {
    /// Clones the underlying data.
    /// Use the components directly if this is somehow in performance critical code.
    pub fn component(&self, component: SegmentComponent) -> ComponentSpaceUsage {
        use self::ComponentSpaceUsage::*;
        use self::ComponentSpaceUsage::{Basic, PerField, Store, Unimplemented};
        use crate::SegmentComponent::*;
        match component {
            POSTINGS => PerField(self.postings().clone()),
@@ -130,6 +132,7 @@ impl SegmentSpaceUsage {
            TERMS => PerField(self.termdict().clone()),
            STORE => Store(self.store().clone()),
            DELETE => Basic(self.deletes()),
            FIELDSTATS => Unimplemented,
        }
    }

@@ -68,17 +68,19 @@ impl<T: BinarySerializable> SkipListBuilder<T> {
    }

    pub fn insert(&mut self, key: u64, dest: &T) -> io::Result<()> {
        let mut layer_id = 0;
        let mut skip_pointer = self.data_layer.insert(key, dest)?;
        for layer_id in 0.. {
            if let Some((skip_doc_id, skip_offset)) = skip_pointer {
                skip_pointer = self
        loop {
            skip_pointer = match skip_pointer {
                Some((skip_doc_id, skip_offset)) => self
                    .get_skip_layer(layer_id)
                    .insert(skip_doc_id, &skip_offset)?;
            } else {
                break;
            }
                    .insert(skip_doc_id, &skip_offset)?,
                None => {
                    return Ok(());
                }
            };
            layer_id += 1;
        }
        Ok(())
    }

    pub fn write<W: Write>(self, output: &mut W) -> io::Result<()> {

@@ -44,11 +44,13 @@ mod tests {

    const BLOCK_SIZE: usize = 1_500;

    fn make_term_info(val: u64) -> TermInfo {
    fn make_term_info(term_ord: u64) -> TermInfo {
        let offset = |term_ord: u64| term_ord * 100 + term_ord * term_ord;
        TermInfo {
            doc_freq: val as u32,
            positions_idx: val * 2u64,
            postings_offset: val * 3u64,
            doc_freq: term_ord as u32,
            postings_start_offset: offset(term_ord),
            postings_end_offset: offset(term_ord + 1),
            positions_idx: offset(term_ord) * 2u64,
        }
    }

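Note that the offset closure is strictly increasing, and make_term_info(n) gets a postings_end_offset equal to make_term_info(n + 1)'s postings_start_offset, so consecutive posting lists abut exactly as they do in a real postings file.
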
@@ -208,20 +210,14 @@ mod tests {
    }

    #[test]
    fn test_stream_high_range_prefix_suffix() {
    fn test_stream_high_range_prefix_suffix() -> std::io::Result<()> {
        let buffer: Vec<u8> = {
            let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
            // term requires more than 16bits
            term_dictionary_builder
                .insert("abcdefghijklmnopqrstuvwxy", &make_term_info(1))
                .unwrap();
            term_dictionary_builder
                .insert("abcdefghijklmnopqrstuvwxyz", &make_term_info(2))
                .unwrap();
            term_dictionary_builder
                .insert("abr", &make_term_info(2))
                .unwrap();
            term_dictionary_builder.finish().unwrap()
            term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxy", &make_term_info(1))?;
            term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxyz", &make_term_info(2))?;
            term_dictionary_builder.insert("abr", &make_term_info(3))?;
            term_dictionary_builder.finish()?
        };
        let source = ReadOnlySource::from(buffer);
        let term_dictionary: TermDictionary = TermDictionary::from_source(&source);
@@ -229,12 +225,15 @@ mod tests {
        assert!(kv_stream.advance());
        assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxy".as_bytes());
        assert_eq!(kv_stream.value(), &make_term_info(1));
        dbg!(make_term_info(1));
        assert!(kv_stream.advance());
        assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxyz".as_bytes());
        assert_eq!(kv_stream.value(), &make_term_info(2));
        assert!(kv_stream.advance());
        assert_eq!(kv_stream.key(), "abr".as_bytes());
        assert_eq!(kv_stream.value(), &make_term_info(3));
        assert!(!kv_stream.advance());
        Ok(())
    }

    #[test]

@@ -57,21 +57,28 @@ impl TermInfoBlockMeta {
        self.doc_freq_nbits + self.postings_offset_nbits + self.positions_idx_nbits
    }

    // Here inner_offset is the offset within the block, WITHOUT the first term_info.
    // In other words, term_info #1,#2,#3 gets inner_offset 0,1,2... while term_info #0
    // is encoded without bitpacking.
    fn deserialize_term_info(&self, data: &[u8], inner_offset: usize) -> TermInfo {
        assert!(inner_offset < BLOCK_LEN - 1);
        let num_bits = self.num_bits() as usize;
        let mut cursor = num_bits * inner_offset;

        let postings_start_offset = extract_bits(data, cursor, self.postings_offset_nbits);
        let postings_end_offset = self.ref_term_info.postings_start_offset
            + extract_bits(data, cursor + num_bits, self.postings_offset_nbits);
        cursor += self.postings_offset_nbits as usize;

        let doc_freq = extract_bits(data, cursor, self.doc_freq_nbits) as u32;
        cursor += self.doc_freq_nbits as usize;

        let postings_offset = extract_bits(data, cursor, self.postings_offset_nbits);
        cursor += self.postings_offset_nbits as usize;

        let positions_idx = extract_bits(data, cursor, self.positions_idx_nbits);

        TermInfo {
            doc_freq,
            postings_offset: postings_offset + self.ref_term_info.postings_offset,
            postings_start_offset: postings_start_offset + self.ref_term_info.postings_start_offset,
            postings_end_offset,
            positions_idx: positions_idx + self.ref_term_info.positions_idx,
        }
    }
@@ -126,14 +133,13 @@ impl TermInfoStore {
            .expect("Failed to deserialize terminfoblockmeta");
        let inner_offset = (term_ord as usize) % BLOCK_LEN;
        if inner_offset == 0 {
            term_info_block_data.ref_term_info
        } else {
            let term_info_data = self.term_info_source.as_slice();
            term_info_block_data.deserialize_term_info(
                &term_info_data[term_info_block_data.offset as usize..],
                inner_offset - 1,
            )
            return term_info_block_data.ref_term_info;
        }
        let term_info_data = self.term_info_source.as_slice();
        term_info_block_data.deserialize_term_info(
            &term_info_data[term_info_block_data.offset as usize..],
            inner_offset - 1,
        )
    }

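A sketch of the ordinal arithmetic in get() above (hypothetical standalone helper; BLOCK_LEN is the per-block term count used by the store):

```rust
/// Maps a term ordinal to (block index, position in the bitpacked tail).
/// `None` means the ordinal addresses the block's uncompressed ref_term_info;
/// the tail position is shifted by one because entry #0 is never bitpacked.
fn locate(term_ord: usize, block_len: usize) -> (usize, Option<usize>) {
    let block = term_ord / block_len;
    match term_ord % block_len {
        0 => (block, None),
        inner_offset => (block, Some(inner_offset - 1)),
    }
}
```
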
pub fn num_terms(&self) -> usize {
@@ -154,16 +160,17 @@ fn bitpack_serialize<W: Write>(
    term_info_block_meta: &TermInfoBlockMeta,
    term_info: &TermInfo,
) -> io::Result<()> {
    bit_packer.write(
        term_info.postings_start_offset,
        term_info_block_meta.postings_offset_nbits,
        write,
    )?;
    bit_packer.write(
        u64::from(term_info.doc_freq),
        term_info_block_meta.doc_freq_nbits,
        write,
    )?;
    bit_packer.write(
        term_info.postings_offset,
        term_info_block_meta.postings_offset_nbits,
        write,
    )?;

    bit_packer.write(
        term_info.positions_idx,
        term_info_block_meta.positions_idx_nbits,
@@ -183,23 +190,27 @@ impl TermInfoStoreWriter {
    }

    fn flush_block(&mut self) -> io::Result<()> {
        if self.term_infos.is_empty() {
            return Ok(());
        }
        let mut bit_packer = BitPacker::new();
        let ref_term_info = self.term_infos[0].clone();

        let last_term_info = if let Some(last_term_info) = self.term_infos.last().cloned() {
            last_term_info
        } else {
            return Ok(());
        };
        let postings_end_offset =
            last_term_info.postings_end_offset - ref_term_info.postings_start_offset;
        for term_info in &mut self.term_infos[1..] {
            term_info.postings_offset -= ref_term_info.postings_offset;
            term_info.postings_start_offset -= ref_term_info.postings_start_offset;
            term_info.positions_idx -= ref_term_info.positions_idx;
        }

        let mut max_doc_freq: u32 = 0u32;
        let mut max_postings_offset: u64 = 0u64;
        let mut max_positions_idx: u64 = 0u64;
        let max_postings_offset: u64 = postings_end_offset;
        let max_positions_idx: u64 = last_term_info.positions_idx;

        for term_info in &self.term_infos[1..] {
            max_doc_freq = cmp::max(max_doc_freq, term_info.doc_freq);
            max_postings_offset = cmp::max(max_postings_offset, term_info.postings_offset);
            max_positions_idx = cmp::max(max_positions_idx, term_info.positions_idx);
        }

        let max_doc_freq_nbits: u8 = compute_num_bits(u64::from(max_doc_freq));
@@ -224,6 +235,12 @@ impl TermInfoStoreWriter {
            )?;
        }

        bit_packer.write(
            postings_end_offset,
            term_info_block_meta.postings_offset_nbits,
            &mut self.buffer_term_infos,
        )?;

        // Blocks need to end at a byte boundary.
        bit_packer.flush(&mut self.buffer_term_infos)?;
        self.term_infos.clear();
@@ -232,6 +249,7 @@ impl TermInfoStoreWriter {
    }

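flush_block above rebases every offset in the block against the block's first entry before picking bit widths; a hedged sketch of that delta step in isolation (hypothetical helper over plain u64 offsets):

```rust
/// Rebase a block of monotonically increasing offsets against its first entry.
/// The bit width chosen for the whole block is then dictated by the largest
/// delta rather than the largest absolute offset, which is why the bitpacked
/// representation stays compact deep into a large file.
fn rebase(offsets: &[u64]) -> (u64, Vec<u64>) {
    let base = offsets[0];
    let deltas: Vec<u64> = offsets[1..].iter().map(|off| off - base).collect();
    (base, deltas)
}
```

With offsets [1000, 1010, 1025], for instance, the deltas are [10, 25] and fit in 5 bits each, whereas the absolute value 1025 would need 11 bits.
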
    pub fn write_term_info(&mut self, term_info: &TermInfo) -> io::Result<()> {
        assert!(term_info.postings_end_offset >= term_info.postings_start_offset);
        self.num_terms += 1u64;
        self.term_infos.push(term_info.clone());
        if self.term_infos.len() >= BLOCK_LEN {
@@ -291,10 +309,11 @@ mod tests {
    #[test]
    fn test_term_info_block_meta_serialization() {
        let term_info_block_meta = TermInfoBlockMeta {
            offset: 2009,
            offset: 2009u64,
            ref_term_info: TermInfo {
                doc_freq: 512,
                postings_offset: 51,
                postings_start_offset: 51,
                postings_end_offset: 57u64,
                positions_idx: 3584,
            },
            doc_freq_nbits: 10,
@@ -312,10 +331,12 @@ mod tests {
    fn test_pack() {
        let mut store_writer = TermInfoStoreWriter::new();
        let mut term_infos = vec![];
        let offset = |i| (i * 13 + i * i) as u64;
        for i in 0..1000 {
            let term_info = TermInfo {
                doc_freq: i as u32,
                postings_offset: (i / 10) as u64,
                postings_start_offset: offset(i),
                postings_end_offset: offset(i + 1),
                positions_idx: (i * 7) as u64,
            };
            store_writer.write_term_info(&term_info).unwrap();
@@ -325,7 +346,12 @@ mod tests {
        store_writer.serialize(&mut buffer).unwrap();
        let term_info_store = TermInfoStore::open(&ReadOnlySource::from(buffer));
        for i in 0..1000 {
            assert_eq!(term_info_store.get(i as u64), term_infos[i]);
            assert_eq!(
                term_info_store.get(i as u64),
                term_infos[i],
                "term info {}",
                i
            );
        }
    }
}