Compare commits

...

21 Commits

Author SHA1 Message Date
Paul Masurel
4e93c681c0 Removed the SegmentCollector type from the Generics of the
FilterCollector
2020-11-25 14:13:22 +09:00
Paul Masurel
5a8546bb5a Revert "Move SegmentUpdater::list_files() to Index" 2020-11-25 14:01:26 +09:00
Adrien Guillo
f0c1867637 Move SegmentUpdater::list_files() to Index
... and make the method public
2020-11-25 14:01:26 +09:00
Paul Masurel
35fbf738c9 Refactoring of the skip index.
The skip index now identifies both the start and the end offset
of blocks. Checkpoints are compressed in blocks, reaching better
compression.
2020-11-25 14:01:26 +09:00
Adrien Guillo
6f919c61c7 Cache store reader blocks in an LRU fashion 2020-11-25 14:01:26 +09:00
Paul Masurel
86efdb778c Marked blockwand test as ignored.
- Using impl trait for iterating `matching_segments` in the termdict
merger
2020-11-25 14:01:26 +09:00
Paul Masurel
960c2ee39d Fixing unit test. 2020-11-25 14:01:26 +09:00
Paul Masurel
6653ed8eb6 Update src/core/searcher.rs
Co-authored-by: Adrien Guillo <adrien.guillo@gmail.com>
2020-11-25 14:01:26 +09:00
Paul Masurel
e20eedd98b Make field TermMerger API public 2020-11-25 14:01:26 +09:00
Paul Masurel
90c1fdefdc Closes #930 Minor bug.
Watch callback could be callback if the last watch handle was dropped
shortly before meta.json is called.
2020-11-25 14:01:26 +09:00
Paul Masurel
3e27d4c211 Making block wand test more robusts 2020-11-25 14:01:26 +09:00
Adrien Guillo
4dc268482f Modified Directory::exists API to return Result<bool, OpenReadError> 2020-11-25 14:01:26 +09:00
Paul Masurel
4f20dd410e Avoid loading fieldnorms when not necessary 2020-11-25 14:01:26 +09:00
Paul Masurel
b1125638f4 TermInfo contain the end_offset of the postings.
We slice the ReadOnlySource tightly.
2020-11-25 14:01:26 +09:00
Adrien Guillo
a4c95852e5 Updated CHANGELOG 2020-11-25 14:01:26 +09:00
Adrien Guillo
b28be75728 Implement FileWatcher 2020-11-25 14:01:26 +09:00
barrotsteindev
af7cb3ff0f simplified FilterCollector#for_segment 2020-11-23 11:09:03 +02:00
barrotsteindev
30d86c653a fmt 2020-11-22 10:40:45 +02:00
barrotsteindev
6f4c051700 updated docs 2020-11-21 21:26:44 +02:00
barrotsteindev
212e091553 removed unnecessary static liftimes 2020-11-21 21:24:27 +02:00
barrotsteindev
146058bdbf added initial implementation for filter_collector 2020-11-21 18:09:08 +02:00
41 changed files with 1403 additions and 745 deletions

View File

@@ -7,6 +7,7 @@ Tantivy 0.14.0
- Added support for Brotli compression in the DocStore. (@ppodolsky)
- Added helper for building intersections and unions in BooleanQuery (@guilload)
- Bugfix in `Query::explain`
- Removed dependency on `notify` #924. Replaced with `FileWatcher` struct that polls meta file every 500ms in background thread. (@halvorboe @guilload)
Tantivy 0.13.2
===================

View File

@@ -30,7 +30,6 @@ serde_json = "1"
num_cpus = "1"
fs2={version="0.4", optional=true}
levenshtein_automata = "0.2"
notify = {version="4", optional=true}
uuid = { version = "0.8", features = ["v4", "serde"] }
crossbeam = "0.8"
futures = {version = "0.3", features=["thread-pool"] }
@@ -48,6 +47,7 @@ murmurhash32 = "0.2"
chrono = "0.4"
smallvec = "1"
rayon = "1"
lru = "0.6"
[target.'cfg(windows)'.dependencies]
winapi = "0.3"
@@ -73,7 +73,7 @@ overflow-checks = true
[features]
default = ["mmap"]
mmap = ["fs2", "tempfile", "memmap", "notify"]
mmap = ["fs2", "tempfile", "memmap"]
brotli-compression = ["brotli"]
lz4-compression = ["lz4"]
failpoints = ["fail/failpoints"]

View File

@@ -0,0 +1,148 @@
// # Custom collector example
//
// This example shows how you can implement your own
// collector. As an example, we will compute a collector
// that computes the standard deviation of a given fast field.
//
// Of course, you can have a look at the tantivy's built-in collectors
// such as the `CountCollector` for more examples.
// ---
// Importing tantivy...
use crate::collector::{Collector, SegmentCollector};
use crate::fastfield::FastFieldReader;
use crate::schema::Field;
use crate::{Score, SegmentReader, TantivyError};
/// The `FilterCollector` collector filters docs using a u64 fast field value and a predicate.
/// Only the documents for which the predicate returned "true" will be passed on to the next collector.
///
/// ```rust
/// use tantivy::collector::{TopDocs, FilterCollector};
/// use tantivy::query::QueryParser;
/// use tantivy::schema::{Schema, TEXT, INDEXED, FAST};
/// use tantivy::{doc, DocAddress, Index};
///
/// let mut schema_builder = Schema::builder();
/// let title = schema_builder.add_text_field("title", TEXT);
/// let price = schema_builder.add_u64_field("price", INDEXED | FAST);
/// let schema = schema_builder.build();
/// let index = Index::create_in_ram(schema);
///
/// let mut index_writer = index.writer_with_num_threads(1, 10_000_000).unwrap();
/// index_writer.add_document(doc!(title => "The Name of the Wind", price => 30_200u64));
/// index_writer.add_document(doc!(title => "The Diary of Muadib", price => 29_240u64));
/// index_writer.add_document(doc!(title => "A Dairy Cow", price => 21_240u64));
/// index_writer.add_document(doc!(title => "The Diary of a Young Girl", price => 20_120u64));
/// assert!(index_writer.commit().is_ok());
///
/// let reader = index.reader().unwrap();
/// let searcher = reader.searcher();
///
/// let query_parser = QueryParser::for_index(&index, vec![title]);
/// let query = query_parser.parse_query("diary").unwrap();
/// let no_filter_collector = FilterCollector::new(price, &|value| true, TopDocs::with_limit(2));
/// let top_docs = searcher.search(&query, &no_filter_collector).unwrap();
///
/// assert_eq!(top_docs.len(), 2);
/// assert_eq!(top_docs[0].1, DocAddress(0, 1));
/// assert_eq!(top_docs[1].1, DocAddress(0, 3));
///
/// let filter_all_collector = FilterCollector::new(price, &|value| false, TopDocs::with_limit(2));
/// let filtered_top_docs = searcher.search(&query, &filter_all_collector).unwrap();
///
/// assert_eq!(filtered_top_docs.len(), 0);
/// ```
pub struct FilterCollector<TCollector> {
field: Field,
collector: TCollector,
predicate: &'static (dyn Fn(u64) -> bool + Send + Sync),
}
impl<TCollector> FilterCollector<TCollector>
where
TCollector: Collector + Send + Sync,
{
pub fn new(
field: Field,
predicate: &'static (dyn Fn(u64) -> bool + Send + Sync),
collector: TCollector,
) -> FilterCollector<TCollector> {
FilterCollector {
field,
predicate,
collector,
}
}
}
impl<TCollector> Collector for FilterCollector<TCollector>
where
TCollector: Collector + Send + Sync,
{
// That's the type of our result.
// Our standard deviation will be a float.
type Fruit = TCollector::Fruit;
type Child = FilterSegmentCollector<TCollector::Child>;
fn for_segment(
&self,
segment_local_id: u32,
segment_reader: &SegmentReader,
) -> crate::Result<FilterSegmentCollector<TCollector::Child>> {
let fast_field_reader = segment_reader
.fast_fields()
.u64(self.field)
.ok_or_else(|| {
let field_name = segment_reader.schema().get_field_name(self.field);
TantivyError::SchemaError(format!(
"Field {:?} is not a u64 fast field.",
field_name
))
})?;
let segment_collector = self
.collector
.for_segment(segment_local_id, segment_reader)?;
Ok(FilterSegmentCollector {
fast_field_reader,
segment_collector: segment_collector,
predicate: self.predicate,
})
}
fn requires_scoring(&self) -> bool {
self.collector.requires_scoring()
}
fn merge_fruits(
&self,
segment_fruits: Vec<<TCollector::Child as SegmentCollector>::Fruit>,
) -> crate::Result<TCollector::Fruit> {
self.collector.merge_fruits(segment_fruits)
}
}
pub struct FilterSegmentCollector<TSegmentCollector> {
fast_field_reader: FastFieldReader<u64>,
segment_collector: TSegmentCollector,
predicate: &'static (dyn Fn(u64) -> bool + Send + Sync),
}
impl<TSegmentCollector> SegmentCollector for FilterSegmentCollector<TSegmentCollector>
where
TSegmentCollector: SegmentCollector,
{
type Fruit = TSegmentCollector::Fruit;
fn collect(&mut self, doc: u32, score: Score) {
let value = self.fast_field_reader.get(doc);
if (self.predicate)(value) {
self.segment_collector.collect(doc, score)
}
}
fn harvest(self) -> <TSegmentCollector as SegmentCollector>::Fruit {
self.segment_collector.harvest()
}
}

View File

@@ -114,6 +114,9 @@ use crate::query::Weight;
mod docset_collector;
pub use self::docset_collector::DocSetCollector;
mod filter_collector_wrapper;
pub use self::filter_collector_wrapper::FilterCollector;
/// `Fruit` is the type for the result of our collection.
/// e.g. `usize` for the `Count` collector.
pub trait Fruit: Send + downcast_rs::Downcast {}

View File

@@ -66,10 +66,6 @@ pub(crate) fn compute_num_bits(n: u64) -> u8 {
}
}
pub(crate) fn is_power_of_2(n: usize) -> bool {
(n > 0) && (n & (n - 1) == 0)
}
/// Has length trait
pub trait HasLen {
/// Return length

View File

@@ -5,6 +5,7 @@ use crate::core::SegmentId;
use crate::core::SegmentMeta;
use crate::core::SegmentMetaInventory;
use crate::core::META_FILEPATH;
use crate::directory::error::OpenReadError;
use crate::directory::ManagedDirectory;
#[cfg(feature = "mmap")]
use crate::directory::MmapDirectory;
@@ -59,7 +60,7 @@ impl Index {
/// Examines the directory to see if it contains an index.
///
/// Effectively, it only checks for the presence of the `meta.json` file.
pub fn exists<Dir: Directory>(dir: &Dir) -> bool {
pub fn exists<Dir: Directory>(dir: &Dir) -> Result<bool, OpenReadError> {
dir.exists(&META_FILEPATH)
}
@@ -106,7 +107,7 @@ impl Index {
schema: Schema,
) -> crate::Result<Index> {
let mmap_directory = MmapDirectory::open(directory_path)?;
if Index::exists(&mmap_directory) {
if Index::exists(&mmap_directory)? {
return Err(TantivyError::IndexAlreadyExists);
}
Index::create(mmap_directory, schema)
@@ -114,7 +115,7 @@ impl Index {
/// Opens or creates a new index in the provided directory
pub fn open_or_create<Dir: Directory>(dir: Dir, schema: Schema) -> crate::Result<Index> {
if !Index::exists(&dir) {
if !Index::exists(&dir)? {
return Index::create(dir, schema);
}
let index = Index::open(dir)?;
@@ -399,7 +400,7 @@ impl fmt::Debug for Index {
#[cfg(test)]
mod tests {
use crate::directory::RAMDirectory;
use crate::directory::{RAMDirectory, WatchCallback};
use crate::schema::Field;
use crate::schema::{Schema, INDEXED, TEXT};
use crate::IndexReader;
@@ -423,24 +424,24 @@ mod tests {
#[test]
fn test_index_exists() {
let directory = RAMDirectory::create();
assert!(!Index::exists(&directory));
assert!(!Index::exists(&directory).unwrap());
assert!(Index::create(directory.clone(), throw_away_schema()).is_ok());
assert!(Index::exists(&directory));
assert!(Index::exists(&directory).unwrap());
}
#[test]
fn open_or_create_should_create() {
let directory = RAMDirectory::create();
assert!(!Index::exists(&directory));
assert!(!Index::exists(&directory).unwrap());
assert!(Index::open_or_create(directory.clone(), throw_away_schema()).is_ok());
assert!(Index::exists(&directory));
assert!(Index::exists(&directory).unwrap());
}
#[test]
fn open_or_create_should_open() {
let directory = RAMDirectory::create();
assert!(Index::create(directory.clone(), throw_away_schema()).is_ok());
assert!(Index::exists(&directory));
assert!(Index::exists(&directory).unwrap());
assert!(Index::open_or_create(directory, throw_away_schema()).is_ok());
}
@@ -448,7 +449,7 @@ mod tests {
fn create_should_wipeoff_existing() {
let directory = RAMDirectory::create();
assert!(Index::create(directory.clone(), throw_away_schema()).is_ok());
assert!(Index::exists(&directory));
assert!(Index::exists(&directory).unwrap());
assert!(Index::create(directory.clone(), Schema::builder().build()).is_ok());
}
@@ -456,7 +457,7 @@ mod tests {
fn open_or_create_exists_but_schema_does_not_match() {
let directory = RAMDirectory::create();
assert!(Index::create(directory.clone(), throw_away_schema()).is_ok());
assert!(Index::exists(&directory));
assert!(Index::exists(&directory).unwrap());
assert!(Index::open_or_create(directory.clone(), throw_away_schema()).is_ok());
let err = Index::open_or_create(directory, Schema::builder().build());
assert_eq!(
@@ -524,7 +525,7 @@ mod tests {
assert_eq!(reader.searcher().num_docs(), 0);
writer.add_document(doc!(field=>1u64));
let (sender, receiver) = crossbeam::channel::unbounded();
let _handle = index.directory_mut().watch(Box::new(move || {
let _handle = index.directory_mut().watch(WatchCallback::new(move || {
let _ = sender.send(());
}));
writer.commit().unwrap();
@@ -554,9 +555,11 @@ mod tests {
fn test_index_on_commit_reload_policy_aux(field: Field, index: &Index, reader: &IndexReader) {
let mut reader_index = reader.index();
let (sender, receiver) = crossbeam::channel::unbounded();
let _watch_handle = reader_index.directory_mut().watch(Box::new(move || {
let _ = sender.send(());
}));
let _watch_handle = reader_index
.directory_mut()
.watch(WatchCallback::new(move || {
let _ = sender.send(());
}));
let mut writer = index.writer_for_tests().unwrap();
assert_eq!(reader.searcher().num_docs(), 0);
writer.add_document(doc!(field=>1u64));
@@ -595,7 +598,7 @@ mod tests {
writer.add_document(doc!(field => i));
}
let (sender, receiver) = crossbeam::channel::unbounded();
let _handle = directory.watch(Box::new(move || {
let _handle = directory.watch(WatchCallback::new(move || {
let _ = sender.send(());
}));
writer.commit().unwrap();

View File

@@ -90,9 +90,9 @@ impl InvertedIndexReader {
term_info: &TermInfo,
block_postings: &mut BlockSegmentPostings,
) -> io::Result<()> {
let postings_slice = self
.postings_file_slice
.slice_from(term_info.postings_offset as usize);
let start_offset = term_info.postings_start_offset as usize;
let stop_offset = term_info.postings_stop_offset as usize;
let postings_slice = self.postings_file_slice.slice(start_offset, stop_offset);
block_postings.reset(term_info.doc_freq, postings_slice.read_bytes()?);
Ok(())
}
@@ -121,8 +121,10 @@ impl InvertedIndexReader {
term_info: &TermInfo,
requested_option: IndexRecordOption,
) -> io::Result<BlockSegmentPostings> {
let offset = term_info.postings_offset as usize;
let postings_data = self.postings_file_slice.slice_from(offset);
let postings_data = self.postings_file_slice.slice(
term_info.postings_start_offset as usize,
term_info.postings_stop_offset as usize,
);
BlockSegmentPostings::open(
term_info.doc_freq,
postings_data,

View File

@@ -12,7 +12,7 @@ pub use self::executor::Executor;
pub use self::index::Index;
pub use self::index_meta::{IndexMeta, SegmentMeta, SegmentMetaInventory};
pub use self::inverted_index_reader::InvertedIndexReader;
pub use self::searcher::Searcher;
pub use self::searcher::{FieldSearcher, Searcher};
pub use self::segment::Segment;
pub use self::segment::SerializableSegment;
pub use self::segment_component::SegmentComponent;

View File

@@ -168,6 +168,7 @@ impl Searcher {
}
}
/// **Experimental API** `FieldSearcher` only gives access to a stream over the terms of a field.
pub struct FieldSearcher {
inv_index_readers: Vec<Arc<InvertedIndexReader>>,
}
@@ -179,7 +180,11 @@ impl FieldSearcher {
/// Returns a Stream over all of the sorted unique terms of
/// for the given field.
pub fn terms(&self) -> TermMerger<'_> {
///
/// This method does not take into account which documents are deleted, so
/// in presence of deletes some terms may not actually exist in any document
/// anymore.
pub fn terms(&self) -> TermMerger {
let term_streamers: Vec<_> = self
.inv_index_readers
.iter()

View File

@@ -131,7 +131,7 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
fn delete(&self, path: &Path) -> Result<(), DeleteError>;
/// Returns true iff the file exists
fn exists(&self, path: &Path) -> bool;
fn exists(&self, path: &Path) -> Result<bool, OpenReadError>;
/// Opens a writer for the *virtual file* associated with
/// a Path.

View File

@@ -0,0 +1,178 @@
use crate::directory::{WatchCallback, WatchCallbackList, WatchHandle};
use crc32fast::Hasher;
use std::fs;
use std::io;
use std::io::BufRead;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
pub const POLLING_INTERVAL: Duration = Duration::from_millis(if cfg!(test) { 1 } else { 500 });
// Watches a file and executes registered callbacks when the file is modified.
pub struct FileWatcher {
path: Arc<PathBuf>,
callbacks: Arc<WatchCallbackList>,
state: Arc<AtomicUsize>, // 0: new, 1: runnable, 2: terminated
}
impl FileWatcher {
pub fn new(path: &PathBuf) -> FileWatcher {
FileWatcher {
path: Arc::new(path.clone()),
callbacks: Default::default(),
state: Default::default(),
}
}
pub fn spawn(&self) {
if self.state.compare_and_swap(0, 1, Ordering::SeqCst) > 0 {
return;
}
let path = self.path.clone();
let callbacks = self.callbacks.clone();
let state = self.state.clone();
thread::Builder::new()
.name("thread-tantivy-meta-file-watcher".to_string())
.spawn(move || {
let mut current_checksum = None;
while state.load(Ordering::SeqCst) == 1 {
if let Ok(checksum) = FileWatcher::compute_checksum(&path) {
// `None.unwrap_or_else(|| !checksum) != checksum` evaluates to `true`
if current_checksum.unwrap_or_else(|| !checksum) != checksum {
info!("Meta file {:?} was modified", path);
current_checksum = Some(checksum);
futures::executor::block_on(callbacks.broadcast());
}
}
thread::sleep(POLLING_INTERVAL);
}
})
.expect("Failed to spawn meta file watcher thread");
}
pub fn watch(&self, callback: WatchCallback) -> WatchHandle {
let handle = self.callbacks.subscribe(callback);
self.spawn();
handle
}
fn compute_checksum(path: &PathBuf) -> Result<u32, io::Error> {
let reader = match fs::File::open(path) {
Ok(f) => io::BufReader::new(f),
Err(e) => {
warn!("Failed to open meta file {:?}: {:?}", path, e);
return Err(e);
}
};
let mut hasher = Hasher::new();
for line in reader.lines() {
hasher.update(line?.as_bytes())
}
Ok(hasher.finalize())
}
}
impl Drop for FileWatcher {
fn drop(&mut self) {
self.state.store(2, Ordering::SeqCst);
}
}
#[cfg(test)]
mod tests {
use std::mem;
use crate::directory::mmap_directory::atomic_write;
use super::*;
#[test]
fn test_file_watcher_drop_watcher() -> crate::Result<()> {
let tmp_dir = tempfile::TempDir::new()?;
let tmp_file = tmp_dir.path().join("watched.txt");
let counter: Arc<AtomicUsize> = Default::default();
let (tx, rx) = crossbeam::channel::unbounded();
let timeout = Duration::from_millis(100);
let watcher = FileWatcher::new(&tmp_file);
let state = watcher.state.clone();
assert_eq!(state.load(Ordering::SeqCst), 0);
let counter_clone = counter.clone();
let _handle = watcher.watch(WatchCallback::new(move || {
let val = counter_clone.fetch_add(1, Ordering::SeqCst);
tx.send(val + 1).unwrap();
}));
assert_eq!(counter.load(Ordering::SeqCst), 0);
assert_eq!(state.load(Ordering::SeqCst), 1);
atomic_write(&tmp_file, b"foo")?;
assert_eq!(rx.recv_timeout(timeout), Ok(1));
atomic_write(&tmp_file, b"foo")?;
assert!(rx.recv_timeout(timeout).is_err());
atomic_write(&tmp_file, b"bar")?;
assert_eq!(rx.recv_timeout(timeout), Ok(2));
mem::drop(watcher);
atomic_write(&tmp_file, b"qux")?;
thread::sleep(Duration::from_millis(10));
assert_eq!(counter.load(Ordering::SeqCst), 2);
assert_eq!(state.load(Ordering::SeqCst), 2);
Ok(())
}
#[test]
fn test_file_watcher_drop_handle() -> crate::Result<()> {
let tmp_dir = tempfile::TempDir::new()?;
let tmp_file = tmp_dir.path().join("watched.txt");
let counter: Arc<AtomicUsize> = Default::default();
let (tx, rx) = crossbeam::channel::unbounded();
let timeout = Duration::from_millis(100);
let watcher = FileWatcher::new(&tmp_file);
let state = watcher.state.clone();
assert_eq!(state.load(Ordering::SeqCst), 0);
let counter_clone = counter.clone();
let handle = watcher.watch(WatchCallback::new(move || {
let val = counter_clone.fetch_add(1, Ordering::SeqCst);
tx.send(val + 1).unwrap();
}));
assert_eq!(counter.load(Ordering::SeqCst), 0);
assert_eq!(state.load(Ordering::SeqCst), 1);
atomic_write(&tmp_file, b"foo")?;
assert_eq!(rx.recv_timeout(timeout), Ok(1));
mem::drop(handle);
atomic_write(&tmp_file, b"qux")?;
assert_eq!(counter.load(Ordering::SeqCst), 1);
assert_eq!(state.load(Ordering::SeqCst), 1);
Ok(())
}
}

View File

@@ -307,7 +307,7 @@ impl Directory for ManagedDirectory {
self.directory.delete(path)
}
fn exists(&self, path: &Path) -> bool {
fn exists(&self, path: &Path) -> Result<bool, OpenReadError> {
self.directory.exists(path)
}
@@ -355,22 +355,22 @@ mod tests_mmap_specific {
managed_directory
.atomic_write(test_path2, &[0u8, 1u8])
.unwrap();
assert!(managed_directory.exists(test_path1));
assert!(managed_directory.exists(test_path2));
assert!(managed_directory.exists(test_path1).unwrap());
assert!(managed_directory.exists(test_path2).unwrap());
let living_files: HashSet<PathBuf> = [test_path1.to_owned()].iter().cloned().collect();
assert!(managed_directory.garbage_collect(|| living_files).is_ok());
assert!(managed_directory.exists(test_path1));
assert!(!managed_directory.exists(test_path2));
assert!(managed_directory.exists(test_path1).unwrap());
assert!(!managed_directory.exists(test_path2).unwrap());
}
{
let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap();
let mut managed_directory = ManagedDirectory::wrap(mmap_directory).unwrap();
assert!(managed_directory.exists(test_path1));
assert!(!managed_directory.exists(test_path2));
assert!(managed_directory.exists(test_path1).unwrap());
assert!(!managed_directory.exists(test_path2).unwrap());
let living_files: HashSet<PathBuf> = HashSet::new();
assert!(managed_directory.garbage_collect(|| living_files).is_ok());
assert!(!managed_directory.exists(test_path1));
assert!(!managed_directory.exists(test_path2));
assert!(!managed_directory.exists(test_path1).unwrap());
assert!(!managed_directory.exists(test_path2).unwrap());
}
}
@@ -387,7 +387,7 @@ mod tests_mmap_specific {
let mut write = managed_directory.open_write(test_path1).unwrap();
write.write_all(&[0u8, 1u8]).unwrap();
write.terminate().unwrap();
assert!(managed_directory.exists(test_path1));
assert!(managed_directory.exists(test_path1).unwrap());
let _mmap_read = managed_directory.open_read(test_path1).unwrap();
assert!(managed_directory
@@ -395,15 +395,15 @@ mod tests_mmap_specific {
.is_ok());
if cfg!(target_os = "windows") {
// On Windows, gc should try and fail the file as it is mmapped.
assert!(managed_directory.exists(test_path1));
assert!(managed_directory.exists(test_path1).unwrap());
// unmap should happen here.
drop(_mmap_read);
// The file should still be in the list of managed file and
// eventually be deleted once mmap is released.
assert!(managed_directory.garbage_collect(|| living_files).is_ok());
assert!(!managed_directory.exists(test_path1));
assert!(!managed_directory.exists(test_path1).unwrap());
} else {
assert!(!managed_directory.exists(test_path1));
assert!(!managed_directory.exists(test_path1).unwrap());
}
}

View File

@@ -1,6 +1,7 @@
use crate::core::META_FILEPATH;
use crate::directory::error::LockError;
use crate::directory::error::{DeleteError, OpenDirectoryError, OpenReadError, OpenWriteError};
use crate::directory::file_watcher::FileWatcher;
use crate::directory::AntiCallToken;
use crate::directory::BoxedData;
use crate::directory::Directory;
@@ -8,14 +9,10 @@ use crate::directory::DirectoryLock;
use crate::directory::FileSlice;
use crate::directory::Lock;
use crate::directory::WatchCallback;
use crate::directory::WatchCallbackList;
use crate::directory::WatchHandle;
use crate::directory::{TerminatingWrite, WritePtr};
use fs2::FileExt;
use memmap::Mmap;
use notify::RawEvent;
use notify::RecursiveMode;
use notify::Watcher;
use serde::{Deserialize, Serialize};
use stable_deref_trait::StableDeref;
use std::convert::From;
@@ -26,12 +23,9 @@ use std::io::{self, Seek, SeekFrom};
use std::io::{BufWriter, Read, Write};
use std::path::{Path, PathBuf};
use std::result;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::RwLock;
use std::sync::Weak;
use std::thread;
use std::{collections::HashMap, ops::Deref};
use tempfile::TempDir;
@@ -137,67 +131,6 @@ impl MmapCache {
}
}
struct WatcherWrapper {
_watcher: Mutex<notify::RecommendedWatcher>,
watcher_router: Arc<WatchCallbackList>,
}
impl WatcherWrapper {
pub fn new(path: &Path) -> Result<Self, OpenDirectoryError> {
let (tx, watcher_recv): (Sender<RawEvent>, Receiver<RawEvent>) = channel();
// We need to initialize the
let watcher = notify::raw_watcher(tx)
.and_then(|mut watcher| {
watcher.watch(path, RecursiveMode::Recursive)?;
Ok(watcher)
})
.map_err(|err| match err {
notify::Error::PathNotFound => OpenDirectoryError::DoesNotExist(path.to_owned()),
_ => {
panic!("Unknown error while starting watching directory {:?}", path);
}
})?;
let watcher_router: Arc<WatchCallbackList> = Default::default();
let watcher_router_clone = watcher_router.clone();
thread::Builder::new()
.name("meta-file-watch-thread".to_string())
.spawn(move || {
loop {
match watcher_recv.recv().map(|evt| evt.path) {
Ok(Some(changed_path)) => {
// ... Actually subject to false positive.
// We might want to be more accurate than this at one point.
if let Some(filename) = changed_path.file_name() {
if filename == *META_FILEPATH {
let _ = watcher_router_clone.broadcast();
}
}
}
Ok(None) => {
// not an event we are interested in.
}
Err(_e) => {
// the watch send channel was dropped
break;
}
}
}
})
.map_err(|io_error| OpenDirectoryError::IoError {
io_error,
directory_path: path.to_path_buf(),
})?;
Ok(WatcherWrapper {
_watcher: Mutex::new(watcher),
watcher_router,
})
}
pub fn watch(&mut self, watch_callback: WatchCallback) -> WatchHandle {
self.watcher_router.subscribe(watch_callback)
}
}
/// Directory storing data in files, read via mmap.
///
/// The Mmap object are cached to limit the
@@ -219,40 +152,21 @@ struct MmapDirectoryInner {
root_path: PathBuf,
mmap_cache: RwLock<MmapCache>,
_temp_directory: Option<TempDir>,
watcher: RwLock<Option<WatcherWrapper>>,
watcher: FileWatcher,
}
impl MmapDirectoryInner {
fn new(root_path: PathBuf, temp_directory: Option<TempDir>) -> MmapDirectoryInner {
MmapDirectoryInner {
root_path,
mmap_cache: Default::default(),
_temp_directory: temp_directory,
watcher: RwLock::new(None),
watcher: FileWatcher::new(&root_path.join(*META_FILEPATH)),
root_path: root_path,
}
}
fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> {
// a lot of juggling here, to ensure we don't do anything that panics
// while the rwlock is held. That way we ensure that the rwlock cannot
// be poisoned.
//
// The downside is that we might create a watch wrapper that is not useful.
let need_initialization = self.watcher.read().unwrap().is_none();
if need_initialization {
let watch_wrapper = WatcherWrapper::new(&self.root_path)?;
let mut watch_wlock = self.watcher.write().unwrap();
// the watcher could have been initialized when we released the lock, and
// we do not want to lose the watched files that were set.
if watch_wlock.is_none() {
*watch_wlock = Some(watch_wrapper);
}
}
if let Some(watch_wrapper) = self.watcher.write().unwrap().as_mut() {
Ok(watch_wrapper.watch(watch_callback))
} else {
unreachable!("At this point, watch wrapper is supposed to be initialized");
}
fn watch(&self, callback: WatchCallback) -> crate::Result<WatchHandle> {
Ok(self.watcher.watch(callback))
}
}
@@ -413,6 +327,24 @@ impl Deref for MmapArc {
}
unsafe impl StableDeref for MmapArc {}
/// Writes a file in an atomic manner.
pub(crate) fn atomic_write(path: &Path, content: &[u8]) -> io::Result<()> {
// We create the temporary file in the same directory as the target file.
// Indeed the canonical temp directory and the target file might sit in different
// filesystem, in which case the atomic write may actually not work.
let parent_path = path.parent().ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
"Path {:?} does not have parent directory.",
)
})?;
let mut tempfile = tempfile::Builder::new().tempfile_in(&parent_path)?;
tempfile.write_all(content)?;
tempfile.flush()?;
tempfile.into_temp_path().persist(path)?;
Ok(())
}
impl Directory for MmapDirectory {
fn open_read(&self, path: &Path) -> result::Result<FileSlice, OpenReadError> {
debug!("Open Read {:?}", path);
@@ -456,9 +388,9 @@ impl Directory for MmapDirectory {
}
}
fn exists(&self, path: &Path) -> bool {
fn exists(&self, path: &Path) -> Result<bool, OpenReadError> {
let full_path = self.resolve_path(path);
full_path.exists()
Ok(full_path.exists())
}
fn open_write(&self, path: &Path) -> Result<WritePtr, OpenWriteError> {
@@ -513,12 +445,8 @@ impl Directory for MmapDirectory {
fn atomic_write(&self, path: &Path, content: &[u8]) -> io::Result<()> {
debug!("Atomic Write {:?}", path);
let mut tempfile = tempfile::Builder::new().tempfile_in(&self.inner.root_path)?;
tempfile.write_all(content)?;
tempfile.flush()?;
let full_path = self.resolve_path(path);
tempfile.into_temp_path().persist(full_path)?;
Ok(())
atomic_write(&full_path, content)
}
fn acquire_lock(&self, lock: &Lock) -> Result<DirectoryLock, LockError> {
@@ -557,8 +485,6 @@ mod tests {
use crate::Index;
use crate::ReloadPolicy;
use crate::{common::HasLen, indexer::LogMergePolicy};
use std::fs;
use std::sync::atomic::{AtomicUsize, Ordering};
#[test]
fn test_open_non_existent_path() {
@@ -647,27 +573,6 @@ mod tests {
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0);
}
#[test]
fn test_watch_wrapper() {
let counter: Arc<AtomicUsize> = Default::default();
let counter_clone = counter.clone();
let tmp_dir = tempfile::TempDir::new().unwrap();
let tmp_dirpath = tmp_dir.path().to_owned();
let mut watch_wrapper = WatcherWrapper::new(&tmp_dirpath).unwrap();
let tmp_file = tmp_dirpath.join(*META_FILEPATH);
let _handle = watch_wrapper.watch(Box::new(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);
}));
let (sender, receiver) = crossbeam::channel::unbounded();
let _handle2 = watch_wrapper.watch(Box::new(move || {
let _ = sender.send(());
}));
assert_eq!(counter.load(Ordering::SeqCst), 0);
fs::write(&tmp_file, b"whateverwilldo").unwrap();
assert!(receiver.recv().is_ok());
assert!(counter.load(Ordering::SeqCst) >= 1);
}
#[test]
fn test_mmap_released() {
let mmap_directory = MmapDirectory::create_from_tempdir().unwrap();

View File

@@ -10,6 +10,7 @@ mod mmap_directory;
mod directory;
mod directory_lock;
mod file_slice;
mod file_watcher;
mod footer;
mod managed_directory;
mod owned_bytes;

View File

@@ -177,8 +177,15 @@ impl Directory for RAMDirectory {
self.fs.write().unwrap().delete(path)
}
fn exists(&self, path: &Path) -> bool {
self.fs.read().unwrap().exists(path)
fn exists(&self, path: &Path) -> Result<bool, OpenReadError> {
Ok(self
.fs
.read()
.map_err(|e| OpenReadError::IOError {
io_error: io::Error::new(io::ErrorKind::Other, e.to_string()),
filepath: path.to_path_buf(),
})?
.exists(path))
}
fn open_write(&self, path: &Path) -> Result<WritePtr, OpenWriteError> {

View File

@@ -130,7 +130,7 @@ fn ram_directory_panics_if_flush_forgotten() {
fn test_simple(directory: &dyn Directory) -> crate::Result<()> {
let test_path: &'static Path = Path::new("some_path_for_test");
let mut write_file = directory.open_write(test_path)?;
assert!(directory.exists(test_path));
assert!(directory.exists(test_path).unwrap());
write_file.write_all(&[4])?;
write_file.write_all(&[3])?;
write_file.write_all(&[7, 3, 5])?;
@@ -139,14 +139,14 @@ fn test_simple(directory: &dyn Directory) -> crate::Result<()> {
assert_eq!(read_file.as_slice(), &[4u8, 3u8, 7u8, 3u8, 5u8]);
mem::drop(read_file);
assert!(directory.delete(test_path).is_ok());
assert!(!directory.exists(test_path));
assert!(!directory.exists(test_path).unwrap());
Ok(())
}
fn test_rewrite_forbidden(directory: &dyn Directory) -> crate::Result<()> {
let test_path: &'static Path = Path::new("some_path_for_test");
directory.open_write(test_path)?;
assert!(directory.exists(test_path));
assert!(directory.exists(test_path).unwrap());
assert!(directory.open_write(test_path).is_err());
assert!(directory.delete(test_path).is_ok());
Ok(())
@@ -157,7 +157,7 @@ fn test_write_create_the_file(directory: &dyn Directory) {
{
assert!(directory.open_read(test_path).is_err());
let _w = directory.open_write(test_path).unwrap();
assert!(directory.exists(test_path));
assert!(directory.exists(test_path).unwrap());
assert!(directory.open_read(test_path).is_ok());
assert!(directory.delete(test_path).is_ok());
}
@@ -190,38 +190,33 @@ fn test_directory_delete(directory: &dyn Directory) -> crate::Result<()> {
}
fn test_watch(directory: &dyn Directory) {
let num_progress: Arc<AtomicUsize> = Default::default();
let counter: Arc<AtomicUsize> = Default::default();
let counter_clone = counter.clone();
let (sender, receiver) = crossbeam::channel::unbounded();
let watch_callback = Box::new(move || {
counter_clone.fetch_add(1, SeqCst);
});
// This callback is used to synchronize watching in our unit test.
// We bind it to a variable because the callback is removed when that
// handle is dropped.
let watch_handle = directory.watch(watch_callback).unwrap();
let _progress_listener = directory
.watch(Box::new(move || {
let val = num_progress.fetch_add(1, SeqCst);
let _ = sender.send(val);
let (tx, rx) = crossbeam::channel::unbounded();
let timeout = Duration::from_millis(500);
let handle = directory
.watch(WatchCallback::new(move || {
let val = counter.fetch_add(1, SeqCst);
tx.send(val + 1).unwrap();
}))
.unwrap();
for i in 0..10 {
assert!(i <= counter.load(SeqCst));
assert!(directory
.atomic_write(Path::new("meta.json"), b"random_test_data_2")
.is_ok());
assert_eq!(receiver.recv_timeout(Duration::from_millis(500)), Ok(i));
assert!(i + 1 <= counter.load(SeqCst)); // notify can trigger more than once.
}
mem::drop(watch_handle);
assert!(directory
.atomic_write(Path::new("meta.json"), b"random_test_data")
.atomic_write(Path::new("meta.json"), b"foo")
.is_ok());
assert!(receiver.recv_timeout(Duration::from_millis(500)).is_ok());
assert!(10 <= counter.load(SeqCst));
assert_eq!(rx.recv_timeout(timeout), Ok(1));
assert!(directory
.atomic_write(Path::new("meta.json"), b"bar")
.is_ok());
assert_eq!(rx.recv_timeout(timeout), Ok(2));
mem::drop(handle);
assert!(directory
.atomic_write(Path::new("meta.json"), b"qux")
.is_ok());
assert!(rx.recv_timeout(timeout).is_err());
}
fn test_lock_non_blocking(directory: &dyn Directory) {

View File

@@ -4,8 +4,20 @@ use std::sync::Arc;
use std::sync::RwLock;
use std::sync::Weak;
/// Type alias for callbacks registered when watching files of a `Directory`.
pub type WatchCallback = Box<dyn Fn() + Sync + Send>;
/// Cloneable wrapper for callbacks registered when watching files of a `Directory`.
#[derive(Clone)]
pub struct WatchCallback(Arc<Box<dyn Fn() + Sync + Send>>);
impl WatchCallback {
/// Wraps a `Fn()` to create a WatchCallback.
pub fn new<F: Fn() + Sync + Send + 'static>(op: F) -> Self {
WatchCallback(Arc::new(Box::new(op)))
}
fn call(&self) {
self.0()
}
}
/// Helper struct to implement the watch method in `Directory` implementations.
///
@@ -34,7 +46,7 @@ impl WatchHandle {
///
/// This function is only useful when implementing a readonly directory.
pub fn empty() -> WatchHandle {
WatchHandle::new(Arc::new(Box::new(|| {})))
WatchHandle::new(Arc::new(WatchCallback::new(|| {})))
}
}
@@ -47,13 +59,13 @@ impl WatchCallbackList {
WatchHandle::new(watch_callback_arc)
}
fn list_callback(&self) -> Vec<Arc<WatchCallback>> {
let mut callbacks = vec![];
fn list_callback(&self) -> Vec<WatchCallback> {
let mut callbacks: Vec<WatchCallback> = vec![];
let mut router_wlock = self.router.write().unwrap();
let mut i = 0;
while i < router_wlock.len() {
if let Some(watch) = router_wlock[i].upgrade() {
callbacks.push(watch);
callbacks.push(watch.as_ref().clone());
i += 1;
} else {
router_wlock.swap_remove(i);
@@ -75,7 +87,7 @@ impl WatchCallbackList {
.name("watch-callbacks".to_string())
.spawn(move || {
for callback in callbacks {
callback();
callback.call();
}
let _ = sender.send(());
});
@@ -91,7 +103,7 @@ impl WatchCallbackList {
#[cfg(test)]
mod tests {
use crate::directory::WatchCallbackList;
use crate::directory::{WatchCallback, WatchCallbackList};
use futures::executor::block_on;
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};
@@ -102,7 +114,7 @@ mod tests {
let watch_event_router = WatchCallbackList::default();
let counter: Arc<AtomicUsize> = Default::default();
let counter_clone = counter.clone();
let inc_callback = Box::new(move || {
let inc_callback = WatchCallback::new(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);
});
block_on(watch_event_router.broadcast());
@@ -130,7 +142,7 @@ mod tests {
let counter: Arc<AtomicUsize> = Default::default();
let inc_callback = |inc: usize| {
let counter_clone = counter.clone();
Box::new(move || {
WatchCallback::new(move || {
counter_clone.fetch_add(inc, Ordering::SeqCst);
})
};
@@ -158,7 +170,7 @@ mod tests {
let watch_event_router = WatchCallbackList::default();
let counter: Arc<AtomicUsize> = Default::default();
let counter_clone = counter.clone();
let inc_callback = Box::new(move || {
let inc_callback = WatchCallback::new(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);
});
let handle_a = watch_event_router.subscribe(inc_callback);

View File

@@ -98,10 +98,9 @@ mod tests {
let field = searcher.schema().get_field("string_bytes").unwrap();
let term = Term::from_field_bytes(field, b"lucene".as_ref());
let term_query = TermQuery::new(term, IndexRecordOption::Basic);
let term_weight = term_query.specialized_weight(&searcher, false)?;
let term_scorer_err = term_weight.specialized_scorer(searcher.segment_reader(0), 1.0);
let term_weight_err = term_query.specialized_weight(&searcher, false);
assert!(matches!(
term_scorer_err,
term_weight_err,
Err(crate::TantivyError::SchemaError(_))
));
Ok(())

View File

@@ -66,10 +66,21 @@ pub struct FieldNormReader {
}
impl FieldNormReader {
/// Creates a `FieldNormReader` with a constant fieldnorm.
pub fn constant(num_docs: u32, fieldnorm: u32) -> FieldNormReader {
let fieldnorm_id = fieldnorm_to_id(fieldnorm);
let field_norms_data = OwnedBytes::new(vec![fieldnorm_id; num_docs as usize]);
FieldNormReader::new(field_norms_data)
}
/// Opens a field norm reader given its file.
pub fn open(fieldnorm_file: FileSlice) -> crate::Result<Self> {
let data = fieldnorm_file.read_bytes()?;
Ok(FieldNormReader { data })
Ok(FieldNormReader::new(data))
}
fn new(data: OwnedBytes) -> Self {
FieldNormReader { data }
}
/// Returns the number of documents in this segment.

View File

@@ -160,7 +160,7 @@ pub use self::docset::{DocSet, TERMINATED};
pub use crate::common::HasLen;
pub use crate::common::{f64_to_u64, i64_to_u64, u64_to_f64, u64_to_i64};
pub use crate::core::{Executor, SegmentComponent};
pub use crate::core::{Index, IndexMeta, Searcher, Segment, SegmentId, SegmentMeta};
pub use crate::core::{FieldSearcher, Index, IndexMeta, Searcher, Segment, SegmentId, SegmentMeta};
pub use crate::core::{InvertedIndexReader, SegmentReader};
pub use crate::directory::Directory;
pub use crate::indexer::operation::UserOperation;

View File

@@ -15,18 +15,14 @@ mod stacker;
mod term_info;
pub(crate) use self::block_search::BlockSearcher;
pub(crate) use self::postings_writer::MultiFieldPostingsWriter;
pub use self::serializer::{FieldSerializer, InvertedIndexSerializer};
pub use self::postings::Postings;
pub(crate) use self::skip::{BlockInfo, SkipReader};
pub use self::term_info::TermInfo;
pub use self::block_segment_postings::BlockSegmentPostings;
pub use self::postings::Postings;
pub(crate) use self::postings_writer::MultiFieldPostingsWriter;
pub use self::segment_postings::SegmentPostings;
pub use self::serializer::{FieldSerializer, InvertedIndexSerializer};
pub(crate) use self::skip::{BlockInfo, SkipReader};
pub(crate) use self::stacker::compute_table_size;
pub use self::term_info::TermInfo;
pub(crate) type UnorderedTermId = u64;

View File

@@ -177,14 +177,16 @@ impl<'a> FieldSerializer<'a> {
}
fn current_term_info(&self) -> TermInfo {
let positions_idx = self
.positions_serializer_opt
.as_ref()
.map(PositionSerializer::positions_idx)
.unwrap_or(0u64);
let positions_idx =
if let Some(positions_serializer) = self.positions_serializer_opt.as_ref() {
positions_serializer.positions_idx()
} else {
0u64
};
TermInfo {
doc_freq: 0,
postings_offset: self.postings_serializer.addr(),
postings_start_offset: self.postings_serializer.addr(),
postings_stop_offset: 0u64,
positions_idx,
}
}
@@ -238,10 +240,11 @@ impl<'a> FieldSerializer<'a> {
/// using `VInt` encoding.
pub fn close_term(&mut self) -> io::Result<()> {
if self.term_open {
self.term_dictionary_builder
.insert_value(&self.current_term_info)?;
self.postings_serializer
.close_term(self.current_term_info.doc_freq)?;
self.current_term_info.postings_stop_offset = self.postings_serializer.addr();
self.term_dictionary_builder
.insert_value(&self.current_term_info)?;
self.term_open = false;
}
Ok(())

View File

@@ -7,35 +7,50 @@ use std::io;
pub struct TermInfo {
/// Number of documents in the segment containing the term
pub doc_freq: u32,
/// Start offset within the postings (`.idx`) file.
pub postings_offset: u64,
/// Start offset of the posting list within the postings (`.idx`) file.
pub postings_start_offset: u64,
/// Stop offset of the posting list within the postings (`.idx`) file.
/// The byte range is `[start_offset..stop_offset)`.
pub postings_stop_offset: u64,
/// Start offset of the first block within the position (`.pos`) file.
pub positions_idx: u64,
}
impl TermInfo {
pub(crate) fn posting_num_bytes(&self) -> u32 {
let num_bytes = self.postings_stop_offset - self.postings_start_offset;
assert!(num_bytes <= std::u32::MAX as u64);
num_bytes as u32
}
}
impl FixedSize for TermInfo {
/// Size required for the binary serialization of a `TermInfo` object.
/// This is large, but in practise, `TermInfo` are encoded in blocks and
/// only the first `TermInfo` of a block is serialized uncompressed.
/// The subsequent `TermInfo` are delta encoded and bitpacked.
const SIZE_IN_BYTES: usize = u32::SIZE_IN_BYTES + 2 * u64::SIZE_IN_BYTES;
const SIZE_IN_BYTES: usize = 2 * u32::SIZE_IN_BYTES + 2 * u64::SIZE_IN_BYTES;
}
impl BinarySerializable for TermInfo {
fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
self.doc_freq.serialize(writer)?;
self.postings_offset.serialize(writer)?;
self.postings_start_offset.serialize(writer)?;
self.posting_num_bytes().serialize(writer)?;
self.positions_idx.serialize(writer)?;
Ok(())
}
fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
let doc_freq = u32::deserialize(reader)?;
let postings_offset = u64::deserialize(reader)?;
let postings_start_offset = u64::deserialize(reader)?;
let postings_num_bytes = u32::deserialize(reader)?;
let postings_stop_offset = postings_start_offset + u64::from(postings_num_bytes);
let positions_idx = u64::deserialize(reader)?;
Ok(TermInfo {
doc_freq,
postings_offset,
postings_start_offset,
postings_stop_offset,
positions_idx,
})
}

View File

@@ -268,7 +268,7 @@ mod tests {
}
fn nearly_equals(left: Score, right: Score) -> bool {
(left - right).abs() < 0.00001 * (left + right).abs()
(left - right).abs() < 0.0001 * (left + right).abs()
}
fn compute_checkpoints_for_each_pruning(
@@ -531,7 +531,9 @@ mod tests {
proptest! {
#![proptest_config(ProptestConfig::with_cases(500))]
#[ignore]
#[test]
#[ignore]
fn test_block_wand_three_term_scorers((posting_lists, fieldnorms) in gen_term_scorers(3)) {
test_block_wand_aux(&posting_lists[..], &fieldnorms[..]);
}

View File

@@ -93,6 +93,13 @@ impl TermQuery {
scoring_enabled: bool,
) -> crate::Result<TermWeight> {
let term = self.term.clone();
let field_entry = searcher.schema().get_field_entry(term.field());
if !field_entry.is_indexed() {
return Err(crate::TantivyError::SchemaError(format!(
"Field {:?} is not indexed",
field_entry.name()
)));
}
let bm25_weight = BM25Weight::for_terms(searcher, &[term])?;
let index_record_option = if scoring_enabled {
self.index_record_option
@@ -103,6 +110,7 @@ impl TermQuery {
self.term.clone(),
index_record_option,
bm25_weight,
scoring_enabled,
))
}
}

View File

@@ -1,6 +1,7 @@
use super::term_scorer::TermScorer;
use crate::core::SegmentReader;
use crate::docset::DocSet;
use crate::fieldnorm::FieldNormReader;
use crate::postings::SegmentPostings;
use crate::query::bm25::BM25Weight;
use crate::query::explanation::does_not_match;
@@ -15,6 +16,7 @@ pub struct TermWeight {
term: Term,
index_record_option: IndexRecordOption,
similarity_weight: BM25Weight,
scoring_enabled: bool,
}
impl Weight for TermWeight {
@@ -87,11 +89,13 @@ impl TermWeight {
term: Term,
index_record_option: IndexRecordOption,
similarity_weight: BM25Weight,
scoring_enabled: bool,
) -> TermWeight {
TermWeight {
term,
index_record_option,
similarity_weight,
scoring_enabled,
}
}
@@ -102,7 +106,11 @@ impl TermWeight {
) -> crate::Result<TermScorer> {
let field = self.term.field();
let inverted_index = reader.inverted_index(field)?;
let fieldnorm_reader = reader.get_fieldnorms_reader(field)?;
let fieldnorm_reader = if self.scoring_enabled {
reader.get_fieldnorms_reader(field)?
} else {
FieldNormReader::constant(reader.max_doc(), 1)
};
let similarity_weight = self.similarity_weight.boost_by(boost);
let postings_opt: Option<SegmentPostings> =
inverted_index.read_postings(&self.term, self.index_record_option)?;

View File

@@ -3,9 +3,9 @@ mod pool;
pub use self::pool::LeasedItem;
use self::pool::Pool;
use crate::core::Segment;
use crate::directory::Directory;
use crate::directory::WatchHandle;
use crate::directory::META_LOCK;
use crate::directory::{Directory, WatchCallback};
use crate::Index;
use crate::Searcher;
use crate::SegmentReader;
@@ -88,7 +88,7 @@ impl IndexReaderBuilder {
let watch_handle = inner_reader_arc
.index
.directory()
.watch(Box::new(callback))?;
.watch(WatchCallback::new(callback))?;
watch_handle_opt = Some(watch_handle);
}
}

165
src/store/index/block.rs Normal file
View File

@@ -0,0 +1,165 @@
use crate::common::VInt;
use crate::store::index::{Checkpoint, CHECKPOINT_PERIOD};
use crate::DocId;
use std::io;
/// Represents a block of checkpoints.
///
/// The DocStore index checkpoints are organized into block
/// for code-readability and compression purpose.
///
/// A block can be of any size.
pub struct CheckpointBlock {
pub checkpoints: Vec<Checkpoint>,
}
impl Default for CheckpointBlock {
fn default() -> CheckpointBlock {
CheckpointBlock {
checkpoints: Vec::with_capacity(2 * CHECKPOINT_PERIOD),
}
}
}
impl CheckpointBlock {
/// If non-empty returns [start_doc, end_doc)
/// for the overall block.
pub fn doc_interval(&self) -> Option<(DocId, DocId)> {
let start_doc_opt = self
.checkpoints
.first()
.cloned()
.map(|checkpoint| checkpoint.start_doc);
let end_doc_opt = self
.checkpoints
.last()
.cloned()
.map(|checkpoint| checkpoint.end_doc);
match (start_doc_opt, end_doc_opt) {
(Some(start_doc), Some(end_doc)) => Some((start_doc, end_doc)),
_ => None,
}
}
/// Adding another checkpoint in the block.
pub fn push(&mut self, checkpoint: Checkpoint) {
self.checkpoints.push(checkpoint);
}
/// Returns the number of checkpoints in the block.
pub fn len(&self) -> usize {
self.checkpoints.len()
}
pub fn get(&self, idx: usize) -> Checkpoint {
self.checkpoints[idx]
}
pub fn clear(&mut self) {
self.checkpoints.clear();
}
pub fn serialize(&mut self, buffer: &mut Vec<u8>) {
VInt(self.checkpoints.len() as u64).serialize_into_vec(buffer);
if self.checkpoints.is_empty() {
return;
}
VInt(self.checkpoints[0].start_doc as u64).serialize_into_vec(buffer);
VInt(self.checkpoints[0].start_offset as u64).serialize_into_vec(buffer);
for checkpoint in &self.checkpoints {
let delta_doc = checkpoint.end_doc - checkpoint.start_doc;
VInt(delta_doc as u64).serialize_into_vec(buffer);
VInt(checkpoint.end_offset - checkpoint.start_offset).serialize_into_vec(buffer);
}
}
pub fn deserialize(&mut self, data: &mut &[u8]) -> io::Result<()> {
if data.is_empty() {
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, ""));
}
self.checkpoints.clear();
let len = VInt::deserialize_u64(data)? as usize;
if len == 0 {
return Ok(());
}
let mut doc = VInt::deserialize_u64(data)? as DocId;
let mut start_offset = VInt::deserialize_u64(data)?;
for _ in 0..len {
let num_docs = VInt::deserialize_u64(data)? as DocId;
let block_num_bytes = VInt::deserialize_u64(data)?;
self.checkpoints.push(Checkpoint {
start_doc: doc,
end_doc: doc + num_docs,
start_offset,
end_offset: start_offset + block_num_bytes,
});
doc += num_docs;
start_offset += block_num_bytes;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use crate::store::index::block::CheckpointBlock;
use crate::store::index::Checkpoint;
use crate::DocId;
use std::io;
fn test_aux_ser_deser(checkpoints: &[Checkpoint]) -> io::Result<()> {
let mut block = CheckpointBlock::default();
for &checkpoint in checkpoints {
block.push(checkpoint);
}
let mut buffer = Vec::new();
block.serialize(&mut buffer);
let mut block_deser = CheckpointBlock::default();
let checkpoint = Checkpoint {
start_doc: 0,
end_doc: 1,
start_offset: 2,
end_offset: 3,
};
block_deser.push(checkpoint); // < check that value is erased before deser
let mut data = &buffer[..];
block_deser.deserialize(&mut data)?;
assert!(data.is_empty());
assert_eq!(checkpoints, &block_deser.checkpoints[..]);
Ok(())
}
#[test]
fn test_block_serialize_empty() -> io::Result<()> {
test_aux_ser_deser(&[])
}
#[test]
fn test_block_serialize_simple() -> io::Result<()> {
let checkpoints = vec![Checkpoint {
start_doc: 10,
end_doc: 12,
start_offset: 100,
end_offset: 120,
}];
test_aux_ser_deser(&checkpoints)
}
#[test]
fn test_block_serialize() -> io::Result<()> {
let offsets: Vec<u64> = (0..11).map(|i| i * i * i).collect();
let mut checkpoints = vec![];
let mut start_doc = 0;
for i in 0..10 {
let end_doc = (i * i) as DocId;
checkpoints.push(Checkpoint {
start_doc,
end_doc,
start_offset: offsets[i],
end_offset: offsets[i + 1],
});
start_doc = end_doc;
}
test_aux_ser_deser(&checkpoints)
}
}

230
src/store/index/mod.rs Normal file
View File

@@ -0,0 +1,230 @@
const CHECKPOINT_PERIOD: usize = 8;
use std::fmt;
mod block;
mod skip_index;
mod skip_index_builder;
use crate::DocId;
pub use self::skip_index::SkipIndex;
pub use self::skip_index_builder::SkipIndexBuilder;
/// A checkpoint contains meta-information about
/// a block. Either a block of documents, or another block
/// of checkpoints.
///
/// All of the intervals here defined are semi-open.
/// The checkpoint describes that the block within the bytes
/// `[start_offset..end_offset)` spans over the docs
/// `[start_doc..end_doc)`.
#[derive(Clone, Copy, Eq, PartialEq)]
pub struct Checkpoint {
pub start_doc: DocId,
pub end_doc: DocId,
pub start_offset: u64,
pub end_offset: u64,
}
impl fmt::Debug for Checkpoint {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"(doc=[{}..{}), bytes=[{}..{}))",
self.start_doc, self.end_doc, self.start_offset, self.end_offset
)
}
}
#[cfg(test)]
mod tests {
use std::io;
use proptest::strategy::{BoxedStrategy, Strategy};
use crate::directory::OwnedBytes;
use crate::store::index::Checkpoint;
use crate::DocId;
use super::{SkipIndex, SkipIndexBuilder};
#[test]
fn test_skip_index_empty() -> io::Result<()> {
let mut output: Vec<u8> = Vec::new();
let skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new();
skip_index_builder.write(&mut output)?;
let skip_index: SkipIndex = SkipIndex::from(OwnedBytes::new(output));
let mut skip_cursor = skip_index.checkpoints();
assert!(skip_cursor.next().is_none());
Ok(())
}
#[test]
fn test_skip_index_single_el() -> io::Result<()> {
let mut output: Vec<u8> = Vec::new();
let mut skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new();
let checkpoint = Checkpoint {
start_doc: 0,
end_doc: 2,
start_offset: 0,
end_offset: 3,
};
skip_index_builder.insert(checkpoint);
skip_index_builder.write(&mut output)?;
let skip_index: SkipIndex = SkipIndex::from(OwnedBytes::new(output));
let mut skip_cursor = skip_index.checkpoints();
assert_eq!(skip_cursor.next(), Some(checkpoint));
assert_eq!(skip_cursor.next(), None);
Ok(())
}
#[test]
fn test_skip_index() -> io::Result<()> {
let mut output: Vec<u8> = Vec::new();
let checkpoints = vec![
Checkpoint {
start_doc: 0,
end_doc: 3,
start_offset: 4,
end_offset: 9,
},
Checkpoint {
start_doc: 3,
end_doc: 4,
start_offset: 9,
end_offset: 25,
},
Checkpoint {
start_doc: 4,
end_doc: 6,
start_offset: 25,
end_offset: 49,
},
Checkpoint {
start_doc: 6,
end_doc: 8,
start_offset: 49,
end_offset: 81,
},
Checkpoint {
start_doc: 8,
end_doc: 10,
start_offset: 81,
end_offset: 100,
},
];
let mut skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new();
for &checkpoint in &checkpoints {
skip_index_builder.insert(checkpoint);
}
skip_index_builder.write(&mut output)?;
let skip_index: SkipIndex = SkipIndex::from(OwnedBytes::new(output));
assert_eq!(
&skip_index.checkpoints().collect::<Vec<_>>()[..],
&checkpoints[..]
);
Ok(())
}
fn offset_test(doc: DocId) -> u64 {
(doc as u64) * (doc as u64)
}
#[test]
fn test_skip_index_long() -> io::Result<()> {
let mut output: Vec<u8> = Vec::new();
let checkpoints: Vec<Checkpoint> = (0..1000)
.map(|i| Checkpoint {
start_doc: i,
end_doc: i + 1,
start_offset: offset_test(i),
end_offset: offset_test(i + 1),
})
.collect();
let mut skip_index_builder = SkipIndexBuilder::new();
for checkpoint in &checkpoints {
skip_index_builder.insert(*checkpoint);
}
skip_index_builder.write(&mut output)?;
assert_eq!(output.len(), 4035);
let resulting_checkpoints: Vec<Checkpoint> = SkipIndex::from(OwnedBytes::new(output))
.checkpoints()
.collect();
assert_eq!(&resulting_checkpoints, &checkpoints);
Ok(())
}
fn integrate_delta(mut vals: Vec<u64>) -> Vec<u64> {
let mut prev = 0u64;
for val in vals.iter_mut() {
let new_val = *val + prev;
prev = new_val;
*val = new_val;
}
vals
}
// Generates a sequence of n valid checkpoints, with n < max_len.
fn monotonic_checkpoints(max_len: usize) -> BoxedStrategy<Vec<Checkpoint>> {
(1..max_len)
.prop_flat_map(move |len: usize| {
(
proptest::collection::vec(1u64..20u64, len as usize).prop_map(integrate_delta),
proptest::collection::vec(1u64..26u64, len as usize).prop_map(integrate_delta),
)
.prop_map(|(docs, offsets)| {
(0..docs.len() - 1)
.map(move |i| Checkpoint {
start_doc: docs[i] as DocId,
end_doc: docs[i + 1] as DocId,
start_offset: offsets[i],
end_offset: offsets[i + 1],
})
.collect::<Vec<Checkpoint>>()
})
})
.boxed()
}
fn seek_manual<I: Iterator<Item = Checkpoint>>(
checkpoints: I,
target: DocId,
) -> Option<Checkpoint> {
checkpoints
.into_iter()
.filter(|checkpoint| checkpoint.end_doc > target)
.next()
}
fn test_skip_index_aux(skip_index: SkipIndex, checkpoints: &[Checkpoint]) {
if let Some(last_checkpoint) = checkpoints.last() {
for doc in 0u32..last_checkpoint.end_doc {
let expected = seek_manual(skip_index.checkpoints(), doc);
assert_eq!(expected, skip_index.seek(doc), "Doc {}", doc);
}
assert!(skip_index.seek(last_checkpoint.end_doc).is_none());
}
}
use proptest::prelude::*;
proptest! {
#![proptest_config(ProptestConfig::with_cases(20))]
#[test]
fn test_proptest_skip(checkpoints in monotonic_checkpoints(100)) {
let mut skip_index_builder = SkipIndexBuilder::new();
for checkpoint in checkpoints.iter().cloned() {
skip_index_builder.insert(checkpoint);
}
let mut buffer = Vec::new();
skip_index_builder.write(&mut buffer).unwrap();
let skip_index = SkipIndex::from(OwnedBytes::new(buffer));
let iter_checkpoints: Vec<Checkpoint> = skip_index.checkpoints().collect();
assert_eq!(&checkpoints[..], &iter_checkpoints[..]);
test_skip_index_aux(skip_index, &checkpoints[..]);
}
}
}

View File

@@ -0,0 +1,112 @@
use crate::common::{BinarySerializable, VInt};
use crate::directory::OwnedBytes;
use crate::store::index::block::CheckpointBlock;
use crate::store::index::Checkpoint;
use crate::DocId;
pub struct LayerCursor<'a> {
remaining: &'a [u8],
block: CheckpointBlock,
cursor: usize,
}
impl<'a> Iterator for LayerCursor<'a> {
type Item = Checkpoint;
fn next(&mut self) -> Option<Checkpoint> {
if self.cursor == self.block.len() {
if self.remaining.is_empty() {
return None;
}
let (block_mut, remaining_mut) = (&mut self.block, &mut self.remaining);
if let Err(_) = block_mut.deserialize(remaining_mut) {
return None;
}
self.cursor = 0;
}
let res = Some(self.block.get(self.cursor));
self.cursor += 1;
res
}
}
struct Layer {
data: OwnedBytes,
}
impl Layer {
fn cursor<'a>(&'a self) -> impl Iterator<Item = Checkpoint> + 'a {
self.cursor_at_offset(0u64)
}
fn cursor_at_offset<'a>(&'a self, start_offset: u64) -> impl Iterator<Item = Checkpoint> + 'a {
let data = &self.data.as_slice();
LayerCursor {
remaining: &data[start_offset as usize..],
block: CheckpointBlock::default(),
cursor: 0,
}
}
fn seek_start_at_offset(&self, target: DocId, offset: u64) -> Option<Checkpoint> {
self.cursor_at_offset(offset)
.filter(|checkpoint| checkpoint.end_doc > target)
.next()
}
}
pub struct SkipIndex {
layers: Vec<Layer>,
}
impl SkipIndex {
pub(crate) fn checkpoints<'a>(&'a self) -> impl Iterator<Item = Checkpoint> + 'a {
self.layers
.last()
.into_iter()
.flat_map(|layer| layer.cursor())
}
pub fn seek(&self, target: DocId) -> Option<Checkpoint> {
let first_layer_len = self
.layers
.first()
.map(|layer| layer.data.len() as u64)
.unwrap_or(0u64);
let mut cur_checkpoint = Checkpoint {
start_doc: 0u32,
end_doc: 1u32,
start_offset: 0u64,
end_offset: first_layer_len,
};
for layer in &self.layers {
if let Some(checkpoint) =
layer.seek_start_at_offset(target, cur_checkpoint.start_offset)
{
cur_checkpoint = checkpoint;
} else {
return None;
}
}
Some(cur_checkpoint)
}
}
impl From<OwnedBytes> for SkipIndex {
fn from(mut data: OwnedBytes) -> SkipIndex {
let offsets: Vec<u64> = Vec::<VInt>::deserialize(&mut data)
.unwrap()
.into_iter()
.map(|el| el.0)
.collect();
let mut start_offset = 0;
let mut layers = Vec::new();
for end_offset in offsets {
layers.push(Layer {
data: data.slice(start_offset as usize, end_offset as usize),
});
start_offset = end_offset;
}
SkipIndex { layers }
}
}

View File

@@ -0,0 +1,115 @@
use crate::common::{BinarySerializable, VInt};
use crate::store::index::block::CheckpointBlock;
use crate::store::index::{Checkpoint, CHECKPOINT_PERIOD};
use std::io;
use std::io::Write;
// Each skip contains iterator over pairs (last doc in block, offset to start of block).
struct LayerBuilder {
buffer: Vec<u8>,
pub block: CheckpointBlock,
}
impl LayerBuilder {
fn finish(self) -> Vec<u8> {
self.buffer
}
fn new() -> LayerBuilder {
LayerBuilder {
buffer: Vec::new(),
block: CheckpointBlock::default(),
}
}
/// Serializes the block, and return a checkpoint representing
/// the entire block.
///
/// If the block was empty to begin with, simply return None.
fn flush_block(&mut self) -> Option<Checkpoint> {
self.block.doc_interval().map(|(start_doc, end_doc)| {
let start_offset = self.buffer.len() as u64;
self.block.serialize(&mut self.buffer);
let end_offset = self.buffer.len() as u64;
self.block.clear();
Checkpoint {
start_doc,
end_doc,
start_offset,
end_offset,
}
})
}
fn push(&mut self, checkpoint: Checkpoint) {
self.block.push(checkpoint);
}
fn insert(&mut self, checkpoint: Checkpoint) -> Option<Checkpoint> {
self.push(checkpoint);
let emit_skip_info = (self.block.len() % CHECKPOINT_PERIOD) == 0;
if emit_skip_info {
self.flush_block()
} else {
None
}
}
}
pub struct SkipIndexBuilder {
layers: Vec<LayerBuilder>,
}
impl SkipIndexBuilder {
pub fn new() -> SkipIndexBuilder {
SkipIndexBuilder { layers: Vec::new() }
}
fn get_layer(&mut self, layer_id: usize) -> &mut LayerBuilder {
if layer_id == self.layers.len() {
let layer_builder = LayerBuilder::new();
self.layers.push(layer_builder);
}
&mut self.layers[layer_id]
}
pub fn insert(&mut self, checkpoint: Checkpoint) {
let mut skip_pointer = Some(checkpoint);
for layer_id in 0.. {
if let Some(checkpoint) = skip_pointer {
skip_pointer = self.get_layer(layer_id).insert(checkpoint);
} else {
break;
}
}
}
pub fn write<W: Write>(mut self, output: &mut W) -> io::Result<()> {
let mut last_pointer = None;
for skip_layer in self.layers.iter_mut() {
if let Some(checkpoint) = last_pointer {
skip_layer.push(checkpoint);
}
last_pointer = skip_layer.flush_block();
}
let layer_buffers: Vec<Vec<u8>> = self
.layers
.into_iter()
.rev()
.map(|layer| layer.finish())
.collect();
let mut layer_offset = 0;
let mut layer_sizes = Vec::new();
for layer_buffer in &layer_buffers {
layer_offset += layer_buffer.len() as u64;
layer_sizes.push(VInt(layer_offset));
}
layer_sizes.serialize(output)?;
for layer_buffer in layer_buffers {
output.write_all(&layer_buffer[..])?;
}
Ok(())
}
}

View File

@@ -33,8 +33,8 @@ and should rely on either
!*/
mod index;
mod reader;
mod skiplist;
mod writer;
pub use self::reader::StoreReader;
pub use self::writer::StoreWriter;

View File

@@ -1,69 +1,91 @@
use super::decompress;
use super::skiplist::SkipList;
use super::index::SkipIndex;
use crate::common::VInt;
use crate::common::{BinarySerializable, HasLen};
use crate::directory::{FileSlice, OwnedBytes};
use crate::schema::Document;
use crate::space_usage::StoreSpaceUsage;
use crate::store::index::Checkpoint;
use crate::DocId;
use std::cell::RefCell;
use lru::LruCache;
use std::io;
use std::mem::size_of;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
const LRU_CACHE_CAPACITY: usize = 100;
type Block = Arc<Vec<u8>>;
type BlockCache = Arc<Mutex<LruCache<u64, Block>>>;
/// Reads document off tantivy's [`Store`](./index.html)
#[derive(Clone)]
pub struct StoreReader {
data: FileSlice,
offset_index_file: OwnedBytes,
current_block_offset: RefCell<usize>,
current_block: RefCell<Vec<u8>>,
max_doc: DocId,
cache: BlockCache,
cache_hits: Arc<AtomicUsize>,
cache_misses: Arc<AtomicUsize>,
skip_index: Arc<SkipIndex>,
space_usage: StoreSpaceUsage,
}
impl StoreReader {
/// Opens a store reader
// TODO rename open
pub fn open(store_file: FileSlice) -> io::Result<StoreReader> {
let (data_file, offset_index_file, max_doc) = split_file(store_file)?;
let (data_file, offset_index_file) = split_file(store_file)?;
let index_data = offset_index_file.read_bytes()?;
let space_usage = StoreSpaceUsage::new(data_file.len(), offset_index_file.len());
let skip_index = SkipIndex::from(index_data);
Ok(StoreReader {
data: data_file,
offset_index_file: offset_index_file.read_bytes()?,
current_block_offset: RefCell::new(usize::max_value()),
current_block: RefCell::new(Vec::new()),
max_doc,
cache: Arc::new(Mutex::new(LruCache::new(LRU_CACHE_CAPACITY))),
cache_hits: Default::default(),
cache_misses: Default::default(),
skip_index: Arc::new(skip_index),
space_usage,
})
}
pub(crate) fn block_index(&self) -> SkipList<'_, u64> {
SkipList::from(self.offset_index_file.as_slice())
pub(crate) fn block_checkpoints<'a>(&'a self) -> impl Iterator<Item = Checkpoint> + 'a {
self.skip_index.checkpoints()
}
fn block_offset(&self, doc_id: DocId) -> (DocId, u64) {
self.block_index()
.seek(u64::from(doc_id) + 1)
.map(|(doc, offset)| (doc as DocId, offset))
.unwrap_or((0u32, 0u64))
fn block_checkpoint(&self, doc_id: DocId) -> Option<Checkpoint> {
self.skip_index.seek(doc_id)
}
pub(crate) fn block_data(&self) -> io::Result<OwnedBytes> {
self.data.read_bytes()
}
fn compressed_block(&self, addr: usize) -> io::Result<OwnedBytes> {
let (block_len_bytes, block_body) = self.data.slice_from(addr).split(4);
let block_len = u32::deserialize(&mut block_len_bytes.read_bytes()?)?;
block_body.slice_to(block_len as usize).read_bytes()
fn compressed_block(&self, checkpoint: &Checkpoint) -> io::Result<OwnedBytes> {
self.data
.slice(
checkpoint.start_offset as usize,
checkpoint.end_offset as usize,
)
.read_bytes()
}
fn read_block(&self, block_offset: usize) -> io::Result<()> {
if block_offset != *self.current_block_offset.borrow() {
let mut current_block_mut = self.current_block.borrow_mut();
current_block_mut.clear();
let compressed_block = self.compressed_block(block_offset)?;
decompress(compressed_block.as_slice(), &mut current_block_mut)?;
*self.current_block_offset.borrow_mut() = block_offset;
fn read_block(&self, checkpoint: &Checkpoint) -> io::Result<Block> {
if let Some(block) = self.cache.lock().unwrap().get(&checkpoint.start_offset) {
self.cache_hits.fetch_add(1, Ordering::SeqCst);
return Ok(block.clone());
}
Ok(())
self.cache_misses.fetch_add(1, Ordering::SeqCst);
let compressed_block = self.compressed_block(checkpoint)?;
let mut decompressed_block = vec![];
decompress(compressed_block.as_slice(), &mut decompressed_block)?;
let block = Arc::new(decompressed_block);
self.cache
.lock()
.unwrap()
.put(checkpoint.start_offset, block.clone());
Ok(block)
}
/// Reads a given document.
@@ -74,14 +96,15 @@ impl StoreReader {
/// It should not be called to score documents
/// for instance.
pub fn get(&self, doc_id: DocId) -> crate::Result<Document> {
let (first_doc_id, block_offset) = self.block_offset(doc_id);
self.read_block(block_offset as usize)?;
let current_block_mut = self.current_block.borrow_mut();
let mut cursor = &current_block_mut[..];
for _ in first_doc_id..doc_id {
let checkpoint = self.block_checkpoint(doc_id).ok_or_else(|| {
crate::TantivyError::InvalidArgument(format!("Failed to lookup Doc #{}.", doc_id))
})?;
let mut cursor = &self.read_block(&checkpoint)?[..];
for _ in checkpoint.start_doc..doc_id {
let doc_length = VInt::deserialize(&mut cursor)?.val() as usize;
cursor = &cursor[doc_length..];
}
let doc_length = VInt::deserialize(&mut cursor)?.val() as usize;
cursor = &cursor[..doc_length];
Ok(Document::deserialize(&mut cursor)?)
@@ -89,21 +112,93 @@ impl StoreReader {
/// Summarize total space usage of this store reader.
pub fn space_usage(&self) -> StoreSpaceUsage {
StoreSpaceUsage::new(self.data.len(), self.offset_index_file.len())
self.space_usage.clone()
}
}
fn split_file(data: FileSlice) -> io::Result<(FileSlice, FileSlice, DocId)> {
let data_len = data.len();
let footer_offset = data_len - size_of::<u64>() - size_of::<u32>();
let serialized_offset: OwnedBytes = data.slice(footer_offset, data_len).read_bytes()?;
fn split_file(data: FileSlice) -> io::Result<(FileSlice, FileSlice)> {
let (data, footer_len_bytes) = data.split_from_end(size_of::<u64>());
let serialized_offset: OwnedBytes = footer_len_bytes.read_bytes()?;
let mut serialized_offset_buf = serialized_offset.as_slice();
let offset = u64::deserialize(&mut serialized_offset_buf)?;
let offset = offset as usize;
let max_doc = u32::deserialize(&mut serialized_offset_buf)?;
Ok((
data.slice(0, offset),
data.slice(offset, footer_offset),
max_doc,
))
let offset = u64::deserialize(&mut serialized_offset_buf)? as usize;
Ok(data.split(offset))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::schema::Document;
use crate::schema::Field;
use crate::{directory::RAMDirectory, store::tests::write_lorem_ipsum_store, Directory};
use std::path::Path;
fn get_text_field<'a>(doc: &'a Document, field: &'a Field) -> Option<&'a str> {
doc.get_first(*field).and_then(|f| f.text())
}
#[test]
fn test_store_lru_cache() -> crate::Result<()> {
let directory = RAMDirectory::create();
let path = Path::new("store");
let writer = directory.open_write(path)?;
let schema = write_lorem_ipsum_store(writer, 500);
let title = schema.get_field("title").unwrap();
let store_file = directory.open_read(path)?;
let store = StoreReader::open(store_file)?;
assert_eq!(store.cache.lock().unwrap().len(), 0);
assert_eq!(store.cache_hits.load(Ordering::SeqCst), 0);
assert_eq!(store.cache_misses.load(Ordering::SeqCst), 0);
let doc = store.get(0)?;
assert_eq!(get_text_field(&doc, &title), Some("Doc 0"));
assert_eq!(store.cache.lock().unwrap().len(), 1);
assert_eq!(store.cache_hits.load(Ordering::SeqCst), 0);
assert_eq!(store.cache_misses.load(Ordering::SeqCst), 1);
assert_eq!(
store
.cache
.lock()
.unwrap()
.peek_lru()
.map(|(&k, _)| k as usize),
Some(0)
);
let doc = store.get(499)?;
assert_eq!(get_text_field(&doc, &title), Some("Doc 499"));
assert_eq!(store.cache.lock().unwrap().len(), 2);
assert_eq!(store.cache_hits.load(Ordering::SeqCst), 0);
assert_eq!(store.cache_misses.load(Ordering::SeqCst), 2);
assert_eq!(
store
.cache
.lock()
.unwrap()
.peek_lru()
.map(|(&k, _)| k as usize),
Some(0)
);
let doc = store.get(0)?;
assert_eq!(get_text_field(&doc, &title), Some("Doc 0"));
assert_eq!(store.cache.lock().unwrap().len(), 2);
assert_eq!(store.cache_hits.load(Ordering::SeqCst), 1);
assert_eq!(store.cache_misses.load(Ordering::SeqCst), 2);
assert_eq!(
store
.cache
.lock()
.unwrap()
.peek_lru()
.map(|(&k, _)| k as usize),
Some(18806)
);
Ok(())
}
}

View File

@@ -1,168 +0,0 @@
#![allow(dead_code)]
mod skiplist;
mod skiplist_builder;
pub use self::skiplist::SkipList;
pub use self::skiplist_builder::SkipListBuilder;
#[cfg(test)]
mod tests {
use super::{SkipList, SkipListBuilder};
#[test]
fn test_skiplist() {
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<u32> = SkipListBuilder::new(8);
skip_list_builder.insert(2, &3).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<'_, u32> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next(), Some((2, 3)));
}
#[test]
fn test_skiplist2() {
let mut output: Vec<u8> = Vec::new();
let skip_list_builder: SkipListBuilder<u32> = SkipListBuilder::new(8);
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<'_, u32> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next(), None);
}
#[test]
fn test_skiplist3() {
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(2);
skip_list_builder.insert(2, &()).unwrap();
skip_list_builder.insert(3, &()).unwrap();
skip_list_builder.insert(5, &()).unwrap();
skip_list_builder.insert(7, &()).unwrap();
skip_list_builder.insert(9, &()).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next().unwrap(), (2, ()));
assert_eq!(skip_list.next().unwrap(), (3, ()));
assert_eq!(skip_list.next().unwrap(), (5, ()));
assert_eq!(skip_list.next().unwrap(), (7, ()));
assert_eq!(skip_list.next().unwrap(), (9, ()));
assert_eq!(skip_list.next(), None);
}
#[test]
fn test_skiplist4() {
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(2);
skip_list_builder.insert(2, &()).unwrap();
skip_list_builder.insert(3, &()).unwrap();
skip_list_builder.insert(5, &()).unwrap();
skip_list_builder.insert(7, &()).unwrap();
skip_list_builder.insert(9, &()).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next().unwrap(), (2, ()));
skip_list.seek(5);
assert_eq!(skip_list.next().unwrap(), (5, ()));
assert_eq!(skip_list.next().unwrap(), (7, ()));
assert_eq!(skip_list.next().unwrap(), (9, ()));
assert_eq!(skip_list.next(), None);
}
#[test]
fn test_skiplist5() {
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(4);
skip_list_builder.insert(2, &()).unwrap();
skip_list_builder.insert(3, &()).unwrap();
skip_list_builder.insert(5, &()).unwrap();
skip_list_builder.insert(6, &()).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next().unwrap(), (2, ()));
skip_list.seek(6);
assert_eq!(skip_list.next().unwrap(), (6, ()));
assert_eq!(skip_list.next(), None);
}
#[test]
fn test_skiplist6() {
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(2);
skip_list_builder.insert(2, &()).unwrap();
skip_list_builder.insert(3, &()).unwrap();
skip_list_builder.insert(5, &()).unwrap();
skip_list_builder.insert(7, &()).unwrap();
skip_list_builder.insert(9, &()).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next().unwrap(), (2, ()));
skip_list.seek(10);
assert_eq!(skip_list.next(), None);
}
#[test]
fn test_skiplist7() {
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(4);
for i in 0..1000 {
skip_list_builder.insert(i, &()).unwrap();
}
skip_list_builder.insert(1004, &()).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next().unwrap(), (0, ()));
skip_list.seek(431);
assert_eq!(skip_list.next().unwrap(), (431, ()));
skip_list.seek(1003);
assert_eq!(skip_list.next().unwrap(), (1004, ()));
assert_eq!(skip_list.next(), None);
}
#[test]
fn test_skiplist8() {
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<u64> = SkipListBuilder::new(8);
skip_list_builder.insert(2, &3).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
assert_eq!(output.len(), 11);
assert_eq!(output[0], 1u8 + 128u8);
}
#[test]
fn test_skiplist9() {
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<u64> = SkipListBuilder::new(4);
for i in 0..4 * 4 * 4 {
skip_list_builder.insert(i, &i).unwrap();
}
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
assert_eq!(output.len(), 774);
assert_eq!(output[0], 4u8 + 128u8);
}
#[test]
fn test_skiplist10() {
// checking that void gets serialized to nothing.
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(4);
for i in 0..((4 * 4 * 4) - 1) {
skip_list_builder.insert(i, &()).unwrap();
}
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
assert_eq!(output.len(), 230);
assert_eq!(output[0], 128u8 + 3u8);
}
#[test]
fn test_skiplist11() {
// checking that void gets serialized to nothing.
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(4);
for i in 0..(4 * 4) {
skip_list_builder.insert(i, &()).unwrap();
}
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
assert_eq!(output.len(), 65);
assert_eq!(output[0], 128u8 + 3u8);
}
}

View File

@@ -1,133 +0,0 @@
use crate::common::{BinarySerializable, VInt};
use std::cmp::max;
use std::marker::PhantomData;
static EMPTY: [u8; 0] = [];
struct Layer<'a, T> {
data: &'a [u8],
cursor: &'a [u8],
next_id: Option<u64>,
_phantom_: PhantomData<T>,
}
impl<'a, T: BinarySerializable> Iterator for Layer<'a, T> {
type Item = (u64, T);
fn next(&mut self) -> Option<(u64, T)> {
if let Some(cur_id) = self.next_id {
let cur_val = T::deserialize(&mut self.cursor).unwrap();
self.next_id = VInt::deserialize_u64(&mut self.cursor).ok();
Some((cur_id, cur_val))
} else {
None
}
}
}
impl<'a, T: BinarySerializable> From<&'a [u8]> for Layer<'a, T> {
fn from(data: &'a [u8]) -> Layer<'a, T> {
let mut cursor = data;
let next_id = VInt::deserialize_u64(&mut cursor).ok();
Layer {
data,
cursor,
next_id,
_phantom_: PhantomData,
}
}
}
impl<'a, T: BinarySerializable> Layer<'a, T> {
fn empty() -> Layer<'a, T> {
Layer {
data: &EMPTY,
cursor: &EMPTY,
next_id: None,
_phantom_: PhantomData,
}
}
fn seek_offset(&mut self, offset: usize) {
self.cursor = &self.data[offset..];
self.next_id = VInt::deserialize_u64(&mut self.cursor).ok();
}
// Returns the last element (key, val)
// such that (key < doc_id)
//
// If there is no such element anymore,
// returns None.
//
// If the element exists, it will be returned
// at the next call to `.next()`.
fn seek(&mut self, key: u64) -> Option<(u64, T)> {
let mut result: Option<(u64, T)> = None;
loop {
if let Some(next_id) = self.next_id {
if next_id < key {
if let Some(v) = self.next() {
result = Some(v);
continue;
}
}
}
return result;
}
}
}
pub struct SkipList<'a, T: BinarySerializable> {
data_layer: Layer<'a, T>,
skip_layers: Vec<Layer<'a, u64>>,
}
impl<'a, T: BinarySerializable> Iterator for SkipList<'a, T> {
type Item = (u64, T);
fn next(&mut self) -> Option<(u64, T)> {
self.data_layer.next()
}
}
impl<'a, T: BinarySerializable> SkipList<'a, T> {
pub fn seek(&mut self, key: u64) -> Option<(u64, T)> {
let mut next_layer_skip: Option<(u64, u64)> = None;
for skip_layer in &mut self.skip_layers {
if let Some((_, offset)) = next_layer_skip {
skip_layer.seek_offset(offset as usize);
}
next_layer_skip = skip_layer.seek(key);
}
if let Some((_, offset)) = next_layer_skip {
self.data_layer.seek_offset(offset as usize);
}
self.data_layer.seek(key)
}
}
impl<'a, T: BinarySerializable> From<&'a [u8]> for SkipList<'a, T> {
fn from(mut data: &'a [u8]) -> SkipList<'a, T> {
let offsets: Vec<u64> = Vec::<VInt>::deserialize(&mut data)
.unwrap()
.into_iter()
.map(|el| el.0)
.collect();
let num_layers = offsets.len();
let layers_data: &[u8] = data;
let data_layer: Layer<'a, T> = if num_layers == 0 {
Layer::empty()
} else {
let first_layer_data: &[u8] = &layers_data[..offsets[0] as usize];
Layer::from(first_layer_data)
};
let skip_layers = (0..max(1, num_layers) - 1)
.map(|i| (offsets[i] as usize, offsets[i + 1] as usize))
.map(|(start, stop)| Layer::from(&layers_data[start..stop]))
.collect();
SkipList {
skip_layers,
data_layer,
}
}
}

View File

@@ -1,98 +0,0 @@
use crate::common::{is_power_of_2, BinarySerializable, VInt};
use std::io;
use std::io::Write;
use std::marker::PhantomData;
struct LayerBuilder<T: BinarySerializable> {
period_mask: usize,
buffer: Vec<u8>,
len: usize,
_phantom_: PhantomData<T>,
}
impl<T: BinarySerializable> LayerBuilder<T> {
fn written_size(&self) -> usize {
self.buffer.len()
}
fn write(&self, output: &mut dyn Write) -> Result<(), io::Error> {
output.write_all(&self.buffer)?;
Ok(())
}
fn with_period(period: usize) -> LayerBuilder<T> {
assert!(is_power_of_2(period), "The period has to be a power of 2.");
LayerBuilder {
period_mask: (period - 1),
buffer: Vec::new(),
len: 0,
_phantom_: PhantomData,
}
}
fn insert(&mut self, key: u64, value: &T) -> io::Result<Option<(u64, u64)>> {
self.len += 1;
let offset = self.written_size() as u64;
VInt(key).serialize_into_vec(&mut self.buffer);
value.serialize(&mut self.buffer)?;
let emit_skip_info = (self.period_mask & self.len) == 0;
if emit_skip_info {
Ok(Some((key, offset)))
} else {
Ok(None)
}
}
}
pub struct SkipListBuilder<T: BinarySerializable> {
period: usize,
data_layer: LayerBuilder<T>,
skip_layers: Vec<LayerBuilder<u64>>,
}
impl<T: BinarySerializable> SkipListBuilder<T> {
pub fn new(period: usize) -> SkipListBuilder<T> {
SkipListBuilder {
period,
data_layer: LayerBuilder::with_period(period),
skip_layers: Vec::new(),
}
}
fn get_skip_layer(&mut self, layer_id: usize) -> &mut LayerBuilder<u64> {
if layer_id == self.skip_layers.len() {
let layer_builder = LayerBuilder::with_period(self.period);
self.skip_layers.push(layer_builder);
}
&mut self.skip_layers[layer_id]
}
pub fn insert(&mut self, key: u64, dest: &T) -> io::Result<()> {
let mut skip_pointer = self.data_layer.insert(key, dest)?;
for layer_id in 0.. {
if let Some((skip_doc_id, skip_offset)) = skip_pointer {
skip_pointer = self
.get_skip_layer(layer_id)
.insert(skip_doc_id, &skip_offset)?;
} else {
break;
}
}
Ok(())
}
pub fn write<W: Write>(self, output: &mut W) -> io::Result<()> {
let mut size: u64 = self.data_layer.buffer.len() as u64;
let mut layer_sizes = vec![VInt(size)];
for layer in self.skip_layers.iter().rev() {
size += layer.buffer.len() as u64;
layer_sizes.push(VInt(size));
}
layer_sizes.serialize(output)?;
self.data_layer.write(output)?;
for layer in self.skip_layers.iter().rev() {
layer.write(output)?;
}
Ok(())
}
}

View File

@@ -1,11 +1,12 @@
use super::compress;
use super::skiplist::SkipListBuilder;
use super::index::SkipIndexBuilder;
use super::StoreReader;
use crate::common::CountingWriter;
use crate::common::{BinarySerializable, VInt};
use crate::directory::TerminatingWrite;
use crate::directory::WritePtr;
use crate::schema::Document;
use crate::store::index::Checkpoint;
use crate::DocId;
use std::io::{self, Write};
@@ -21,7 +22,8 @@ const BLOCK_SIZE: usize = 16_384;
///
pub struct StoreWriter {
doc: DocId,
offset_index_writer: SkipListBuilder<u64>,
first_doc_in_block: DocId,
offset_index_writer: SkipIndexBuilder,
writer: CountingWriter<WritePtr>,
intermediary_buffer: Vec<u8>,
current_block: Vec<u8>,
@@ -35,7 +37,8 @@ impl StoreWriter {
pub fn new(writer: WritePtr) -> StoreWriter {
StoreWriter {
doc: 0,
offset_index_writer: SkipListBuilder::new(4),
first_doc_in_block: 0,
offset_index_writer: SkipIndexBuilder::new(),
writer: CountingWriter::wrap(writer),
intermediary_buffer: Vec::new(),
current_block: Vec::new(),
@@ -68,11 +71,9 @@ impl StoreWriter {
pub fn stack(&mut self, store_reader: &StoreReader) -> io::Result<()> {
if !self.current_block.is_empty() {
self.write_and_compress_block()?;
self.offset_index_writer
.insert(u64::from(self.doc), &(self.writer.written_bytes() as u64))?;
}
let doc_offset = self.doc;
let start_offset = self.writer.written_bytes() as u64;
let doc_shift = self.doc;
let start_shift = self.writer.written_bytes() as u64;
// just bulk write all of the block of the given reader.
self.writer
@@ -80,22 +81,33 @@ impl StoreWriter {
// concatenate the index of the `store_reader`, after translating
// its start doc id and its start file offset.
for (next_doc_id, block_addr) in store_reader.block_index() {
self.doc = doc_offset + next_doc_id as u32;
self.offset_index_writer
.insert(u64::from(self.doc), &(start_offset + block_addr))?;
for mut checkpoint in store_reader.block_checkpoints() {
checkpoint.start_doc += doc_shift;
checkpoint.end_doc += doc_shift;
checkpoint.start_offset += start_shift;
checkpoint.end_offset += start_shift;
self.offset_index_writer.insert(checkpoint);
self.doc = checkpoint.end_doc;
}
Ok(())
}
fn write_and_compress_block(&mut self) -> io::Result<()> {
assert!(self.doc > 0);
self.intermediary_buffer.clear();
compress(&self.current_block[..], &mut self.intermediary_buffer)?;
(self.intermediary_buffer.len() as u32).serialize(&mut self.writer)?;
let start_offset = self.writer.written_bytes();
self.writer.write_all(&self.intermediary_buffer)?;
self.offset_index_writer
.insert(u64::from(self.doc), &(self.writer.written_bytes() as u64))?;
let end_offset = self.writer.written_bytes();
let end_doc = self.doc;
self.offset_index_writer.insert(Checkpoint {
start_doc: self.first_doc_in_block,
end_doc,
start_offset,
end_offset,
});
self.current_block.clear();
self.first_doc_in_block = self.doc;
Ok(())
}
@@ -110,7 +122,6 @@ impl StoreWriter {
let header_offset: u64 = self.writer.written_bytes() as u64;
self.offset_index_writer.write(&mut self.writer)?;
header_offset.serialize(&mut self.writer)?;
self.doc.serialize(&mut self.writer)?;
self.writer.terminate()
}
}

View File

@@ -60,12 +60,10 @@ impl<'a> TermMerger<'a> {
pub(crate) fn matching_segments<'b: 'a>(
&'b self,
) -> Box<dyn 'b + Iterator<Item = (usize, TermOrdinal)>> {
Box::new(
self.current_streamers
.iter()
.map(|heap_item| (heap_item.segment_ord, heap_item.streamer.term_ord())),
)
) -> impl 'b + Iterator<Item = (usize, TermOrdinal)> {
self.current_streamers
.iter()
.map(|heap_item| (heap_item.segment_ord, heap_item.streamer.term_ord()))
}
fn advance_segments(&mut self) {

View File

@@ -44,11 +44,13 @@ mod tests {
const BLOCK_SIZE: usize = 1_500;
fn make_term_info(val: u64) -> TermInfo {
fn make_term_info(term_ord: u64) -> TermInfo {
let offset = |term_ord: u64| term_ord * 100 + term_ord * term_ord;
TermInfo {
doc_freq: val as u32,
positions_idx: val * 2u64,
postings_offset: val * 3u64,
doc_freq: term_ord as u32,
postings_start_offset: offset(term_ord),
postings_stop_offset: offset(term_ord + 1),
positions_idx: offset(term_ord) * 2u64,
}
}
@@ -197,7 +199,7 @@ mod tests {
// term requires more than 16bits
term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxy", &make_term_info(1))?;
term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxyz", &make_term_info(2))?;
term_dictionary_builder.insert("abr", &make_term_info(2))?;
term_dictionary_builder.insert("abr", &make_term_info(3))?;
term_dictionary_builder.finish()?
};
let term_dict_file = FileSlice::from(buffer);
@@ -211,6 +213,7 @@ mod tests {
assert_eq!(kv_stream.value(), &make_term_info(2));
assert!(kv_stream.advance());
assert_eq!(kv_stream.key(), "abr".as_bytes());
assert_eq!(kv_stream.value(), &make_term_info(3));
assert!(!kv_stream.advance());
Ok(())
}

View File

@@ -55,22 +55,32 @@ impl TermInfoBlockMeta {
self.doc_freq_nbits + self.postings_offset_nbits + self.positions_idx_nbits
}
// Here inner_offset is the offset within the block, WITHOUT the first term_info.
// In other word, term_info #1,#2,#3 gets inner_offset 0,1,2... While term_info #0
// is encoded without bitpacking.
fn deserialize_term_info(&self, data: &[u8], inner_offset: usize) -> TermInfo {
assert!(inner_offset < BLOCK_LEN - 1);
let num_bits = self.num_bits() as usize;
let mut cursor = num_bits * inner_offset;
let doc_freq = extract_bits(data, cursor, self.doc_freq_nbits) as u32;
cursor += self.doc_freq_nbits as usize;
let posting_start_addr = num_bits * inner_offset;
// the stop offset is the start offset of the next term info.
let posting_stop_addr = posting_start_addr + num_bits;
let doc_freq_addr = posting_start_addr + self.postings_offset_nbits as usize;
let positions_idx_addr = doc_freq_addr + self.doc_freq_nbits as usize;
let postings_offset = extract_bits(data, cursor, self.postings_offset_nbits);
cursor += self.postings_offset_nbits as usize;
let positions_idx = extract_bits(data, cursor, self.positions_idx_nbits);
let postings_start_offset = self.ref_term_info.postings_start_offset
+ extract_bits(data, posting_start_addr, self.postings_offset_nbits);
let postings_stop_offset = self.ref_term_info.postings_start_offset
+ extract_bits(data, posting_stop_addr, self.postings_offset_nbits);
let doc_freq = extract_bits(data, doc_freq_addr, self.doc_freq_nbits) as u32;
let positions_idx = self.ref_term_info.positions_idx
+ extract_bits(data, positions_idx_addr, self.positions_idx_nbits);
TermInfo {
doc_freq,
postings_offset: postings_offset + self.ref_term_info.postings_offset,
positions_idx: positions_idx + self.ref_term_info.positions_idx,
postings_start_offset,
postings_stop_offset,
positions_idx,
}
}
}
@@ -152,16 +162,17 @@ fn bitpack_serialize<W: Write>(
term_info_block_meta: &TermInfoBlockMeta,
term_info: &TermInfo,
) -> io::Result<()> {
bit_packer.write(
term_info.postings_start_offset,
term_info_block_meta.postings_offset_nbits,
write,
)?;
bit_packer.write(
u64::from(term_info.doc_freq),
term_info_block_meta.doc_freq_nbits,
write,
)?;
bit_packer.write(
term_info.postings_offset,
term_info_block_meta.postings_offset_nbits,
write,
)?;
bit_packer.write(
term_info.positions_idx,
term_info_block_meta.positions_idx_nbits,
@@ -181,23 +192,27 @@ impl TermInfoStoreWriter {
}
fn flush_block(&mut self) -> io::Result<()> {
if self.term_infos.is_empty() {
return Ok(());
}
let mut bit_packer = BitPacker::new();
let ref_term_info = self.term_infos[0].clone();
let last_term_info = if let Some(last_term_info) = self.term_infos.last().cloned() {
last_term_info
} else {
return Ok(());
};
let postings_stop_offset =
last_term_info.postings_stop_offset - ref_term_info.postings_start_offset;
for term_info in &mut self.term_infos[1..] {
term_info.postings_offset -= ref_term_info.postings_offset;
term_info.postings_start_offset -= ref_term_info.postings_start_offset;
term_info.positions_idx -= ref_term_info.positions_idx;
}
let mut max_doc_freq: u32 = 0u32;
let mut max_postings_offset: u64 = 0u64;
let mut max_positions_idx: u64 = 0u64;
let max_postings_offset: u64 = postings_stop_offset;
let max_positions_idx: u64 = last_term_info.positions_idx;
for term_info in &self.term_infos[1..] {
max_doc_freq = cmp::max(max_doc_freq, term_info.doc_freq);
max_postings_offset = cmp::max(max_postings_offset, term_info.postings_offset);
max_positions_idx = cmp::max(max_positions_idx, term_info.positions_idx);
}
let max_doc_freq_nbits: u8 = compute_num_bits(u64::from(max_doc_freq));
@@ -222,6 +237,12 @@ impl TermInfoStoreWriter {
)?;
}
bit_packer.write(
postings_stop_offset,
term_info_block_meta.postings_offset_nbits,
&mut self.buffer_term_infos,
)?;
// Block need end up at the end of a byte.
bit_packer.flush(&mut self.buffer_term_infos)?;
self.term_infos.clear();
@@ -230,6 +251,7 @@ impl TermInfoStoreWriter {
}
pub fn write_term_info(&mut self, term_info: &TermInfo) -> io::Result<()> {
assert!(term_info.postings_stop_offset >= term_info.postings_start_offset);
self.num_terms += 1u64;
self.term_infos.push(term_info.clone());
if self.term_infos.len() >= BLOCK_LEN {
@@ -289,10 +311,11 @@ mod tests {
#[test]
fn test_term_info_block_meta_serialization() {
let term_info_block_meta = TermInfoBlockMeta {
offset: 2009,
offset: 2009u64,
ref_term_info: TermInfo {
doc_freq: 512,
postings_offset: 51,
postings_start_offset: 51,
postings_stop_offset: 57u64,
positions_idx: 3584,
},
doc_freq_nbits: 10,
@@ -310,10 +333,12 @@ mod tests {
fn test_pack() -> crate::Result<()> {
let mut store_writer = TermInfoStoreWriter::new();
let mut term_infos = vec![];
let offset = |i| (i * 13 + i * i) as u64;
for i in 0..1000 {
let term_info = TermInfo {
doc_freq: i as u32,
postings_offset: (i / 10) as u64,
postings_start_offset: offset(i),
postings_stop_offset: offset(i + 1),
positions_idx: (i * 7) as u64,
};
store_writer.write_term_info(&term_info)?;
@@ -323,7 +348,12 @@ mod tests {
store_writer.serialize(&mut buffer)?;
let term_info_store = TermInfoStore::open(FileSlice::from(buffer))?;
for i in 0..1000 {
assert_eq!(term_info_store.get(i as u64), term_infos[i]);
assert_eq!(
term_info_store.get(i as u64),
term_infos[i],
"term info {}",
i
);
}
Ok(())
}

View File

@@ -18,7 +18,7 @@ fn test_failpoints_managed_directory_gc_if_delete_fails() {
.unwrap()
.terminate()
.unwrap();
assert!(managed_directory.exists(test_path));
assert!(managed_directory.exists(test_path).unwrap());
// triggering gc and setting the delete operation to fail.
//
// We are checking that the gc operation is not removing the
@@ -29,12 +29,12 @@ fn test_failpoints_managed_directory_gc_if_delete_fails() {
// lock file.
fail::cfg("RAMDirectory::delete", "1*off->1*return").unwrap();
assert!(managed_directory.garbage_collect(Default::default).is_ok());
assert!(managed_directory.exists(test_path));
assert!(managed_directory.exists(test_path).unwrap());
// running the gc a second time should remove the file.
assert!(managed_directory.garbage_collect(Default::default).is_ok());
assert!(
!managed_directory.exists(test_path),
!managed_directory.exists(test_path).unwrap(),
"The file should have been deleted"
);
}