Compare commits

..

1 Commits

Author SHA1 Message Date
Paul Masurel
7cb018c640 Add an option to opt out fieldnorms for indexed fields.
Closes #922
2020-11-03 16:15:20 +09:00
50 changed files with 1001 additions and 1638 deletions

View File

@@ -7,7 +7,7 @@ Tantivy 0.14.0
- Added support for Brotli compression in the DocStore. (@ppodolsky)
- Added helper for building intersections and unions in BooleanQuery (@guilload)
- Bugfix in `Query::explain`
- Removed dependency on `notify` #924. Replaced with `FileWatcher` struct that polls meta file every 500ms in background thread. (@halvorboe @guilload)
- Making it possible to opt out the generation of fieldnorms information for indexed fields. This change breaks compatibility as the meta.json file format is slightly changed. (#922, @pmasurel)
Tantivy 0.13.2
===================

View File

@@ -30,6 +30,7 @@ serde_json = "1"
num_cpus = "1"
fs2={version="0.4", optional=true}
levenshtein_automata = "0.2"
notify = {version="4", optional=true}
uuid = { version = "0.8", features = ["v4", "serde"] }
crossbeam = "0.8"
futures = {version = "0.3", features=["thread-pool"] }
@@ -47,7 +48,6 @@ murmurhash32 = "0.2"
chrono = "0.4"
smallvec = "1"
rayon = "1"
lru = "0.6"
[target.'cfg(windows)'.dependencies]
winapi = "0.3"
@@ -73,7 +73,7 @@ overflow-checks = true
[features]
default = ["mmap"]
mmap = ["fs2", "tempfile", "memmap"]
mmap = ["fs2", "tempfile", "memmap", "notify"]
brotli-compression = ["brotli"]
lz4-compression = ["lz4"]
failpoints = ["fail/failpoints"]

View File

@@ -1,148 +0,0 @@
// # Custom collector example
//
// This example shows how you can implement your own
// collector. As an example, we will compute a collector
// that computes the standard deviation of a given fast field.
//
// Of course, you can have a look at the tantivy's built-in collectors
// such as the `CountCollector` for more examples.
// ---
// Importing tantivy...
use crate::collector::{Collector, SegmentCollector};
use crate::fastfield::FastFieldReader;
use crate::schema::Field;
use crate::{Score, SegmentReader, TantivyError};
/// The `FilterCollector` collector filters docs using a u64 fast field value and a predicate.
/// Only the documents for which the predicate returned "true" will be passed on to the next collector.
///
/// ```rust
/// use tantivy::collector::{TopDocs, FilterCollector};
/// use tantivy::query::QueryParser;
/// use tantivy::schema::{Schema, TEXT, INDEXED, FAST};
/// use tantivy::{doc, DocAddress, Index};
///
/// let mut schema_builder = Schema::builder();
/// let title = schema_builder.add_text_field("title", TEXT);
/// let price = schema_builder.add_u64_field("price", INDEXED | FAST);
/// let schema = schema_builder.build();
/// let index = Index::create_in_ram(schema);
///
/// let mut index_writer = index.writer_with_num_threads(1, 10_000_000).unwrap();
/// index_writer.add_document(doc!(title => "The Name of the Wind", price => 30_200u64));
/// index_writer.add_document(doc!(title => "The Diary of Muadib", price => 29_240u64));
/// index_writer.add_document(doc!(title => "A Dairy Cow", price => 21_240u64));
/// index_writer.add_document(doc!(title => "The Diary of a Young Girl", price => 20_120u64));
/// assert!(index_writer.commit().is_ok());
///
/// let reader = index.reader().unwrap();
/// let searcher = reader.searcher();
///
/// let query_parser = QueryParser::for_index(&index, vec![title]);
/// let query = query_parser.parse_query("diary").unwrap();
/// let no_filter_collector = FilterCollector::new(price, &|value| true, TopDocs::with_limit(2));
/// let top_docs = searcher.search(&query, &no_filter_collector).unwrap();
///
/// assert_eq!(top_docs.len(), 2);
/// assert_eq!(top_docs[0].1, DocAddress(0, 1));
/// assert_eq!(top_docs[1].1, DocAddress(0, 3));
///
/// let filter_all_collector = FilterCollector::new(price, &|value| false, TopDocs::with_limit(2));
/// let filtered_top_docs = searcher.search(&query, &filter_all_collector).unwrap();
///
/// assert_eq!(filtered_top_docs.len(), 0);
/// ```
pub struct FilterCollector<TCollector> {
field: Field,
collector: TCollector,
predicate: &'static (dyn Fn(u64) -> bool + Send + Sync),
}
impl<TCollector> FilterCollector<TCollector>
where
TCollector: Collector + Send + Sync,
{
pub fn new(
field: Field,
predicate: &'static (dyn Fn(u64) -> bool + Send + Sync),
collector: TCollector,
) -> FilterCollector<TCollector> {
FilterCollector {
field,
predicate,
collector,
}
}
}
impl<TCollector> Collector for FilterCollector<TCollector>
where
TCollector: Collector + Send + Sync,
{
// That's the type of our result.
// Our standard deviation will be a float.
type Fruit = TCollector::Fruit;
type Child = FilterSegmentCollector<TCollector::Child>;
fn for_segment(
&self,
segment_local_id: u32,
segment_reader: &SegmentReader,
) -> crate::Result<FilterSegmentCollector<TCollector::Child>> {
let fast_field_reader = segment_reader
.fast_fields()
.u64(self.field)
.ok_or_else(|| {
let field_name = segment_reader.schema().get_field_name(self.field);
TantivyError::SchemaError(format!(
"Field {:?} is not a u64 fast field.",
field_name
))
})?;
let segment_collector = self
.collector
.for_segment(segment_local_id, segment_reader)?;
Ok(FilterSegmentCollector {
fast_field_reader,
segment_collector: segment_collector,
predicate: self.predicate,
})
}
fn requires_scoring(&self) -> bool {
self.collector.requires_scoring()
}
fn merge_fruits(
&self,
segment_fruits: Vec<<TCollector::Child as SegmentCollector>::Fruit>,
) -> crate::Result<TCollector::Fruit> {
self.collector.merge_fruits(segment_fruits)
}
}
pub struct FilterSegmentCollector<TSegmentCollector> {
fast_field_reader: FastFieldReader<u64>,
segment_collector: TSegmentCollector,
predicate: &'static (dyn Fn(u64) -> bool + Send + Sync),
}
impl<TSegmentCollector> SegmentCollector for FilterSegmentCollector<TSegmentCollector>
where
TSegmentCollector: SegmentCollector,
{
type Fruit = TSegmentCollector::Fruit;
fn collect(&mut self, doc: u32, score: Score) {
let value = self.fast_field_reader.get(doc);
if (self.predicate)(value) {
self.segment_collector.collect(doc, score)
}
}
fn harvest(self) -> <TSegmentCollector as SegmentCollector>::Fruit {
self.segment_collector.harvest()
}
}

View File

@@ -114,9 +114,6 @@ use crate::query::Weight;
mod docset_collector;
pub use self::docset_collector::DocSetCollector;
mod filter_collector_wrapper;
pub use self::filter_collector_wrapper::FilterCollector;
/// `Fruit` is the type for the result of our collection.
/// e.g. `usize` for the `Count` collector.
pub trait Fruit: Send + downcast_rs::Downcast {}

View File

@@ -66,6 +66,10 @@ pub(crate) fn compute_num_bits(n: u64) -> u8 {
}
}
pub(crate) fn is_power_of_2(n: usize) -> bool {
(n > 0) && (n & (n - 1) == 0)
}
/// Has length trait
pub trait HasLen {
/// Return length

View File

@@ -5,7 +5,6 @@ use crate::core::SegmentId;
use crate::core::SegmentMeta;
use crate::core::SegmentMetaInventory;
use crate::core::META_FILEPATH;
use crate::directory::error::OpenReadError;
use crate::directory::ManagedDirectory;
#[cfg(feature = "mmap")]
use crate::directory::MmapDirectory;
@@ -60,7 +59,7 @@ impl Index {
/// Examines the directory to see if it contains an index.
///
/// Effectively, it only checks for the presence of the `meta.json` file.
pub fn exists<Dir: Directory>(dir: &Dir) -> Result<bool, OpenReadError> {
pub fn exists<Dir: Directory>(dir: &Dir) -> bool {
dir.exists(&META_FILEPATH)
}
@@ -107,7 +106,7 @@ impl Index {
schema: Schema,
) -> crate::Result<Index> {
let mmap_directory = MmapDirectory::open(directory_path)?;
if Index::exists(&mmap_directory)? {
if Index::exists(&mmap_directory) {
return Err(TantivyError::IndexAlreadyExists);
}
Index::create(mmap_directory, schema)
@@ -115,7 +114,7 @@ impl Index {
/// Opens or creates a new index in the provided directory
pub fn open_or_create<Dir: Directory>(dir: Dir, schema: Schema) -> crate::Result<Index> {
if !Index::exists(&dir)? {
if !Index::exists(&dir) {
return Index::create(dir, schema);
}
let index = Index::open(dir)?;
@@ -400,7 +399,7 @@ impl fmt::Debug for Index {
#[cfg(test)]
mod tests {
use crate::directory::{RAMDirectory, WatchCallback};
use crate::directory::RAMDirectory;
use crate::schema::Field;
use crate::schema::{Schema, INDEXED, TEXT};
use crate::IndexReader;
@@ -424,24 +423,24 @@ mod tests {
#[test]
fn test_index_exists() {
let directory = RAMDirectory::create();
assert!(!Index::exists(&directory).unwrap());
assert!(!Index::exists(&directory));
assert!(Index::create(directory.clone(), throw_away_schema()).is_ok());
assert!(Index::exists(&directory).unwrap());
assert!(Index::exists(&directory));
}
#[test]
fn open_or_create_should_create() {
let directory = RAMDirectory::create();
assert!(!Index::exists(&directory).unwrap());
assert!(!Index::exists(&directory));
assert!(Index::open_or_create(directory.clone(), throw_away_schema()).is_ok());
assert!(Index::exists(&directory).unwrap());
assert!(Index::exists(&directory));
}
#[test]
fn open_or_create_should_open() {
let directory = RAMDirectory::create();
assert!(Index::create(directory.clone(), throw_away_schema()).is_ok());
assert!(Index::exists(&directory).unwrap());
assert!(Index::exists(&directory));
assert!(Index::open_or_create(directory, throw_away_schema()).is_ok());
}
@@ -449,7 +448,7 @@ mod tests {
fn create_should_wipeoff_existing() {
let directory = RAMDirectory::create();
assert!(Index::create(directory.clone(), throw_away_schema()).is_ok());
assert!(Index::exists(&directory).unwrap());
assert!(Index::exists(&directory));
assert!(Index::create(directory.clone(), Schema::builder().build()).is_ok());
}
@@ -457,7 +456,7 @@ mod tests {
fn open_or_create_exists_but_schema_does_not_match() {
let directory = RAMDirectory::create();
assert!(Index::create(directory.clone(), throw_away_schema()).is_ok());
assert!(Index::exists(&directory).unwrap());
assert!(Index::exists(&directory));
assert!(Index::open_or_create(directory.clone(), throw_away_schema()).is_ok());
let err = Index::open_or_create(directory, Schema::builder().build());
assert_eq!(
@@ -525,7 +524,7 @@ mod tests {
assert_eq!(reader.searcher().num_docs(), 0);
writer.add_document(doc!(field=>1u64));
let (sender, receiver) = crossbeam::channel::unbounded();
let _handle = index.directory_mut().watch(WatchCallback::new(move || {
let _handle = index.directory_mut().watch(Box::new(move || {
let _ = sender.send(());
}));
writer.commit().unwrap();
@@ -555,11 +554,9 @@ mod tests {
fn test_index_on_commit_reload_policy_aux(field: Field, index: &Index, reader: &IndexReader) {
let mut reader_index = reader.index();
let (sender, receiver) = crossbeam::channel::unbounded();
let _watch_handle = reader_index
.directory_mut()
.watch(WatchCallback::new(move || {
let _ = sender.send(());
}));
let _watch_handle = reader_index.directory_mut().watch(Box::new(move || {
let _ = sender.send(());
}));
let mut writer = index.writer_for_tests().unwrap();
assert_eq!(reader.searcher().num_docs(), 0);
writer.add_document(doc!(field=>1u64));
@@ -598,7 +595,7 @@ mod tests {
writer.add_document(doc!(field => i));
}
let (sender, receiver) = crossbeam::channel::unbounded();
let _handle = directory.watch(WatchCallback::new(move || {
let _handle = directory.watch(Box::new(move || {
let _ = sender.send(());
}));
writer.commit().unwrap();

View File

@@ -301,7 +301,7 @@ mod tests {
let json = serde_json::ser::to_string(&index_metas).expect("serialization failed");
assert_eq!(
json,
r#"{"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","tokenizer":"default"},"stored":false}}],"opstamp":0}"#
r#"{"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","tokenizer":"default","fieldnorms":true},"stored":false}}],"opstamp":0}"#
);
}
}

View File

@@ -90,9 +90,9 @@ impl InvertedIndexReader {
term_info: &TermInfo,
block_postings: &mut BlockSegmentPostings,
) -> io::Result<()> {
let start_offset = term_info.postings_start_offset as usize;
let stop_offset = term_info.postings_stop_offset as usize;
let postings_slice = self.postings_file_slice.slice(start_offset, stop_offset);
let postings_slice = self
.postings_file_slice
.slice_from(term_info.postings_offset as usize);
block_postings.reset(term_info.doc_freq, postings_slice.read_bytes()?);
Ok(())
}
@@ -121,10 +121,8 @@ impl InvertedIndexReader {
term_info: &TermInfo,
requested_option: IndexRecordOption,
) -> io::Result<BlockSegmentPostings> {
let postings_data = self.postings_file_slice.slice(
term_info.postings_start_offset as usize,
term_info.postings_stop_offset as usize,
);
let offset = term_info.postings_offset as usize;
let postings_data = self.postings_file_slice.slice_from(offset);
BlockSegmentPostings::open(
term_info.doc_freq,
postings_data,

View File

@@ -12,7 +12,7 @@ pub use self::executor::Executor;
pub use self::index::Index;
pub use self::index_meta::{IndexMeta, SegmentMeta, SegmentMetaInventory};
pub use self::inverted_index_reader::InvertedIndexReader;
pub use self::searcher::{FieldSearcher, Searcher};
pub use self::searcher::Searcher;
pub use self::segment::Segment;
pub use self::segment::SerializableSegment;
pub use self::segment_component::SegmentComponent;

View File

@@ -168,7 +168,6 @@ impl Searcher {
}
}
/// **Experimental API** `FieldSearcher` only gives access to a stream over the terms of a field.
pub struct FieldSearcher {
inv_index_readers: Vec<Arc<InvertedIndexReader>>,
}
@@ -180,11 +179,7 @@ impl FieldSearcher {
/// Returns a Stream over all of the sorted unique terms of
/// for the given field.
///
/// This method does not take into account which documents are deleted, so
/// in presence of deletes some terms may not actually exist in any document
/// anymore.
pub fn terms(&self) -> TermMerger {
pub fn terms(&self) -> TermMerger<'_> {
let term_streamers: Vec<_> = self
.inv_index_readers
.iter()

View File

@@ -131,7 +131,7 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
fn delete(&self, path: &Path) -> Result<(), DeleteError>;
/// Returns true iff the file exists
fn exists(&self, path: &Path) -> Result<bool, OpenReadError>;
fn exists(&self, path: &Path) -> bool;
/// Opens a writer for the *virtual file* associated with
/// a Path.

View File

@@ -1,178 +0,0 @@
use crate::directory::{WatchCallback, WatchCallbackList, WatchHandle};
use crc32fast::Hasher;
use std::fs;
use std::io;
use std::io::BufRead;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
pub const POLLING_INTERVAL: Duration = Duration::from_millis(if cfg!(test) { 1 } else { 500 });
// Watches a file and executes registered callbacks when the file is modified.
pub struct FileWatcher {
path: Arc<PathBuf>,
callbacks: Arc<WatchCallbackList>,
state: Arc<AtomicUsize>, // 0: new, 1: runnable, 2: terminated
}
impl FileWatcher {
pub fn new(path: &PathBuf) -> FileWatcher {
FileWatcher {
path: Arc::new(path.clone()),
callbacks: Default::default(),
state: Default::default(),
}
}
pub fn spawn(&self) {
if self.state.compare_and_swap(0, 1, Ordering::SeqCst) > 0 {
return;
}
let path = self.path.clone();
let callbacks = self.callbacks.clone();
let state = self.state.clone();
thread::Builder::new()
.name("thread-tantivy-meta-file-watcher".to_string())
.spawn(move || {
let mut current_checksum = None;
while state.load(Ordering::SeqCst) == 1 {
if let Ok(checksum) = FileWatcher::compute_checksum(&path) {
// `None.unwrap_or_else(|| !checksum) != checksum` evaluates to `true`
if current_checksum.unwrap_or_else(|| !checksum) != checksum {
info!("Meta file {:?} was modified", path);
current_checksum = Some(checksum);
futures::executor::block_on(callbacks.broadcast());
}
}
thread::sleep(POLLING_INTERVAL);
}
})
.expect("Failed to spawn meta file watcher thread");
}
pub fn watch(&self, callback: WatchCallback) -> WatchHandle {
let handle = self.callbacks.subscribe(callback);
self.spawn();
handle
}
fn compute_checksum(path: &PathBuf) -> Result<u32, io::Error> {
let reader = match fs::File::open(path) {
Ok(f) => io::BufReader::new(f),
Err(e) => {
warn!("Failed to open meta file {:?}: {:?}", path, e);
return Err(e);
}
};
let mut hasher = Hasher::new();
for line in reader.lines() {
hasher.update(line?.as_bytes())
}
Ok(hasher.finalize())
}
}
impl Drop for FileWatcher {
fn drop(&mut self) {
self.state.store(2, Ordering::SeqCst);
}
}
#[cfg(test)]
mod tests {
use std::mem;
use crate::directory::mmap_directory::atomic_write;
use super::*;
#[test]
fn test_file_watcher_drop_watcher() -> crate::Result<()> {
let tmp_dir = tempfile::TempDir::new()?;
let tmp_file = tmp_dir.path().join("watched.txt");
let counter: Arc<AtomicUsize> = Default::default();
let (tx, rx) = crossbeam::channel::unbounded();
let timeout = Duration::from_millis(100);
let watcher = FileWatcher::new(&tmp_file);
let state = watcher.state.clone();
assert_eq!(state.load(Ordering::SeqCst), 0);
let counter_clone = counter.clone();
let _handle = watcher.watch(WatchCallback::new(move || {
let val = counter_clone.fetch_add(1, Ordering::SeqCst);
tx.send(val + 1).unwrap();
}));
assert_eq!(counter.load(Ordering::SeqCst), 0);
assert_eq!(state.load(Ordering::SeqCst), 1);
atomic_write(&tmp_file, b"foo")?;
assert_eq!(rx.recv_timeout(timeout), Ok(1));
atomic_write(&tmp_file, b"foo")?;
assert!(rx.recv_timeout(timeout).is_err());
atomic_write(&tmp_file, b"bar")?;
assert_eq!(rx.recv_timeout(timeout), Ok(2));
mem::drop(watcher);
atomic_write(&tmp_file, b"qux")?;
thread::sleep(Duration::from_millis(10));
assert_eq!(counter.load(Ordering::SeqCst), 2);
assert_eq!(state.load(Ordering::SeqCst), 2);
Ok(())
}
#[test]
fn test_file_watcher_drop_handle() -> crate::Result<()> {
let tmp_dir = tempfile::TempDir::new()?;
let tmp_file = tmp_dir.path().join("watched.txt");
let counter: Arc<AtomicUsize> = Default::default();
let (tx, rx) = crossbeam::channel::unbounded();
let timeout = Duration::from_millis(100);
let watcher = FileWatcher::new(&tmp_file);
let state = watcher.state.clone();
assert_eq!(state.load(Ordering::SeqCst), 0);
let counter_clone = counter.clone();
let handle = watcher.watch(WatchCallback::new(move || {
let val = counter_clone.fetch_add(1, Ordering::SeqCst);
tx.send(val + 1).unwrap();
}));
assert_eq!(counter.load(Ordering::SeqCst), 0);
assert_eq!(state.load(Ordering::SeqCst), 1);
atomic_write(&tmp_file, b"foo")?;
assert_eq!(rx.recv_timeout(timeout), Ok(1));
mem::drop(handle);
atomic_write(&tmp_file, b"qux")?;
assert_eq!(counter.load(Ordering::SeqCst), 1);
assert_eq!(state.load(Ordering::SeqCst), 1);
Ok(())
}
}

View File

@@ -307,7 +307,7 @@ impl Directory for ManagedDirectory {
self.directory.delete(path)
}
fn exists(&self, path: &Path) -> Result<bool, OpenReadError> {
fn exists(&self, path: &Path) -> bool {
self.directory.exists(path)
}
@@ -355,22 +355,22 @@ mod tests_mmap_specific {
managed_directory
.atomic_write(test_path2, &[0u8, 1u8])
.unwrap();
assert!(managed_directory.exists(test_path1).unwrap());
assert!(managed_directory.exists(test_path2).unwrap());
assert!(managed_directory.exists(test_path1));
assert!(managed_directory.exists(test_path2));
let living_files: HashSet<PathBuf> = [test_path1.to_owned()].iter().cloned().collect();
assert!(managed_directory.garbage_collect(|| living_files).is_ok());
assert!(managed_directory.exists(test_path1).unwrap());
assert!(!managed_directory.exists(test_path2).unwrap());
assert!(managed_directory.exists(test_path1));
assert!(!managed_directory.exists(test_path2));
}
{
let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap();
let mut managed_directory = ManagedDirectory::wrap(mmap_directory).unwrap();
assert!(managed_directory.exists(test_path1).unwrap());
assert!(!managed_directory.exists(test_path2).unwrap());
assert!(managed_directory.exists(test_path1));
assert!(!managed_directory.exists(test_path2));
let living_files: HashSet<PathBuf> = HashSet::new();
assert!(managed_directory.garbage_collect(|| living_files).is_ok());
assert!(!managed_directory.exists(test_path1).unwrap());
assert!(!managed_directory.exists(test_path2).unwrap());
assert!(!managed_directory.exists(test_path1));
assert!(!managed_directory.exists(test_path2));
}
}
@@ -387,7 +387,7 @@ mod tests_mmap_specific {
let mut write = managed_directory.open_write(test_path1).unwrap();
write.write_all(&[0u8, 1u8]).unwrap();
write.terminate().unwrap();
assert!(managed_directory.exists(test_path1).unwrap());
assert!(managed_directory.exists(test_path1));
let _mmap_read = managed_directory.open_read(test_path1).unwrap();
assert!(managed_directory
@@ -395,15 +395,15 @@ mod tests_mmap_specific {
.is_ok());
if cfg!(target_os = "windows") {
// On Windows, gc should try and fail the file as it is mmapped.
assert!(managed_directory.exists(test_path1).unwrap());
assert!(managed_directory.exists(test_path1));
// unmap should happen here.
drop(_mmap_read);
// The file should still be in the list of managed file and
// eventually be deleted once mmap is released.
assert!(managed_directory.garbage_collect(|| living_files).is_ok());
assert!(!managed_directory.exists(test_path1).unwrap());
assert!(!managed_directory.exists(test_path1));
} else {
assert!(!managed_directory.exists(test_path1).unwrap());
assert!(!managed_directory.exists(test_path1));
}
}

View File

@@ -1,7 +1,6 @@
use crate::core::META_FILEPATH;
use crate::directory::error::LockError;
use crate::directory::error::{DeleteError, OpenDirectoryError, OpenReadError, OpenWriteError};
use crate::directory::file_watcher::FileWatcher;
use crate::directory::AntiCallToken;
use crate::directory::BoxedData;
use crate::directory::Directory;
@@ -9,10 +8,14 @@ use crate::directory::DirectoryLock;
use crate::directory::FileSlice;
use crate::directory::Lock;
use crate::directory::WatchCallback;
use crate::directory::WatchCallbackList;
use crate::directory::WatchHandle;
use crate::directory::{TerminatingWrite, WritePtr};
use fs2::FileExt;
use memmap::Mmap;
use notify::RawEvent;
use notify::RecursiveMode;
use notify::Watcher;
use serde::{Deserialize, Serialize};
use stable_deref_trait::StableDeref;
use std::convert::From;
@@ -23,9 +26,12 @@ use std::io::{self, Seek, SeekFrom};
use std::io::{BufWriter, Read, Write};
use std::path::{Path, PathBuf};
use std::result;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::RwLock;
use std::sync::Weak;
use std::thread;
use std::{collections::HashMap, ops::Deref};
use tempfile::TempDir;
@@ -131,6 +137,67 @@ impl MmapCache {
}
}
struct WatcherWrapper {
_watcher: Mutex<notify::RecommendedWatcher>,
watcher_router: Arc<WatchCallbackList>,
}
impl WatcherWrapper {
pub fn new(path: &Path) -> Result<Self, OpenDirectoryError> {
let (tx, watcher_recv): (Sender<RawEvent>, Receiver<RawEvent>) = channel();
// We need to initialize the
let watcher = notify::raw_watcher(tx)
.and_then(|mut watcher| {
watcher.watch(path, RecursiveMode::Recursive)?;
Ok(watcher)
})
.map_err(|err| match err {
notify::Error::PathNotFound => OpenDirectoryError::DoesNotExist(path.to_owned()),
_ => {
panic!("Unknown error while starting watching directory {:?}", path);
}
})?;
let watcher_router: Arc<WatchCallbackList> = Default::default();
let watcher_router_clone = watcher_router.clone();
thread::Builder::new()
.name("meta-file-watch-thread".to_string())
.spawn(move || {
loop {
match watcher_recv.recv().map(|evt| evt.path) {
Ok(Some(changed_path)) => {
// ... Actually subject to false positive.
// We might want to be more accurate than this at one point.
if let Some(filename) = changed_path.file_name() {
if filename == *META_FILEPATH {
let _ = watcher_router_clone.broadcast();
}
}
}
Ok(None) => {
// not an event we are interested in.
}
Err(_e) => {
// the watch send channel was dropped
break;
}
}
}
})
.map_err(|io_error| OpenDirectoryError::IoError {
io_error,
directory_path: path.to_path_buf(),
})?;
Ok(WatcherWrapper {
_watcher: Mutex::new(watcher),
watcher_router,
})
}
pub fn watch(&mut self, watch_callback: WatchCallback) -> WatchHandle {
self.watcher_router.subscribe(watch_callback)
}
}
/// Directory storing data in files, read via mmap.
///
/// The Mmap object are cached to limit the
@@ -152,21 +219,40 @@ struct MmapDirectoryInner {
root_path: PathBuf,
mmap_cache: RwLock<MmapCache>,
_temp_directory: Option<TempDir>,
watcher: FileWatcher,
watcher: RwLock<Option<WatcherWrapper>>,
}
impl MmapDirectoryInner {
fn new(root_path: PathBuf, temp_directory: Option<TempDir>) -> MmapDirectoryInner {
MmapDirectoryInner {
root_path,
mmap_cache: Default::default(),
_temp_directory: temp_directory,
watcher: FileWatcher::new(&root_path.join(*META_FILEPATH)),
root_path: root_path,
watcher: RwLock::new(None),
}
}
fn watch(&self, callback: WatchCallback) -> crate::Result<WatchHandle> {
Ok(self.watcher.watch(callback))
fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> {
// a lot of juggling here, to ensure we don't do anything that panics
// while the rwlock is held. That way we ensure that the rwlock cannot
// be poisoned.
//
// The downside is that we might create a watch wrapper that is not useful.
let need_initialization = self.watcher.read().unwrap().is_none();
if need_initialization {
let watch_wrapper = WatcherWrapper::new(&self.root_path)?;
let mut watch_wlock = self.watcher.write().unwrap();
// the watcher could have been initialized when we released the lock, and
// we do not want to lose the watched files that were set.
if watch_wlock.is_none() {
*watch_wlock = Some(watch_wrapper);
}
}
if let Some(watch_wrapper) = self.watcher.write().unwrap().as_mut() {
Ok(watch_wrapper.watch(watch_callback))
} else {
unreachable!("At this point, watch wrapper is supposed to be initialized");
}
}
}
@@ -327,24 +413,6 @@ impl Deref for MmapArc {
}
unsafe impl StableDeref for MmapArc {}
/// Writes a file in an atomic manner.
pub(crate) fn atomic_write(path: &Path, content: &[u8]) -> io::Result<()> {
// We create the temporary file in the same directory as the target file.
// Indeed the canonical temp directory and the target file might sit in different
// filesystem, in which case the atomic write may actually not work.
let parent_path = path.parent().ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
"Path {:?} does not have parent directory.",
)
})?;
let mut tempfile = tempfile::Builder::new().tempfile_in(&parent_path)?;
tempfile.write_all(content)?;
tempfile.flush()?;
tempfile.into_temp_path().persist(path)?;
Ok(())
}
impl Directory for MmapDirectory {
fn open_read(&self, path: &Path) -> result::Result<FileSlice, OpenReadError> {
debug!("Open Read {:?}", path);
@@ -388,9 +456,9 @@ impl Directory for MmapDirectory {
}
}
fn exists(&self, path: &Path) -> Result<bool, OpenReadError> {
fn exists(&self, path: &Path) -> bool {
let full_path = self.resolve_path(path);
Ok(full_path.exists())
full_path.exists()
}
fn open_write(&self, path: &Path) -> Result<WritePtr, OpenWriteError> {
@@ -445,8 +513,12 @@ impl Directory for MmapDirectory {
fn atomic_write(&self, path: &Path, content: &[u8]) -> io::Result<()> {
debug!("Atomic Write {:?}", path);
let mut tempfile = tempfile::Builder::new().tempfile_in(&self.inner.root_path)?;
tempfile.write_all(content)?;
tempfile.flush()?;
let full_path = self.resolve_path(path);
atomic_write(&full_path, content)
tempfile.into_temp_path().persist(full_path)?;
Ok(())
}
fn acquire_lock(&self, lock: &Lock) -> Result<DirectoryLock, LockError> {
@@ -485,6 +557,8 @@ mod tests {
use crate::Index;
use crate::ReloadPolicy;
use crate::{common::HasLen, indexer::LogMergePolicy};
use std::fs;
use std::sync::atomic::{AtomicUsize, Ordering};
#[test]
fn test_open_non_existent_path() {
@@ -573,6 +647,27 @@ mod tests {
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0);
}
#[test]
fn test_watch_wrapper() {
let counter: Arc<AtomicUsize> = Default::default();
let counter_clone = counter.clone();
let tmp_dir = tempfile::TempDir::new().unwrap();
let tmp_dirpath = tmp_dir.path().to_owned();
let mut watch_wrapper = WatcherWrapper::new(&tmp_dirpath).unwrap();
let tmp_file = tmp_dirpath.join(*META_FILEPATH);
let _handle = watch_wrapper.watch(Box::new(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);
}));
let (sender, receiver) = crossbeam::channel::unbounded();
let _handle2 = watch_wrapper.watch(Box::new(move || {
let _ = sender.send(());
}));
assert_eq!(counter.load(Ordering::SeqCst), 0);
fs::write(&tmp_file, b"whateverwilldo").unwrap();
assert!(receiver.recv().is_ok());
assert!(counter.load(Ordering::SeqCst) >= 1);
}
#[test]
fn test_mmap_released() {
let mmap_directory = MmapDirectory::create_from_tempdir().unwrap();

View File

@@ -10,7 +10,6 @@ mod mmap_directory;
mod directory;
mod directory_lock;
mod file_slice;
mod file_watcher;
mod footer;
mod managed_directory;
mod owned_bytes;

View File

@@ -177,15 +177,8 @@ impl Directory for RAMDirectory {
self.fs.write().unwrap().delete(path)
}
fn exists(&self, path: &Path) -> Result<bool, OpenReadError> {
Ok(self
.fs
.read()
.map_err(|e| OpenReadError::IOError {
io_error: io::Error::new(io::ErrorKind::Other, e.to_string()),
filepath: path.to_path_buf(),
})?
.exists(path))
fn exists(&self, path: &Path) -> bool {
self.fs.read().unwrap().exists(path)
}
fn open_write(&self, path: &Path) -> Result<WritePtr, OpenWriteError> {

View File

@@ -130,7 +130,7 @@ fn ram_directory_panics_if_flush_forgotten() {
fn test_simple(directory: &dyn Directory) -> crate::Result<()> {
let test_path: &'static Path = Path::new("some_path_for_test");
let mut write_file = directory.open_write(test_path)?;
assert!(directory.exists(test_path).unwrap());
assert!(directory.exists(test_path));
write_file.write_all(&[4])?;
write_file.write_all(&[3])?;
write_file.write_all(&[7, 3, 5])?;
@@ -139,14 +139,14 @@ fn test_simple(directory: &dyn Directory) -> crate::Result<()> {
assert_eq!(read_file.as_slice(), &[4u8, 3u8, 7u8, 3u8, 5u8]);
mem::drop(read_file);
assert!(directory.delete(test_path).is_ok());
assert!(!directory.exists(test_path).unwrap());
assert!(!directory.exists(test_path));
Ok(())
}
fn test_rewrite_forbidden(directory: &dyn Directory) -> crate::Result<()> {
let test_path: &'static Path = Path::new("some_path_for_test");
directory.open_write(test_path)?;
assert!(directory.exists(test_path).unwrap());
assert!(directory.exists(test_path));
assert!(directory.open_write(test_path).is_err());
assert!(directory.delete(test_path).is_ok());
Ok(())
@@ -157,7 +157,7 @@ fn test_write_create_the_file(directory: &dyn Directory) {
{
assert!(directory.open_read(test_path).is_err());
let _w = directory.open_write(test_path).unwrap();
assert!(directory.exists(test_path).unwrap());
assert!(directory.exists(test_path));
assert!(directory.open_read(test_path).is_ok());
assert!(directory.delete(test_path).is_ok());
}
@@ -190,33 +190,38 @@ fn test_directory_delete(directory: &dyn Directory) -> crate::Result<()> {
}
fn test_watch(directory: &dyn Directory) {
let num_progress: Arc<AtomicUsize> = Default::default();
let counter: Arc<AtomicUsize> = Default::default();
let (tx, rx) = crossbeam::channel::unbounded();
let timeout = Duration::from_millis(500);
let handle = directory
.watch(WatchCallback::new(move || {
let val = counter.fetch_add(1, SeqCst);
tx.send(val + 1).unwrap();
let counter_clone = counter.clone();
let (sender, receiver) = crossbeam::channel::unbounded();
let watch_callback = Box::new(move || {
counter_clone.fetch_add(1, SeqCst);
});
// This callback is used to synchronize watching in our unit test.
// We bind it to a variable because the callback is removed when that
// handle is dropped.
let watch_handle = directory.watch(watch_callback).unwrap();
let _progress_listener = directory
.watch(Box::new(move || {
let val = num_progress.fetch_add(1, SeqCst);
let _ = sender.send(val);
}))
.unwrap();
for i in 0..10 {
assert!(i <= counter.load(SeqCst));
assert!(directory
.atomic_write(Path::new("meta.json"), b"random_test_data_2")
.is_ok());
assert_eq!(receiver.recv_timeout(Duration::from_millis(500)), Ok(i));
assert!(i + 1 <= counter.load(SeqCst)); // notify can trigger more than once.
}
mem::drop(watch_handle);
assert!(directory
.atomic_write(Path::new("meta.json"), b"foo")
.atomic_write(Path::new("meta.json"), b"random_test_data")
.is_ok());
assert_eq!(rx.recv_timeout(timeout), Ok(1));
assert!(directory
.atomic_write(Path::new("meta.json"), b"bar")
.is_ok());
assert_eq!(rx.recv_timeout(timeout), Ok(2));
mem::drop(handle);
assert!(directory
.atomic_write(Path::new("meta.json"), b"qux")
.is_ok());
assert!(rx.recv_timeout(timeout).is_err());
assert!(receiver.recv_timeout(Duration::from_millis(500)).is_ok());
assert!(10 <= counter.load(SeqCst));
}
fn test_lock_non_blocking(directory: &dyn Directory) {

View File

@@ -4,20 +4,8 @@ use std::sync::Arc;
use std::sync::RwLock;
use std::sync::Weak;
/// Cloneable wrapper for callbacks registered when watching files of a `Directory`.
#[derive(Clone)]
pub struct WatchCallback(Arc<Box<dyn Fn() + Sync + Send>>);
impl WatchCallback {
/// Wraps a `Fn()` to create a WatchCallback.
pub fn new<F: Fn() + Sync + Send + 'static>(op: F) -> Self {
WatchCallback(Arc::new(Box::new(op)))
}
fn call(&self) {
self.0()
}
}
/// Type alias for callbacks registered when watching files of a `Directory`.
pub type WatchCallback = Box<dyn Fn() + Sync + Send>;
/// Helper struct to implement the watch method in `Directory` implementations.
///
@@ -46,7 +34,7 @@ impl WatchHandle {
///
/// This function is only useful when implementing a readonly directory.
pub fn empty() -> WatchHandle {
WatchHandle::new(Arc::new(WatchCallback::new(|| {})))
WatchHandle::new(Arc::new(Box::new(|| {})))
}
}
@@ -59,13 +47,13 @@ impl WatchCallbackList {
WatchHandle::new(watch_callback_arc)
}
fn list_callback(&self) -> Vec<WatchCallback> {
let mut callbacks: Vec<WatchCallback> = vec![];
fn list_callback(&self) -> Vec<Arc<WatchCallback>> {
let mut callbacks = vec![];
let mut router_wlock = self.router.write().unwrap();
let mut i = 0;
while i < router_wlock.len() {
if let Some(watch) = router_wlock[i].upgrade() {
callbacks.push(watch.as_ref().clone());
callbacks.push(watch);
i += 1;
} else {
router_wlock.swap_remove(i);
@@ -87,7 +75,7 @@ impl WatchCallbackList {
.name("watch-callbacks".to_string())
.spawn(move || {
for callback in callbacks {
callback.call();
callback();
}
let _ = sender.send(());
});
@@ -103,7 +91,7 @@ impl WatchCallbackList {
#[cfg(test)]
mod tests {
use crate::directory::{WatchCallback, WatchCallbackList};
use crate::directory::WatchCallbackList;
use futures::executor::block_on;
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};
@@ -114,7 +102,7 @@ mod tests {
let watch_event_router = WatchCallbackList::default();
let counter: Arc<AtomicUsize> = Default::default();
let counter_clone = counter.clone();
let inc_callback = WatchCallback::new(move || {
let inc_callback = Box::new(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);
});
block_on(watch_event_router.broadcast());
@@ -142,7 +130,7 @@ mod tests {
let counter: Arc<AtomicUsize> = Default::default();
let inc_callback = |inc: usize| {
let counter_clone = counter.clone();
WatchCallback::new(move || {
Box::new(move || {
counter_clone.fetch_add(inc, Ordering::SeqCst);
})
};
@@ -170,7 +158,7 @@ mod tests {
let watch_event_router = WatchCallbackList::default();
let counter: Arc<AtomicUsize> = Default::default();
let counter_clone = counter.clone();
let inc_callback = WatchCallback::new(move || {
let inc_callback = Box::new(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);
});
let handle_a = watch_event_router.subscribe(inc_callback);

View File

@@ -86,7 +86,7 @@ mod tests {
let term = Term::from_field_bytes(field, b"lucene".as_ref());
let term_query = TermQuery::new(term, IndexRecordOption::Basic);
let term_weight = term_query.specialized_weight(&searcher, true)?;
let term_scorer = term_weight.specialized_scorer(searcher.segment_reader(0), 1.0)?;
let term_scorer = term_weight.specialized_scorer(searcher.segment_reader(0), 1.0f32)?;
assert_eq!(term_scorer.doc(), 0u32);
Ok(())
}
@@ -98,9 +98,9 @@ mod tests {
let field = searcher.schema().get_field("string_bytes").unwrap();
let term = Term::from_field_bytes(field, b"lucene".as_ref());
let term_query = TermQuery::new(term, IndexRecordOption::Basic);
let term_weight_err = term_query.specialized_weight(&searcher, false);
let term_weight_res = term_query.specialized_weight(&searcher, false);
assert!(matches!(
term_weight_err,
term_weight_res,
Err(crate::TantivyError::SchemaError(_))
));
Ok(())

View File

@@ -629,7 +629,7 @@ mod bench {
{
let fast_fields_composite = CompositeFile::open(&file).unwrap();
let data = fast_fields_composite.open_read(*FIELD).unwrap();
let fast_field_reader = FastFieldReader::<u64>::open(data).unwrap();
let fast_field_reader = FastFieldReader::<u64>::open(data);
b.iter(|| {
let n = test::black_box(7000u32);
@@ -663,7 +663,7 @@ mod bench {
{
let fast_fields_composite = CompositeFile::open(&file).unwrap();
let data = fast_fields_composite.open_read(*FIELD).unwrap();
let fast_field_reader = FastFieldReader::<u64>::open(data).unwrap();
let fast_field_reader = FastFieldReader::<u64>::open(data);
b.iter(|| {
let n = test::black_box(1000u32);

View File

@@ -49,7 +49,7 @@ impl FieldNormReaders {
///
/// This metric is important to compute the score of a
/// document : a document having a query word in one its short fields
/// (e.g. title) is likely to be more relevant than in one of its longer field
/// (e.g. title)is likely to be more relevant than in one of its longer field
/// (e.g. body).
///
/// tantivy encodes `fieldnorm` on one byte with some precision loss,
@@ -61,31 +61,31 @@ impl FieldNormReaders {
/// precompute computationally expensive functions of the fieldnorm
/// in a very short array.
#[derive(Clone)]
pub struct FieldNormReader {
data: OwnedBytes,
pub enum FieldNormReader {
ConstFieldNorm { fieldnorm_id: u8, num_docs: u32 },
OneByte(OwnedBytes),
}
impl FieldNormReader {
/// Creates a `FieldNormReader` with a constant fieldnorm.
pub fn constant(num_docs: u32, fieldnorm: u32) -> FieldNormReader {
let fieldnorm_id = fieldnorm_to_id(fieldnorm);
let field_norms_data = OwnedBytes::new(vec![fieldnorm_id; num_docs as usize]);
FieldNormReader::new(field_norms_data)
pub fn const_fieldnorm_id(fieldnorm_id: u8, num_docs: u32) -> FieldNormReader {
FieldNormReader::ConstFieldNorm {
fieldnorm_id,
num_docs,
}
}
/// Opens a field norm reader given its file.
pub fn open(fieldnorm_file: FileSlice) -> crate::Result<Self> {
let data = fieldnorm_file.read_bytes()?;
Ok(FieldNormReader::new(data))
}
fn new(data: OwnedBytes) -> Self {
FieldNormReader { data }
Ok(FieldNormReader::OneByte(data))
}
/// Returns the number of documents in this segment.
pub fn num_docs(&self) -> u32 {
self.data.len() as u32
match self {
Self::ConstFieldNorm { num_docs, .. } => *num_docs,
FieldNormReader::OneByte(vals) => vals.len() as u32,
}
}
/// Returns the `fieldnorm` associated to a doc id.
@@ -97,6 +97,7 @@ impl FieldNormReader {
///
/// The fieldnorm is effectively decoded from the
/// `fieldnorm_id` by doing a simple table lookup.
#[inline(always)]
pub fn fieldnorm(&self, doc_id: DocId) -> u32 {
let fieldnorm_id = self.fieldnorm_id(doc_id);
id_to_fieldnorm(fieldnorm_id)
@@ -105,7 +106,11 @@ impl FieldNormReader {
/// Returns the `fieldnorm_id` associated to a document.
#[inline(always)]
pub fn fieldnorm_id(&self, doc_id: DocId) -> u8 {
self.data.as_slice()[doc_id as usize]
match self {
FieldNormReader::ConstFieldNorm { fieldnorm_id, .. } => *fieldnorm_id,
FieldNormReader::OneByte(data) => data.as_slice()[doc_id as usize],
}
}
/// Converts a `fieldnorm_id` into a fieldnorm.
@@ -129,9 +134,7 @@ impl FieldNormReader {
.map(FieldNormReader::fieldnorm_to_id)
.collect::<Vec<u8>>();
let field_norms_data = OwnedBytes::new(field_norms_id);
FieldNormReader {
data: field_norms_data,
}
FieldNormReader::OneByte(field_norms_data)
}
}

View File

@@ -4,7 +4,7 @@ use super::fieldnorm_to_id;
use super::FieldNormsSerializer;
use crate::schema::Field;
use crate::schema::Schema;
use std::{io, iter};
use std::io;
/// The `FieldNormsWriter` is in charge of tracking the fieldnorm byte
/// of each document for each field with field norms.
@@ -13,7 +13,7 @@ use std::{io, iter};
/// byte per document per field.
pub struct FieldNormsWriter {
fields: Vec<Field>,
fieldnorms_buffer: Vec<Vec<u8>>,
fieldnorms_buffer: Vec<Option<Vec<u8>>>,
}
impl FieldNormsWriter {
@@ -23,7 +23,7 @@ impl FieldNormsWriter {
schema
.fields()
.filter_map(|(field, field_entry)| {
if field_entry.is_indexed() {
if field_entry.has_fieldnorms() {
Some(field)
} else {
None
@@ -36,17 +36,14 @@ impl FieldNormsWriter {
/// specified in the schema.
pub fn for_schema(schema: &Schema) -> FieldNormsWriter {
let fields = FieldNormsWriter::fields_with_fieldnorm(schema);
let max_field = fields
.iter()
.map(Field::field_id)
.max()
.map(|max_field_id| max_field_id as usize + 1)
.unwrap_or(0);
let num_fields = schema.num_fields();
let mut fieldnorms_buffer: Vec<Option<Vec<u8>>> = vec![None; num_fields];
for field in &fields {
fieldnorms_buffer[field.field_id() as usize] = Some(Vec::new());
}
FieldNormsWriter {
fields,
fieldnorms_buffer: iter::repeat_with(Vec::new)
.take(max_field)
.collect::<Vec<_>>(),
fieldnorms_buffer,
}
}
@@ -55,8 +52,10 @@ impl FieldNormsWriter {
///
/// Will extend with 0-bytes for documents that have not been seen.
pub fn fill_up_to_max_doc(&mut self, max_doc: DocId) {
for field in self.fields.iter() {
self.fieldnorms_buffer[field.field_id() as usize].resize(max_doc as usize, 0u8);
for buffer_opt in self.fieldnorms_buffer.iter_mut() {
if let Some(buffer) = buffer_opt {
buffer.resize(max_doc as usize, 0u8);
}
}
}
@@ -69,21 +68,22 @@ impl FieldNormsWriter {
/// * field - the field being set
/// * fieldnorm - the number of terms present in document `doc` in field `field`
pub fn record(&mut self, doc: DocId, field: Field, fieldnorm: u32) {
let fieldnorm_buffer: &mut Vec<u8> = &mut self.fieldnorms_buffer[field.field_id() as usize];
assert!(
fieldnorm_buffer.len() <= doc as usize,
"Cannot register a given fieldnorm twice"
);
// we fill intermediary `DocId` as having a fieldnorm of 0.
fieldnorm_buffer.resize(doc as usize + 1, 0u8);
fieldnorm_buffer[doc as usize] = fieldnorm_to_id(fieldnorm);
if let Some(fieldnorm_buffer) = self.fieldnorms_buffer[field.field_id() as usize].as_mut() {
assert!(
fieldnorm_buffer.len() <= doc as usize,
"Cannot register a given fieldnorm twice" // we fill intermediary `DocId` as having a fieldnorm of 0.
);
fieldnorm_buffer.resize(doc as usize + 1, 0u8);
fieldnorm_buffer[doc as usize] = fieldnorm_to_id(fieldnorm);
}
}
/// Serialize the seen fieldnorm values to the serializer for all fields.
pub fn serialize(&self, mut fieldnorms_serializer: FieldNormsSerializer) -> io::Result<()> {
for &field in self.fields.iter() {
let fieldnorm_values: &[u8] = &self.fieldnorms_buffer[field.field_id() as usize][..];
fieldnorms_serializer.serialize_field(field, fieldnorm_values)?;
if let Some(buffer) = self.fieldnorms_buffer[field.field_id() as usize].as_ref() {
fieldnorms_serializer.serialize_field(field, &buffer[..])?;
}
}
fieldnorms_serializer.close()?;
Ok(())

View File

@@ -160,7 +160,7 @@ pub use self::docset::{DocSet, TERMINATED};
pub use crate::common::HasLen;
pub use crate::common::{f64_to_u64, i64_to_u64, u64_to_f64, u64_to_i64};
pub use crate::core::{Executor, SegmentComponent};
pub use crate::core::{FieldSearcher, Index, IndexMeta, Searcher, Segment, SegmentId, SegmentMeta};
pub use crate::core::{Index, IndexMeta, Searcher, Segment, SegmentId, SegmentMeta};
pub use crate::core::{InvertedIndexReader, SegmentReader};
pub use crate::directory::Directory;
pub use crate::indexer::operation::UserOperation;

View File

@@ -15,15 +15,19 @@ mod stacker;
mod term_info;
pub(crate) use self::block_search::BlockSearcher;
pub use self::block_segment_postings::BlockSegmentPostings;
pub use self::postings::Postings;
pub(crate) use self::postings_writer::MultiFieldPostingsWriter;
pub use self::segment_postings::SegmentPostings;
pub use self::serializer::{FieldSerializer, InvertedIndexSerializer};
pub use self::postings::Postings;
pub(crate) use self::skip::{BlockInfo, SkipReader};
pub(crate) use self::stacker::compute_table_size;
pub use self::term_info::TermInfo;
pub use self::block_segment_postings::BlockSegmentPostings;
pub use self::segment_postings::SegmentPostings;
pub(crate) use self::stacker::compute_table_size;
pub(crate) type UnorderedTermId = u64;
#[cfg_attr(feature = "cargo-clippy", allow(clippy::enum_variant_names))]
@@ -47,13 +51,16 @@ pub mod tests {
use crate::indexer::SegmentWriter;
use crate::merge_policy::NoMergePolicy;
use crate::query::Scorer;
use crate::schema::{Document, Schema, Term, INDEXED, STRING, TEXT};
use crate::schema::{Field, TextOptions};
use crate::schema::{IndexRecordOption, TextFieldIndexing};
use crate::schema::{Schema, Term, INDEXED, TEXT};
use crate::tokenizer::{SimpleTokenizer, MAX_TOKEN_LEN};
use crate::DocId;
use crate::HasLen;
use crate::Score;
use once_cell::sync::Lazy;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use std::iter;
#[test]
@@ -484,6 +491,53 @@ pub mod tests {
Ok(())
}
pub static TERM_A: Lazy<Term> = Lazy::new(|| {
let field = Field::from_field_id(0);
Term::from_field_text(field, "a")
});
pub static TERM_B: Lazy<Term> = Lazy::new(|| {
let field = Field::from_field_id(0);
Term::from_field_text(field, "b")
});
pub static TERM_C: Lazy<Term> = Lazy::new(|| {
let field = Field::from_field_id(0);
Term::from_field_text(field, "c")
});
pub static TERM_D: Lazy<Term> = Lazy::new(|| {
let field = Field::from_field_id(0);
Term::from_field_text(field, "d")
});
pub static INDEX: Lazy<Index> = Lazy::new(|| {
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", STRING);
let schema = schema_builder.build();
let mut rng: StdRng = StdRng::from_seed([1u8; 32]);
let index = Index::create_in_ram(schema);
let posting_list_size = 1_000_000;
{
let mut index_writer = index.writer_for_tests().unwrap();
for _ in 0..posting_list_size {
let mut doc = Document::default();
if rng.gen_bool(1f64 / 15f64) {
doc.add_text(text_field, "a");
}
if rng.gen_bool(1f64 / 10f64) {
doc.add_text(text_field, "b");
}
if rng.gen_bool(1f64 / 5f64) {
doc.add_text(text_field, "c");
}
doc.add_text(text_field, "d");
index_writer.add_document(doc);
}
assert!(index_writer.commit().is_ok());
}
index
});
/// Wraps a given docset, and forward alls call but the
/// `.skip_next(...)`. This is useful to test that a specialized
/// implementation of `.skip_next(...)` is consistent
@@ -548,65 +602,15 @@ pub mod tests {
#[cfg(all(test, feature = "unstable"))]
mod bench {
use super::tests::*;
use crate::docset::TERMINATED;
use crate::query::Intersection;
use crate::schema::IndexRecordOption;
use crate::schema::{Document, Field, Schema, Term, STRING};
use crate::tests;
use crate::DocSet;
use crate::Index;
use once_cell::sync::Lazy;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use test::{self, Bencher};
pub static TERM_A: Lazy<Term> = Lazy::new(|| {
let field = Field::from_field_id(0);
Term::from_field_text(field, "a")
});
pub static TERM_B: Lazy<Term> = Lazy::new(|| {
let field = Field::from_field_id(0);
Term::from_field_text(field, "b")
});
pub static TERM_C: Lazy<Term> = Lazy::new(|| {
let field = Field::from_field_id(0);
Term::from_field_text(field, "c")
});
pub static TERM_D: Lazy<Term> = Lazy::new(|| {
let field = Field::from_field_id(0);
Term::from_field_text(field, "d")
});
pub static INDEX: Lazy<Index> = Lazy::new(|| {
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", STRING);
let schema = schema_builder.build();
let mut rng: StdRng = StdRng::from_seed([1u8; 32]);
let index = Index::create_in_ram(schema);
let posting_list_size = 1_000_000;
{
let mut index_writer = index.writer_for_tests().unwrap();
for _ in 0..posting_list_size {
let mut doc = Document::default();
if rng.gen_bool(1f64 / 15f64) {
doc.add_text(text_field, "a");
}
if rng.gen_bool(1f64 / 10f64) {
doc.add_text(text_field, "b");
}
if rng.gen_bool(1f64 / 5f64) {
doc.add_text(text_field, "c");
}
doc.add_text(text_field, "d");
index_writer.add_document(doc);
}
assert!(index_writer.commit().is_ok());
}
index
});
#[bench]
fn bench_segment_postings(b: &mut Bencher) {
let reader = INDEX.reader().unwrap();
@@ -616,9 +620,7 @@ mod bench {
b.iter(|| {
let mut segment_postings = segment_reader
.inverted_index(TERM_A.field())
.unwrap()
.read_postings(&*TERM_A, IndexRecordOption::Basic)
.unwrap()
.read_postings(&*TERM_A, IndexRecordOption::Basic)?
.unwrap();
while segment_postings.advance() != TERMINATED {}
});
@@ -632,25 +634,21 @@ mod bench {
b.iter(|| {
let segment_postings_a = segment_reader
.inverted_index(TERM_A.field())
.unwrap()
.read_postings(&*TERM_A, IndexRecordOption::Basic)
.unwrap()
.unwrap();
let segment_postings_b = segment_reader
.inverted_index(TERM_B.field())
.unwrap()
.read_postings(&*TERM_B, IndexRecordOption::Basic)
.unwrap()
.unwrap();
let segment_postings_c = segment_reader
.inverted_index(TERM_C.field())
.unwrap()
.read_postings(&*TERM_C, IndexRecordOption::Basic)
.unwrap()
.unwrap();
let segment_postings_d = segment_reader
.inverted_index(TERM_D.field())
.unwrap()
.read_postings(&*TERM_D, IndexRecordOption::Basic)
.unwrap()
.unwrap();
@@ -672,7 +670,6 @@ mod bench {
let mut segment_postings = segment_reader
.inverted_index(TERM_A.field())
.unwrap()
.read_postings(&*TERM_A, IndexRecordOption::Basic)
.unwrap()
.unwrap();
@@ -690,9 +687,7 @@ mod bench {
b.iter(|| {
let mut segment_postings = segment_reader
.inverted_index(TERM_A.field())
.unwrap()
.read_postings(&*TERM_A, IndexRecordOption::Basic)
.unwrap()
.unwrap();
for doc in &existing_docs {
if segment_postings.seek(*doc) == TERMINATED {
@@ -731,9 +726,7 @@ mod bench {
let n: u32 = test::black_box(17);
let mut segment_postings = segment_reader
.inverted_index(TERM_A.field())
.unwrap()
.read_postings(&*TERM_A, IndexRecordOption::Basic)
.unwrap()
.unwrap();
let mut s = 0u32;
while segment_postings.doc() != TERMINATED {

View File

@@ -177,16 +177,14 @@ impl<'a> FieldSerializer<'a> {
}
fn current_term_info(&self) -> TermInfo {
let positions_idx =
if let Some(positions_serializer) = self.positions_serializer_opt.as_ref() {
positions_serializer.positions_idx()
} else {
0u64
};
let positions_idx = self
.positions_serializer_opt
.as_ref()
.map(PositionSerializer::positions_idx)
.unwrap_or(0u64);
TermInfo {
doc_freq: 0,
postings_start_offset: self.postings_serializer.addr(),
postings_stop_offset: 0u64,
postings_offset: self.postings_serializer.addr(),
positions_idx,
}
}
@@ -240,11 +238,10 @@ impl<'a> FieldSerializer<'a> {
/// using `VInt` encoding.
pub fn close_term(&mut self) -> io::Result<()> {
if self.term_open {
self.postings_serializer
.close_term(self.current_term_info.doc_freq)?;
self.current_term_info.postings_stop_offset = self.postings_serializer.addr();
self.term_dictionary_builder
.insert_value(&self.current_term_info)?;
self.postings_serializer
.close_term(self.current_term_info.doc_freq)?;
self.term_open = false;
}
Ok(())
@@ -325,9 +322,8 @@ pub struct PostingsSerializer<W: Write> {
bm25_weight: Option<BM25Weight>,
num_docs: u32, // Number of docs in the segment
avg_fieldnorm: Score, // Average number of term in the field for that segment.
// this value is used to compute the block wand information.
// this value is used to compute the block wand information.
}
impl<W: Write> PostingsSerializer<W> {
@@ -337,10 +333,6 @@ impl<W: Write> PostingsSerializer<W> {
mode: IndexRecordOption,
fieldnorm_reader: Option<FieldNormReader>,
) -> PostingsSerializer<W> {
let num_docs = fieldnorm_reader
.as_ref()
.map(|fieldnorm_reader| fieldnorm_reader.num_docs())
.unwrap_or(0u32);
PostingsSerializer {
output_write: CountingWriter::wrap(write),
@@ -356,20 +348,25 @@ impl<W: Write> PostingsSerializer<W> {
fieldnorm_reader,
bm25_weight: None,
num_docs,
avg_fieldnorm,
}
}
/// Returns the number of documents in the segment currently being serialized.
/// This function may return `None` if there are no fieldnorm for that field.
fn num_docs_in_segment(&self) -> Option<u32> {
self.fieldnorm_reader
.as_ref()
.map(|reader| reader.num_docs())
}
pub fn new_term(&mut self, term_doc_freq: u32) {
if self.mode.has_freq() && self.num_docs > 0 {
let bm25_weight = BM25Weight::for_one_term(
term_doc_freq as u64,
self.num_docs as u64,
self.avg_fieldnorm,
);
self.bm25_weight = Some(bm25_weight);
if self.mode.has_freq() {
return;
}
self.bm25_weight = self.num_docs_in_segment().map(|num_docs| {
BM25Weight::for_one_term(term_doc_freq as u64, num_docs as u64, self.avg_fieldnorm)
});
}
fn write_block(&mut self) {

View File

@@ -7,50 +7,35 @@ use std::io;
pub struct TermInfo {
/// Number of documents in the segment containing the term
pub doc_freq: u32,
/// Start offset of the posting list within the postings (`.idx`) file.
pub postings_start_offset: u64,
/// Stop offset of the posting list within the postings (`.idx`) file.
/// The byte range is `[start_offset..stop_offset)`.
pub postings_stop_offset: u64,
/// Start offset within the postings (`.idx`) file.
pub postings_offset: u64,
/// Start offset of the first block within the position (`.pos`) file.
pub positions_idx: u64,
}
impl TermInfo {
pub(crate) fn posting_num_bytes(&self) -> u32 {
let num_bytes = self.postings_stop_offset - self.postings_start_offset;
assert!(num_bytes <= std::u32::MAX as u64);
num_bytes as u32
}
}
impl FixedSize for TermInfo {
/// Size required for the binary serialization of a `TermInfo` object.
/// This is large, but in practise, `TermInfo` are encoded in blocks and
/// only the first `TermInfo` of a block is serialized uncompressed.
/// The subsequent `TermInfo` are delta encoded and bitpacked.
const SIZE_IN_BYTES: usize = 2 * u32::SIZE_IN_BYTES + 2 * u64::SIZE_IN_BYTES;
const SIZE_IN_BYTES: usize = u32::SIZE_IN_BYTES + 2 * u64::SIZE_IN_BYTES;
}
impl BinarySerializable for TermInfo {
fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
self.doc_freq.serialize(writer)?;
self.postings_start_offset.serialize(writer)?;
self.posting_num_bytes().serialize(writer)?;
self.postings_offset.serialize(writer)?;
self.positions_idx.serialize(writer)?;
Ok(())
}
fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
let doc_freq = u32::deserialize(reader)?;
let postings_start_offset = u64::deserialize(reader)?;
let postings_num_bytes = u32::deserialize(reader)?;
let postings_stop_offset = postings_start_offset + u64::from(postings_num_bytes);
let postings_offset = u64::deserialize(reader)?;
let positions_idx = u64::deserialize(reader)?;
Ok(TermInfo {
doc_freq,
postings_start_offset,
postings_stop_offset,
postings_offset,
positions_idx,
})
}

View File

@@ -268,7 +268,7 @@ mod tests {
}
fn nearly_equals(left: Score, right: Score) -> bool {
(left - right).abs() < 0.0001 * (left + right).abs()
(left - right).abs() < 0.000001 * (left + right).abs()
}
fn compute_checkpoints_for_each_pruning(
@@ -424,116 +424,9 @@ mod tests {
}
}
#[test]
fn test_fn_reproduce_proptest() {
let postings_lists = &[
vec![
(0, 1),
(1, 1),
(2, 1),
(3, 1),
(4, 1),
(6, 1),
(7, 7),
(8, 1),
(10, 1),
(12, 1),
(13, 1),
(14, 1),
(15, 1),
(16, 1),
(19, 1),
(20, 1),
(21, 1),
(22, 1),
(24, 1),
(25, 1),
(26, 1),
(28, 1),
(30, 1),
(31, 1),
(33, 1),
(34, 1),
(35, 1),
(36, 95),
(37, 1),
(39, 1),
(41, 1),
(44, 1),
(46, 1),
],
vec![
(0, 5),
(2, 1),
(4, 1),
(5, 84),
(6, 47),
(7, 26),
(8, 50),
(9, 34),
(11, 73),
(12, 11),
(13, 51),
(14, 45),
(15, 18),
(18, 60),
(19, 80),
(20, 63),
(23, 79),
(24, 69),
(26, 35),
(28, 82),
(29, 19),
(30, 2),
(31, 7),
(33, 40),
(34, 1),
(35, 33),
(36, 27),
(37, 24),
(38, 65),
(39, 32),
(40, 85),
(41, 1),
(42, 69),
(43, 11),
(45, 45),
(47, 97),
],
vec![
(2, 1),
(4, 1),
(7, 94),
(8, 1),
(9, 1),
(10, 1),
(12, 1),
(15, 1),
(22, 1),
(23, 1),
(26, 1),
(27, 1),
(32, 1),
(33, 1),
(34, 1),
(36, 96),
(39, 1),
(41, 1),
],
];
let fieldnorms = &[
685, 239, 780, 564, 664, 827, 5, 56, 930, 887, 263, 665, 167, 127, 120, 919, 292, 92,
489, 734, 814, 724, 700, 304, 128, 779, 311, 877, 774, 15, 866, 368, 894, 371, 982,
502, 507, 669, 680, 76, 594, 626, 578, 331, 170, 639, 665, 186,
][..];
test_block_wand_aux(postings_lists, fieldnorms);
}
proptest! {
#![proptest_config(ProptestConfig::with_cases(500))]
#[ignore]
#[test]
#[ignore]
fn test_block_wand_three_term_scorers((posting_lists, fieldnorms) in gen_term_scorers(3)) {
test_block_wand_aux(&posting_lists[..], &fieldnorms[..]);
}

View File

@@ -310,7 +310,7 @@ mod tests {
));
let query = BooleanQuery::from(vec![(Occur::Should, term_a), (Occur::Should, term_b)]);
let explanation = query.explain(&searcher, DocAddress(0, 0u32))?;
assert_nearly_equals!(explanation.value(), 0.6931472);
assert_nearly_equals!(explanation.value(), 0.6931472f32);
Ok(())
}
}

View File

@@ -197,7 +197,7 @@ mod tests {
let searcher = index.reader()?.searcher();
{
let explanation = term_query.explain(&searcher, DocAddress(0u32, 1u32))?;
assert_nearly_equals!(explanation.value(), 0.6931472);
assert_nearly_equals!(explanation.value(), 0.6931472f32);
}
{
let explanation_err = term_query.explain(&searcher, DocAddress(0u32, 0u32));

View File

@@ -92,14 +92,18 @@ impl TermQuery {
searcher: &Searcher,
scoring_enabled: bool,
) -> crate::Result<TermWeight> {
let term = self.term.clone();
let field_entry = searcher.schema().get_field_entry(term.field());
let field_entry = searcher
.schema()
.get_field_entry(self.term.field());
if !field_entry.is_indexed() {
return Err(crate::TantivyError::SchemaError(format!(
"Field {:?} is not indexed",
field_entry.name()
)));
let error_msg = format!("Field {:?} is not indexed.", field_entry.name());
return Err(crate::TantivyError::SchemaError(error_msg));
}
let has_fieldnorms = searcher
.schema()
.get_field_entry(self.term.field())
.has_fieldnorms();
let term = self.term.clone();
let bm25_weight = BM25Weight::for_terms(searcher, &[term])?;
let index_record_option = if scoring_enabled {
self.index_record_option
@@ -110,7 +114,7 @@ impl TermQuery {
self.term.clone(),
index_record_option,
bm25_weight,
scoring_enabled,
has_fieldnorms,
))
}
}

View File

@@ -16,7 +16,7 @@ pub struct TermWeight {
term: Term,
index_record_option: IndexRecordOption,
similarity_weight: BM25Weight,
scoring_enabled: bool,
has_fieldnorms: bool,
}
impl Weight for TermWeight {
@@ -89,13 +89,13 @@ impl TermWeight {
term: Term,
index_record_option: IndexRecordOption,
similarity_weight: BM25Weight,
scoring_enabled: bool,
has_fieldnorms: bool,
) -> TermWeight {
TermWeight {
term,
index_record_option,
similarity_weight,
scoring_enabled,
has_fieldnorms,
}
}
@@ -106,10 +106,10 @@ impl TermWeight {
) -> crate::Result<TermScorer> {
let field = self.term.field();
let inverted_index = reader.inverted_index(field)?;
let fieldnorm_reader = if self.scoring_enabled {
let fieldnorm_reader = if self.has_fieldnorms {
reader.get_fieldnorms_reader(field)?
} else {
FieldNormReader::constant(reader.max_doc(), 1)
FieldNormReader::const_fieldnorm_id(1u8, reader.num_docs())
};
let similarity_weight = self.similarity_weight.boost_by(boost);
let postings_opt: Option<SegmentPostings> =

View File

@@ -3,9 +3,9 @@ mod pool;
pub use self::pool::LeasedItem;
use self::pool::Pool;
use crate::core::Segment;
use crate::directory::Directory;
use crate::directory::WatchHandle;
use crate::directory::META_LOCK;
use crate::directory::{Directory, WatchCallback};
use crate::Index;
use crate::Searcher;
use crate::SegmentReader;
@@ -88,7 +88,7 @@ impl IndexReaderBuilder {
let watch_handle = inner_reader_arc
.index
.directory()
.watch(WatchCallback::new(callback))?;
.watch(Box::new(callback))?;
watch_handle_opt = Some(watch_handle);
}
}

View File

@@ -112,6 +112,21 @@ impl FieldEntry {
}
}
pub fn has_fieldnorms(&self) -> bool {
match self.field_type {
FieldType::Str(ref options) => options
.get_indexing_options()
.map(|options| options.fieldnorms())
.unwrap_or(false),
FieldType::U64(ref options)
| FieldType::I64(ref options)
| FieldType::F64(ref options)
| FieldType::Date(ref options) => options.index_option().has_fieldnorms(),
FieldType::HierarchicalFacet => false,
FieldType::Bytes(ref _options) => false,
}
}
/// Returns true iff the field is a int (signed or unsigned) fast field
pub fn is_fast(&self) -> bool {
match self.field_type {
@@ -272,7 +287,8 @@ impl<'de> Deserialize<'de> for FieldEntry {
#[cfg(test)]
mod tests {
use super::*;
use crate::schema::TEXT;
use crate::schema::{Schema, STRING, TEXT};
use crate::Index;
use serde_json;
#[test]
@@ -291,7 +307,8 @@ mod tests {
"options": {
"indexing": {
"record": "position",
"tokenizer": "default"
"tokenizer": "default",
"fieldnorms": true
},
"stored": false
}
@@ -309,4 +326,19 @@ mod tests {
_ => panic!("expected FieldType::Str"),
}
}
#[test]
fn test_fieldnorms() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let text = schema_builder.add_text_field("text", STRING);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_for_tests()?;
index_writer.add_document(doc!(text=>"abc"));
index_writer.commit()?;
let searcher = index.reader()?.searcher();
let err = searcher.segment_reader(0u32).get_fieldnorms_reader(text);
assert!(matches!(err, Err(crate::TantivyError::SchemaError(_))));
Ok(())
}
}

View File

@@ -14,10 +14,50 @@ pub enum Cardinality {
MultiValues,
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum IntOptionIndex {
#[serde(rename = "no_index")]
NoIndex,
#[serde(rename = "index_no_fieldnorms")]
IndexNoFieldnorms,
#[serde(rename = "index_with_fieldnorms")]
IndexWithFieldnorms,
}
impl BitOr<IntOptionIndex> for IntOptionIndex {
type Output = IntOptionIndex;
fn bitor(self, other: IntOptionIndex) -> IntOptionIndex {
match (self, other) {
(_, Self::IndexWithFieldnorms) | (Self::IndexWithFieldnorms, _) => {
Self::IndexWithFieldnorms
}
(_, Self::IndexNoFieldnorms) | (Self::IndexNoFieldnorms, _) => Self::IndexNoFieldnorms,
(Self::NoIndex, Self::NoIndex) => Self::NoIndex,
}
}
}
impl IntOptionIndex {
pub fn is_indexed(&self) -> bool {
match *self {
Self::NoIndex => false,
Self::IndexNoFieldnorms | Self::IndexWithFieldnorms => true,
}
}
pub fn has_fieldnorms(&self) -> bool {
match *self {
Self::NoIndex | Self::IndexNoFieldnorms => false,
Self::IndexWithFieldnorms => true,
}
}
}
/// Define how an u64, i64, of f64 field should be handled by tantivy.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct IntOptions {
indexed: bool,
indexed: IntOptionIndex,
#[serde(skip_serializing_if = "Option::is_none")]
fast: Option<Cardinality>,
stored: bool,
@@ -31,7 +71,7 @@ impl IntOptions {
/// Returns true iff the value is indexed.
pub fn is_indexed(&self) -> bool {
self.indexed
self.indexed.is_indexed()
}
/// Returns true iff the value is a fast field.
@@ -48,12 +88,21 @@ impl IntOptions {
self
}
pub fn index_option(&self) -> &IntOptionIndex {
&self.indexed
}
pub fn set_indexed(mut self) -> IntOptions {
self.indexed = IntOptionIndex::IndexWithFieldnorms;
self
}
/// Set the field as indexed.
///
/// Setting an integer as indexed will generate
/// a posting list for each value taken by the integer.
pub fn set_indexed(mut self) -> IntOptions {
self.indexed = true;
pub fn set_index_option(mut self, int_option_index: IntOptionIndex) -> IntOptions {
self.indexed = int_option_index;
self
}
@@ -80,7 +129,7 @@ impl IntOptions {
impl Default for IntOptions {
fn default() -> IntOptions {
IntOptions {
indexed: false,
indexed: IntOptionIndex::NoIndex,
stored: false,
fast: None,
}
@@ -96,7 +145,7 @@ impl From<()> for IntOptions {
impl From<FastFlag> for IntOptions {
fn from(_: FastFlag) -> Self {
IntOptions {
indexed: false,
indexed: IntOptionIndex::NoIndex,
stored: false,
fast: Some(Cardinality::SingleValue),
}
@@ -106,7 +155,7 @@ impl From<FastFlag> for IntOptions {
impl From<StoredFlag> for IntOptions {
fn from(_: StoredFlag) -> Self {
IntOptions {
indexed: false,
indexed: IntOptionIndex::NoIndex,
stored: true,
fast: None,
}
@@ -116,7 +165,7 @@ impl From<StoredFlag> for IntOptions {
impl From<IndexedFlag> for IntOptions {
fn from(_: IndexedFlag) -> Self {
IntOptions {
indexed: true,
indexed: IntOptionIndex::IndexWithFieldnorms,
stored: false,
fast: None,
}

View File

@@ -231,6 +231,10 @@ impl Schema {
&self.0.fields[field.field_id() as usize]
}
pub fn num_fields(&self) -> usize {
self.0.fields.len()
}
/// Return the field name for a given `Field`.
pub fn get_field_name(&self, field: Field) -> &str {
self.get_field_entry(field).name()
@@ -444,7 +448,8 @@ mod tests {
"options": {
"indexing": {
"record": "position",
"tokenizer": "default"
"tokenizer": "default",
"fieldnorms": true
},
"stored": false
}
@@ -455,7 +460,8 @@ mod tests {
"options": {
"indexing": {
"record": "basic",
"tokenizer": "raw"
"tokenizer": "raw",
"fieldnorms": false
},
"stored": false
}
@@ -464,7 +470,7 @@ mod tests {
"name": "count",
"type": "u64",
"options": {
"indexed": false,
"indexed": "no_index",
"fast": "single",
"stored": true
}
@@ -473,7 +479,7 @@ mod tests {
"name": "popularity",
"type": "i64",
"options": {
"indexed": false,
"indexed": "no_index",
"fast": "single",
"stored": true
}
@@ -482,7 +488,7 @@ mod tests {
"name": "score",
"type": "f64",
"options": {
"indexed": true,
"indexed": "index_with_fieldnorms",
"fast": "single",
"stored": false
}
@@ -747,7 +753,8 @@ mod tests {
"options": {
"indexing": {
"record": "position",
"tokenizer": "default"
"tokenizer": "default",
"fieldnorms": true
},
"stored": false
}
@@ -756,7 +763,7 @@ mod tests {
"name": "popularity",
"type": "i64",
"options": {
"indexed": false,
"indexed": "no_index",
"fast": "single",
"stored": true
}
@@ -777,7 +784,8 @@ mod tests {
"options": {
"indexing": {
"record": "basic",
"tokenizer": "raw"
"tokenizer": "raw",
"fieldnorms": false
},
"stored": true
}
@@ -786,7 +794,7 @@ mod tests {
"name": "_timestamp",
"type": "date",
"options": {
"indexed": true,
"indexed": "index_with_fieldnorms",
"fast": "single",
"stored": true
}
@@ -797,7 +805,8 @@ mod tests {
"options": {
"indexing": {
"record": "position",
"tokenizer": "default"
"tokenizer": "default",
"fieldnorms": true
},
"stored": false
}
@@ -806,7 +815,7 @@ mod tests {
"name": "popularity",
"type": "i64",
"options": {
"indexed": false,
"indexed": "no_index",
"fast": "single",
"stored": true
}

View File

@@ -55,6 +55,7 @@ impl Default for TextOptions {
pub struct TextFieldIndexing {
record: IndexRecordOption,
tokenizer: Cow<'static, str>,
fieldnorms: bool,
}
impl Default for TextFieldIndexing {
@@ -62,6 +63,7 @@ impl Default for TextFieldIndexing {
TextFieldIndexing {
tokenizer: Cow::Borrowed("default"),
record: IndexRecordOption::Basic,
fieldnorms: false,
}
}
}
@@ -78,6 +80,15 @@ impl TextFieldIndexing {
&self.tokenizer
}
pub fn set_fieldnorms(mut self, fieldnorms: bool) -> TextFieldIndexing {
self.fieldnorms = fieldnorms;
self
}
pub fn fieldnorms(&self) -> bool {
self.fieldnorms
}
/// Sets which information should be indexed with the tokens.
///
/// See [IndexRecordOption](./enum.IndexRecordOption.html) for more detail.
@@ -99,6 +110,7 @@ pub const STRING: TextOptions = TextOptions {
indexing: Some(TextFieldIndexing {
tokenizer: Cow::Borrowed("raw"),
record: IndexRecordOption::Basic,
fieldnorms: false,
}),
stored: false,
};
@@ -108,6 +120,7 @@ pub const TEXT: TextOptions = TextOptions {
indexing: Some(TextFieldIndexing {
tokenizer: Cow::Borrowed("default"),
record: IndexRecordOption::WithFreqsAndPositions,
fieldnorms: true,
}),
stored: false,
};

View File

@@ -1,165 +0,0 @@
use crate::common::VInt;
use crate::store::index::{Checkpoint, CHECKPOINT_PERIOD};
use crate::DocId;
use std::io;
/// Represents a block of checkpoints.
///
/// The DocStore index checkpoints are organized into block
/// for code-readability and compression purpose.
///
/// A block can be of any size.
pub struct CheckpointBlock {
pub checkpoints: Vec<Checkpoint>,
}
impl Default for CheckpointBlock {
fn default() -> CheckpointBlock {
CheckpointBlock {
checkpoints: Vec::with_capacity(2 * CHECKPOINT_PERIOD),
}
}
}
impl CheckpointBlock {
/// If non-empty returns [start_doc, end_doc)
/// for the overall block.
pub fn doc_interval(&self) -> Option<(DocId, DocId)> {
let start_doc_opt = self
.checkpoints
.first()
.cloned()
.map(|checkpoint| checkpoint.start_doc);
let end_doc_opt = self
.checkpoints
.last()
.cloned()
.map(|checkpoint| checkpoint.end_doc);
match (start_doc_opt, end_doc_opt) {
(Some(start_doc), Some(end_doc)) => Some((start_doc, end_doc)),
_ => None,
}
}
/// Adding another checkpoint in the block.
pub fn push(&mut self, checkpoint: Checkpoint) {
self.checkpoints.push(checkpoint);
}
/// Returns the number of checkpoints in the block.
pub fn len(&self) -> usize {
self.checkpoints.len()
}
pub fn get(&self, idx: usize) -> Checkpoint {
self.checkpoints[idx]
}
pub fn clear(&mut self) {
self.checkpoints.clear();
}
pub fn serialize(&mut self, buffer: &mut Vec<u8>) {
VInt(self.checkpoints.len() as u64).serialize_into_vec(buffer);
if self.checkpoints.is_empty() {
return;
}
VInt(self.checkpoints[0].start_doc as u64).serialize_into_vec(buffer);
VInt(self.checkpoints[0].start_offset as u64).serialize_into_vec(buffer);
for checkpoint in &self.checkpoints {
let delta_doc = checkpoint.end_doc - checkpoint.start_doc;
VInt(delta_doc as u64).serialize_into_vec(buffer);
VInt(checkpoint.end_offset - checkpoint.start_offset).serialize_into_vec(buffer);
}
}
pub fn deserialize(&mut self, data: &mut &[u8]) -> io::Result<()> {
if data.is_empty() {
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, ""));
}
self.checkpoints.clear();
let len = VInt::deserialize_u64(data)? as usize;
if len == 0 {
return Ok(());
}
let mut doc = VInt::deserialize_u64(data)? as DocId;
let mut start_offset = VInt::deserialize_u64(data)?;
for _ in 0..len {
let num_docs = VInt::deserialize_u64(data)? as DocId;
let block_num_bytes = VInt::deserialize_u64(data)?;
self.checkpoints.push(Checkpoint {
start_doc: doc,
end_doc: doc + num_docs,
start_offset,
end_offset: start_offset + block_num_bytes,
});
doc += num_docs;
start_offset += block_num_bytes;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use crate::store::index::block::CheckpointBlock;
use crate::store::index::Checkpoint;
use crate::DocId;
use std::io;
fn test_aux_ser_deser(checkpoints: &[Checkpoint]) -> io::Result<()> {
let mut block = CheckpointBlock::default();
for &checkpoint in checkpoints {
block.push(checkpoint);
}
let mut buffer = Vec::new();
block.serialize(&mut buffer);
let mut block_deser = CheckpointBlock::default();
let checkpoint = Checkpoint {
start_doc: 0,
end_doc: 1,
start_offset: 2,
end_offset: 3,
};
block_deser.push(checkpoint); // < check that value is erased before deser
let mut data = &buffer[..];
block_deser.deserialize(&mut data)?;
assert!(data.is_empty());
assert_eq!(checkpoints, &block_deser.checkpoints[..]);
Ok(())
}
#[test]
fn test_block_serialize_empty() -> io::Result<()> {
test_aux_ser_deser(&[])
}
#[test]
fn test_block_serialize_simple() -> io::Result<()> {
let checkpoints = vec![Checkpoint {
start_doc: 10,
end_doc: 12,
start_offset: 100,
end_offset: 120,
}];
test_aux_ser_deser(&checkpoints)
}
#[test]
fn test_block_serialize() -> io::Result<()> {
let offsets: Vec<u64> = (0..11).map(|i| i * i * i).collect();
let mut checkpoints = vec![];
let mut start_doc = 0;
for i in 0..10 {
let end_doc = (i * i) as DocId;
checkpoints.push(Checkpoint {
start_doc,
end_doc,
start_offset: offsets[i],
end_offset: offsets[i + 1],
});
start_doc = end_doc;
}
test_aux_ser_deser(&checkpoints)
}
}

View File

@@ -1,230 +0,0 @@
const CHECKPOINT_PERIOD: usize = 8;
use std::fmt;
mod block;
mod skip_index;
mod skip_index_builder;
use crate::DocId;
pub use self::skip_index::SkipIndex;
pub use self::skip_index_builder::SkipIndexBuilder;
/// A checkpoint contains meta-information about
/// a block. Either a block of documents, or another block
/// of checkpoints.
///
/// All of the intervals here defined are semi-open.
/// The checkpoint describes that the block within the bytes
/// `[start_offset..end_offset)` spans over the docs
/// `[start_doc..end_doc)`.
#[derive(Clone, Copy, Eq, PartialEq)]
pub struct Checkpoint {
pub start_doc: DocId,
pub end_doc: DocId,
pub start_offset: u64,
pub end_offset: u64,
}
impl fmt::Debug for Checkpoint {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"(doc=[{}..{}), bytes=[{}..{}))",
self.start_doc, self.end_doc, self.start_offset, self.end_offset
)
}
}
#[cfg(test)]
mod tests {
use std::io;
use proptest::strategy::{BoxedStrategy, Strategy};
use crate::directory::OwnedBytes;
use crate::store::index::Checkpoint;
use crate::DocId;
use super::{SkipIndex, SkipIndexBuilder};
#[test]
fn test_skip_index_empty() -> io::Result<()> {
let mut output: Vec<u8> = Vec::new();
let skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new();
skip_index_builder.write(&mut output)?;
let skip_index: SkipIndex = SkipIndex::from(OwnedBytes::new(output));
let mut skip_cursor = skip_index.checkpoints();
assert!(skip_cursor.next().is_none());
Ok(())
}
#[test]
fn test_skip_index_single_el() -> io::Result<()> {
let mut output: Vec<u8> = Vec::new();
let mut skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new();
let checkpoint = Checkpoint {
start_doc: 0,
end_doc: 2,
start_offset: 0,
end_offset: 3,
};
skip_index_builder.insert(checkpoint);
skip_index_builder.write(&mut output)?;
let skip_index: SkipIndex = SkipIndex::from(OwnedBytes::new(output));
let mut skip_cursor = skip_index.checkpoints();
assert_eq!(skip_cursor.next(), Some(checkpoint));
assert_eq!(skip_cursor.next(), None);
Ok(())
}
#[test]
fn test_skip_index() -> io::Result<()> {
let mut output: Vec<u8> = Vec::new();
let checkpoints = vec![
Checkpoint {
start_doc: 0,
end_doc: 3,
start_offset: 4,
end_offset: 9,
},
Checkpoint {
start_doc: 3,
end_doc: 4,
start_offset: 9,
end_offset: 25,
},
Checkpoint {
start_doc: 4,
end_doc: 6,
start_offset: 25,
end_offset: 49,
},
Checkpoint {
start_doc: 6,
end_doc: 8,
start_offset: 49,
end_offset: 81,
},
Checkpoint {
start_doc: 8,
end_doc: 10,
start_offset: 81,
end_offset: 100,
},
];
let mut skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new();
for &checkpoint in &checkpoints {
skip_index_builder.insert(checkpoint);
}
skip_index_builder.write(&mut output)?;
let skip_index: SkipIndex = SkipIndex::from(OwnedBytes::new(output));
assert_eq!(
&skip_index.checkpoints().collect::<Vec<_>>()[..],
&checkpoints[..]
);
Ok(())
}
fn offset_test(doc: DocId) -> u64 {
(doc as u64) * (doc as u64)
}
#[test]
fn test_skip_index_long() -> io::Result<()> {
let mut output: Vec<u8> = Vec::new();
let checkpoints: Vec<Checkpoint> = (0..1000)
.map(|i| Checkpoint {
start_doc: i,
end_doc: i + 1,
start_offset: offset_test(i),
end_offset: offset_test(i + 1),
})
.collect();
let mut skip_index_builder = SkipIndexBuilder::new();
for checkpoint in &checkpoints {
skip_index_builder.insert(*checkpoint);
}
skip_index_builder.write(&mut output)?;
assert_eq!(output.len(), 4035);
let resulting_checkpoints: Vec<Checkpoint> = SkipIndex::from(OwnedBytes::new(output))
.checkpoints()
.collect();
assert_eq!(&resulting_checkpoints, &checkpoints);
Ok(())
}
fn integrate_delta(mut vals: Vec<u64>) -> Vec<u64> {
let mut prev = 0u64;
for val in vals.iter_mut() {
let new_val = *val + prev;
prev = new_val;
*val = new_val;
}
vals
}
// Generates a sequence of n valid checkpoints, with n < max_len.
fn monotonic_checkpoints(max_len: usize) -> BoxedStrategy<Vec<Checkpoint>> {
(1..max_len)
.prop_flat_map(move |len: usize| {
(
proptest::collection::vec(1u64..20u64, len as usize).prop_map(integrate_delta),
proptest::collection::vec(1u64..26u64, len as usize).prop_map(integrate_delta),
)
.prop_map(|(docs, offsets)| {
(0..docs.len() - 1)
.map(move |i| Checkpoint {
start_doc: docs[i] as DocId,
end_doc: docs[i + 1] as DocId,
start_offset: offsets[i],
end_offset: offsets[i + 1],
})
.collect::<Vec<Checkpoint>>()
})
})
.boxed()
}
fn seek_manual<I: Iterator<Item = Checkpoint>>(
checkpoints: I,
target: DocId,
) -> Option<Checkpoint> {
checkpoints
.into_iter()
.filter(|checkpoint| checkpoint.end_doc > target)
.next()
}
fn test_skip_index_aux(skip_index: SkipIndex, checkpoints: &[Checkpoint]) {
if let Some(last_checkpoint) = checkpoints.last() {
for doc in 0u32..last_checkpoint.end_doc {
let expected = seek_manual(skip_index.checkpoints(), doc);
assert_eq!(expected, skip_index.seek(doc), "Doc {}", doc);
}
assert!(skip_index.seek(last_checkpoint.end_doc).is_none());
}
}
use proptest::prelude::*;
proptest! {
#![proptest_config(ProptestConfig::with_cases(20))]
#[test]
fn test_proptest_skip(checkpoints in monotonic_checkpoints(100)) {
let mut skip_index_builder = SkipIndexBuilder::new();
for checkpoint in checkpoints.iter().cloned() {
skip_index_builder.insert(checkpoint);
}
let mut buffer = Vec::new();
skip_index_builder.write(&mut buffer).unwrap();
let skip_index = SkipIndex::from(OwnedBytes::new(buffer));
let iter_checkpoints: Vec<Checkpoint> = skip_index.checkpoints().collect();
assert_eq!(&checkpoints[..], &iter_checkpoints[..]);
test_skip_index_aux(skip_index, &checkpoints[..]);
}
}
}

View File

@@ -1,112 +0,0 @@
use crate::common::{BinarySerializable, VInt};
use crate::directory::OwnedBytes;
use crate::store::index::block::CheckpointBlock;
use crate::store::index::Checkpoint;
use crate::DocId;
pub struct LayerCursor<'a> {
remaining: &'a [u8],
block: CheckpointBlock,
cursor: usize,
}
impl<'a> Iterator for LayerCursor<'a> {
type Item = Checkpoint;
fn next(&mut self) -> Option<Checkpoint> {
if self.cursor == self.block.len() {
if self.remaining.is_empty() {
return None;
}
let (block_mut, remaining_mut) = (&mut self.block, &mut self.remaining);
if let Err(_) = block_mut.deserialize(remaining_mut) {
return None;
}
self.cursor = 0;
}
let res = Some(self.block.get(self.cursor));
self.cursor += 1;
res
}
}
struct Layer {
data: OwnedBytes,
}
impl Layer {
fn cursor<'a>(&'a self) -> impl Iterator<Item = Checkpoint> + 'a {
self.cursor_at_offset(0u64)
}
fn cursor_at_offset<'a>(&'a self, start_offset: u64) -> impl Iterator<Item = Checkpoint> + 'a {
let data = &self.data.as_slice();
LayerCursor {
remaining: &data[start_offset as usize..],
block: CheckpointBlock::default(),
cursor: 0,
}
}
fn seek_start_at_offset(&self, target: DocId, offset: u64) -> Option<Checkpoint> {
self.cursor_at_offset(offset)
.filter(|checkpoint| checkpoint.end_doc > target)
.next()
}
}
pub struct SkipIndex {
layers: Vec<Layer>,
}
impl SkipIndex {
pub(crate) fn checkpoints<'a>(&'a self) -> impl Iterator<Item = Checkpoint> + 'a {
self.layers
.last()
.into_iter()
.flat_map(|layer| layer.cursor())
}
pub fn seek(&self, target: DocId) -> Option<Checkpoint> {
let first_layer_len = self
.layers
.first()
.map(|layer| layer.data.len() as u64)
.unwrap_or(0u64);
let mut cur_checkpoint = Checkpoint {
start_doc: 0u32,
end_doc: 1u32,
start_offset: 0u64,
end_offset: first_layer_len,
};
for layer in &self.layers {
if let Some(checkpoint) =
layer.seek_start_at_offset(target, cur_checkpoint.start_offset)
{
cur_checkpoint = checkpoint;
} else {
return None;
}
}
Some(cur_checkpoint)
}
}
impl From<OwnedBytes> for SkipIndex {
fn from(mut data: OwnedBytes) -> SkipIndex {
let offsets: Vec<u64> = Vec::<VInt>::deserialize(&mut data)
.unwrap()
.into_iter()
.map(|el| el.0)
.collect();
let mut start_offset = 0;
let mut layers = Vec::new();
for end_offset in offsets {
layers.push(Layer {
data: data.slice(start_offset as usize, end_offset as usize),
});
start_offset = end_offset;
}
SkipIndex { layers }
}
}

View File

@@ -1,115 +0,0 @@
use crate::common::{BinarySerializable, VInt};
use crate::store::index::block::CheckpointBlock;
use crate::store::index::{Checkpoint, CHECKPOINT_PERIOD};
use std::io;
use std::io::Write;
// Each skip contains iterator over pairs (last doc in block, offset to start of block).
struct LayerBuilder {
buffer: Vec<u8>,
pub block: CheckpointBlock,
}
impl LayerBuilder {
fn finish(self) -> Vec<u8> {
self.buffer
}
fn new() -> LayerBuilder {
LayerBuilder {
buffer: Vec::new(),
block: CheckpointBlock::default(),
}
}
/// Serializes the block, and return a checkpoint representing
/// the entire block.
///
/// If the block was empty to begin with, simply return None.
fn flush_block(&mut self) -> Option<Checkpoint> {
self.block.doc_interval().map(|(start_doc, end_doc)| {
let start_offset = self.buffer.len() as u64;
self.block.serialize(&mut self.buffer);
let end_offset = self.buffer.len() as u64;
self.block.clear();
Checkpoint {
start_doc,
end_doc,
start_offset,
end_offset,
}
})
}
fn push(&mut self, checkpoint: Checkpoint) {
self.block.push(checkpoint);
}
fn insert(&mut self, checkpoint: Checkpoint) -> Option<Checkpoint> {
self.push(checkpoint);
let emit_skip_info = (self.block.len() % CHECKPOINT_PERIOD) == 0;
if emit_skip_info {
self.flush_block()
} else {
None
}
}
}
pub struct SkipIndexBuilder {
layers: Vec<LayerBuilder>,
}
impl SkipIndexBuilder {
pub fn new() -> SkipIndexBuilder {
SkipIndexBuilder { layers: Vec::new() }
}
fn get_layer(&mut self, layer_id: usize) -> &mut LayerBuilder {
if layer_id == self.layers.len() {
let layer_builder = LayerBuilder::new();
self.layers.push(layer_builder);
}
&mut self.layers[layer_id]
}
pub fn insert(&mut self, checkpoint: Checkpoint) {
let mut skip_pointer = Some(checkpoint);
for layer_id in 0.. {
if let Some(checkpoint) = skip_pointer {
skip_pointer = self.get_layer(layer_id).insert(checkpoint);
} else {
break;
}
}
}
pub fn write<W: Write>(mut self, output: &mut W) -> io::Result<()> {
let mut last_pointer = None;
for skip_layer in self.layers.iter_mut() {
if let Some(checkpoint) = last_pointer {
skip_layer.push(checkpoint);
}
last_pointer = skip_layer.flush_block();
}
let layer_buffers: Vec<Vec<u8>> = self
.layers
.into_iter()
.rev()
.map(|layer| layer.finish())
.collect();
let mut layer_offset = 0;
let mut layer_sizes = Vec::new();
for layer_buffer in &layer_buffers {
layer_offset += layer_buffer.len() as u64;
layer_sizes.push(VInt(layer_offset));
}
layer_sizes.serialize(output)?;
for layer_buffer in layer_buffers {
output.write_all(&layer_buffer[..])?;
}
Ok(())
}
}

View File

@@ -33,8 +33,8 @@ and should rely on either
!*/
mod index;
mod reader;
mod skiplist;
mod writer;
pub use self::reader::StoreReader;
pub use self::writer::StoreWriter;

View File

@@ -1,91 +1,69 @@
use super::decompress;
use super::index::SkipIndex;
use super::skiplist::SkipList;
use crate::common::VInt;
use crate::common::{BinarySerializable, HasLen};
use crate::directory::{FileSlice, OwnedBytes};
use crate::schema::Document;
use crate::space_usage::StoreSpaceUsage;
use crate::store::index::Checkpoint;
use crate::DocId;
use lru::LruCache;
use std::cell::RefCell;
use std::io;
use std::mem::size_of;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
const LRU_CACHE_CAPACITY: usize = 100;
type Block = Arc<Vec<u8>>;
type BlockCache = Arc<Mutex<LruCache<u64, Block>>>;
/// Reads document off tantivy's [`Store`](./index.html)
#[derive(Clone)]
pub struct StoreReader {
data: FileSlice,
cache: BlockCache,
cache_hits: Arc<AtomicUsize>,
cache_misses: Arc<AtomicUsize>,
skip_index: Arc<SkipIndex>,
space_usage: StoreSpaceUsage,
offset_index_file: OwnedBytes,
current_block_offset: RefCell<usize>,
current_block: RefCell<Vec<u8>>,
max_doc: DocId,
}
impl StoreReader {
/// Opens a store reader
// TODO rename open
pub fn open(store_file: FileSlice) -> io::Result<StoreReader> {
let (data_file, offset_index_file) = split_file(store_file)?;
let index_data = offset_index_file.read_bytes()?;
let space_usage = StoreSpaceUsage::new(data_file.len(), offset_index_file.len());
let skip_index = SkipIndex::from(index_data);
let (data_file, offset_index_file, max_doc) = split_file(store_file)?;
Ok(StoreReader {
data: data_file,
cache: Arc::new(Mutex::new(LruCache::new(LRU_CACHE_CAPACITY))),
cache_hits: Default::default(),
cache_misses: Default::default(),
skip_index: Arc::new(skip_index),
space_usage,
offset_index_file: offset_index_file.read_bytes()?,
current_block_offset: RefCell::new(usize::max_value()),
current_block: RefCell::new(Vec::new()),
max_doc,
})
}
pub(crate) fn block_checkpoints<'a>(&'a self) -> impl Iterator<Item = Checkpoint> + 'a {
self.skip_index.checkpoints()
pub(crate) fn block_index(&self) -> SkipList<'_, u64> {
SkipList::from(self.offset_index_file.as_slice())
}
fn block_checkpoint(&self, doc_id: DocId) -> Option<Checkpoint> {
self.skip_index.seek(doc_id)
fn block_offset(&self, doc_id: DocId) -> (DocId, u64) {
self.block_index()
.seek(u64::from(doc_id) + 1)
.map(|(doc, offset)| (doc as DocId, offset))
.unwrap_or((0u32, 0u64))
}
pub(crate) fn block_data(&self) -> io::Result<OwnedBytes> {
self.data.read_bytes()
}
fn compressed_block(&self, checkpoint: &Checkpoint) -> io::Result<OwnedBytes> {
self.data
.slice(
checkpoint.start_offset as usize,
checkpoint.end_offset as usize,
)
.read_bytes()
fn compressed_block(&self, addr: usize) -> io::Result<OwnedBytes> {
let (block_len_bytes, block_body) = self.data.slice_from(addr).split(4);
let block_len = u32::deserialize(&mut block_len_bytes.read_bytes()?)?;
block_body.slice_to(block_len as usize).read_bytes()
}
fn read_block(&self, checkpoint: &Checkpoint) -> io::Result<Block> {
if let Some(block) = self.cache.lock().unwrap().get(&checkpoint.start_offset) {
self.cache_hits.fetch_add(1, Ordering::SeqCst);
return Ok(block.clone());
fn read_block(&self, block_offset: usize) -> io::Result<()> {
if block_offset != *self.current_block_offset.borrow() {
let mut current_block_mut = self.current_block.borrow_mut();
current_block_mut.clear();
let compressed_block = self.compressed_block(block_offset)?;
decompress(compressed_block.as_slice(), &mut current_block_mut)?;
*self.current_block_offset.borrow_mut() = block_offset;
}
self.cache_misses.fetch_add(1, Ordering::SeqCst);
let compressed_block = self.compressed_block(checkpoint)?;
let mut decompressed_block = vec![];
decompress(compressed_block.as_slice(), &mut decompressed_block)?;
let block = Arc::new(decompressed_block);
self.cache
.lock()
.unwrap()
.put(checkpoint.start_offset, block.clone());
Ok(block)
Ok(())
}
/// Reads a given document.
@@ -96,15 +74,14 @@ impl StoreReader {
/// It should not be called to score documents
/// for instance.
pub fn get(&self, doc_id: DocId) -> crate::Result<Document> {
let checkpoint = self.block_checkpoint(doc_id).ok_or_else(|| {
crate::TantivyError::InvalidArgument(format!("Failed to lookup Doc #{}.", doc_id))
})?;
let mut cursor = &self.read_block(&checkpoint)?[..];
for _ in checkpoint.start_doc..doc_id {
let (first_doc_id, block_offset) = self.block_offset(doc_id);
self.read_block(block_offset as usize)?;
let current_block_mut = self.current_block.borrow_mut();
let mut cursor = &current_block_mut[..];
for _ in first_doc_id..doc_id {
let doc_length = VInt::deserialize(&mut cursor)?.val() as usize;
cursor = &cursor[doc_length..];
}
let doc_length = VInt::deserialize(&mut cursor)?.val() as usize;
cursor = &cursor[..doc_length];
Ok(Document::deserialize(&mut cursor)?)
@@ -112,93 +89,21 @@ impl StoreReader {
/// Summarize total space usage of this store reader.
pub fn space_usage(&self) -> StoreSpaceUsage {
self.space_usage.clone()
StoreSpaceUsage::new(self.data.len(), self.offset_index_file.len())
}
}
fn split_file(data: FileSlice) -> io::Result<(FileSlice, FileSlice)> {
let (data, footer_len_bytes) = data.split_from_end(size_of::<u64>());
let serialized_offset: OwnedBytes = footer_len_bytes.read_bytes()?;
fn split_file(data: FileSlice) -> io::Result<(FileSlice, FileSlice, DocId)> {
let data_len = data.len();
let footer_offset = data_len - size_of::<u64>() - size_of::<u32>();
let serialized_offset: OwnedBytes = data.slice(footer_offset, data_len).read_bytes()?;
let mut serialized_offset_buf = serialized_offset.as_slice();
let offset = u64::deserialize(&mut serialized_offset_buf)? as usize;
Ok(data.split(offset))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::schema::Document;
use crate::schema::Field;
use crate::{directory::RAMDirectory, store::tests::write_lorem_ipsum_store, Directory};
use std::path::Path;
fn get_text_field<'a>(doc: &'a Document, field: &'a Field) -> Option<&'a str> {
doc.get_first(*field).and_then(|f| f.text())
}
#[test]
fn test_store_lru_cache() -> crate::Result<()> {
let directory = RAMDirectory::create();
let path = Path::new("store");
let writer = directory.open_write(path)?;
let schema = write_lorem_ipsum_store(writer, 500);
let title = schema.get_field("title").unwrap();
let store_file = directory.open_read(path)?;
let store = StoreReader::open(store_file)?;
assert_eq!(store.cache.lock().unwrap().len(), 0);
assert_eq!(store.cache_hits.load(Ordering::SeqCst), 0);
assert_eq!(store.cache_misses.load(Ordering::SeqCst), 0);
let doc = store.get(0)?;
assert_eq!(get_text_field(&doc, &title), Some("Doc 0"));
assert_eq!(store.cache.lock().unwrap().len(), 1);
assert_eq!(store.cache_hits.load(Ordering::SeqCst), 0);
assert_eq!(store.cache_misses.load(Ordering::SeqCst), 1);
assert_eq!(
store
.cache
.lock()
.unwrap()
.peek_lru()
.map(|(&k, _)| k as usize),
Some(0)
);
let doc = store.get(499)?;
assert_eq!(get_text_field(&doc, &title), Some("Doc 499"));
assert_eq!(store.cache.lock().unwrap().len(), 2);
assert_eq!(store.cache_hits.load(Ordering::SeqCst), 0);
assert_eq!(store.cache_misses.load(Ordering::SeqCst), 2);
assert_eq!(
store
.cache
.lock()
.unwrap()
.peek_lru()
.map(|(&k, _)| k as usize),
Some(0)
);
let doc = store.get(0)?;
assert_eq!(get_text_field(&doc, &title), Some("Doc 0"));
assert_eq!(store.cache.lock().unwrap().len(), 2);
assert_eq!(store.cache_hits.load(Ordering::SeqCst), 1);
assert_eq!(store.cache_misses.load(Ordering::SeqCst), 2);
assert_eq!(
store
.cache
.lock()
.unwrap()
.peek_lru()
.map(|(&k, _)| k as usize),
Some(18806)
);
Ok(())
}
let offset = u64::deserialize(&mut serialized_offset_buf)?;
let offset = offset as usize;
let max_doc = u32::deserialize(&mut serialized_offset_buf)?;
Ok((
data.slice(0, offset),
data.slice(offset, footer_offset),
max_doc,
))
}

168
src/store/skiplist/mod.rs Normal file
View File

@@ -0,0 +1,168 @@
#![allow(dead_code)]
mod skiplist;
mod skiplist_builder;
pub use self::skiplist::SkipList;
pub use self::skiplist_builder::SkipListBuilder;
#[cfg(test)]
mod tests {
use super::{SkipList, SkipListBuilder};
#[test]
fn test_skiplist() {
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<u32> = SkipListBuilder::new(8);
skip_list_builder.insert(2, &3).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<'_, u32> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next(), Some((2, 3)));
}
#[test]
fn test_skiplist2() {
let mut output: Vec<u8> = Vec::new();
let skip_list_builder: SkipListBuilder<u32> = SkipListBuilder::new(8);
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<'_, u32> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next(), None);
}
#[test]
fn test_skiplist3() {
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(2);
skip_list_builder.insert(2, &()).unwrap();
skip_list_builder.insert(3, &()).unwrap();
skip_list_builder.insert(5, &()).unwrap();
skip_list_builder.insert(7, &()).unwrap();
skip_list_builder.insert(9, &()).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next().unwrap(), (2, ()));
assert_eq!(skip_list.next().unwrap(), (3, ()));
assert_eq!(skip_list.next().unwrap(), (5, ()));
assert_eq!(skip_list.next().unwrap(), (7, ()));
assert_eq!(skip_list.next().unwrap(), (9, ()));
assert_eq!(skip_list.next(), None);
}
#[test]
fn test_skiplist4() {
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(2);
skip_list_builder.insert(2, &()).unwrap();
skip_list_builder.insert(3, &()).unwrap();
skip_list_builder.insert(5, &()).unwrap();
skip_list_builder.insert(7, &()).unwrap();
skip_list_builder.insert(9, &()).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next().unwrap(), (2, ()));
skip_list.seek(5);
assert_eq!(skip_list.next().unwrap(), (5, ()));
assert_eq!(skip_list.next().unwrap(), (7, ()));
assert_eq!(skip_list.next().unwrap(), (9, ()));
assert_eq!(skip_list.next(), None);
}
#[test]
fn test_skiplist5() {
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(4);
skip_list_builder.insert(2, &()).unwrap();
skip_list_builder.insert(3, &()).unwrap();
skip_list_builder.insert(5, &()).unwrap();
skip_list_builder.insert(6, &()).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next().unwrap(), (2, ()));
skip_list.seek(6);
assert_eq!(skip_list.next().unwrap(), (6, ()));
assert_eq!(skip_list.next(), None);
}
#[test]
fn test_skiplist6() {
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(2);
skip_list_builder.insert(2, &()).unwrap();
skip_list_builder.insert(3, &()).unwrap();
skip_list_builder.insert(5, &()).unwrap();
skip_list_builder.insert(7, &()).unwrap();
skip_list_builder.insert(9, &()).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next().unwrap(), (2, ()));
skip_list.seek(10);
assert_eq!(skip_list.next(), None);
}
#[test]
fn test_skiplist7() {
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(4);
for i in 0..1000 {
skip_list_builder.insert(i, &()).unwrap();
}
skip_list_builder.insert(1004, &()).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next().unwrap(), (0, ()));
skip_list.seek(431);
assert_eq!(skip_list.next().unwrap(), (431, ()));
skip_list.seek(1003);
assert_eq!(skip_list.next().unwrap(), (1004, ()));
assert_eq!(skip_list.next(), None);
}
#[test]
fn test_skiplist8() {
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<u64> = SkipListBuilder::new(8);
skip_list_builder.insert(2, &3).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
assert_eq!(output.len(), 11);
assert_eq!(output[0], 1u8 + 128u8);
}
#[test]
fn test_skiplist9() {
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<u64> = SkipListBuilder::new(4);
for i in 0..4 * 4 * 4 {
skip_list_builder.insert(i, &i).unwrap();
}
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
assert_eq!(output.len(), 774);
assert_eq!(output[0], 4u8 + 128u8);
}
#[test]
fn test_skiplist10() {
// checking that void gets serialized to nothing.
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(4);
for i in 0..((4 * 4 * 4) - 1) {
skip_list_builder.insert(i, &()).unwrap();
}
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
assert_eq!(output.len(), 230);
assert_eq!(output[0], 128u8 + 3u8);
}
#[test]
fn test_skiplist11() {
// checking that void gets serialized to nothing.
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(4);
for i in 0..(4 * 4) {
skip_list_builder.insert(i, &()).unwrap();
}
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
assert_eq!(output.len(), 65);
assert_eq!(output[0], 128u8 + 3u8);
}
}

View File

@@ -0,0 +1,133 @@
use crate::common::{BinarySerializable, VInt};
use std::cmp::max;
use std::marker::PhantomData;
static EMPTY: [u8; 0] = [];
struct Layer<'a, T> {
data: &'a [u8],
cursor: &'a [u8],
next_id: Option<u64>,
_phantom_: PhantomData<T>,
}
impl<'a, T: BinarySerializable> Iterator for Layer<'a, T> {
type Item = (u64, T);
fn next(&mut self) -> Option<(u64, T)> {
if let Some(cur_id) = self.next_id {
let cur_val = T::deserialize(&mut self.cursor).unwrap();
self.next_id = VInt::deserialize_u64(&mut self.cursor).ok();
Some((cur_id, cur_val))
} else {
None
}
}
}
impl<'a, T: BinarySerializable> From<&'a [u8]> for Layer<'a, T> {
fn from(data: &'a [u8]) -> Layer<'a, T> {
let mut cursor = data;
let next_id = VInt::deserialize_u64(&mut cursor).ok();
Layer {
data,
cursor,
next_id,
_phantom_: PhantomData,
}
}
}
impl<'a, T: BinarySerializable> Layer<'a, T> {
fn empty() -> Layer<'a, T> {
Layer {
data: &EMPTY,
cursor: &EMPTY,
next_id: None,
_phantom_: PhantomData,
}
}
fn seek_offset(&mut self, offset: usize) {
self.cursor = &self.data[offset..];
self.next_id = VInt::deserialize_u64(&mut self.cursor).ok();
}
// Returns the last element (key, val)
// such that (key < doc_id)
//
// If there is no such element anymore,
// returns None.
//
// If the element exists, it will be returned
// at the next call to `.next()`.
fn seek(&mut self, key: u64) -> Option<(u64, T)> {
let mut result: Option<(u64, T)> = None;
loop {
if let Some(next_id) = self.next_id {
if next_id < key {
if let Some(v) = self.next() {
result = Some(v);
continue;
}
}
}
return result;
}
}
}
pub struct SkipList<'a, T: BinarySerializable> {
data_layer: Layer<'a, T>,
skip_layers: Vec<Layer<'a, u64>>,
}
impl<'a, T: BinarySerializable> Iterator for SkipList<'a, T> {
type Item = (u64, T);
fn next(&mut self) -> Option<(u64, T)> {
self.data_layer.next()
}
}
impl<'a, T: BinarySerializable> SkipList<'a, T> {
pub fn seek(&mut self, key: u64) -> Option<(u64, T)> {
let mut next_layer_skip: Option<(u64, u64)> = None;
for skip_layer in &mut self.skip_layers {
if let Some((_, offset)) = next_layer_skip {
skip_layer.seek_offset(offset as usize);
}
next_layer_skip = skip_layer.seek(key);
}
if let Some((_, offset)) = next_layer_skip {
self.data_layer.seek_offset(offset as usize);
}
self.data_layer.seek(key)
}
}
impl<'a, T: BinarySerializable> From<&'a [u8]> for SkipList<'a, T> {
fn from(mut data: &'a [u8]) -> SkipList<'a, T> {
let offsets: Vec<u64> = Vec::<VInt>::deserialize(&mut data)
.unwrap()
.into_iter()
.map(|el| el.0)
.collect();
let num_layers = offsets.len();
let layers_data: &[u8] = data;
let data_layer: Layer<'a, T> = if num_layers == 0 {
Layer::empty()
} else {
let first_layer_data: &[u8] = &layers_data[..offsets[0] as usize];
Layer::from(first_layer_data)
};
let skip_layers = (0..max(1, num_layers) - 1)
.map(|i| (offsets[i] as usize, offsets[i + 1] as usize))
.map(|(start, stop)| Layer::from(&layers_data[start..stop]))
.collect();
SkipList {
skip_layers,
data_layer,
}
}
}

View File

@@ -0,0 +1,98 @@
use crate::common::{is_power_of_2, BinarySerializable, VInt};
use std::io;
use std::io::Write;
use std::marker::PhantomData;
struct LayerBuilder<T: BinarySerializable> {
period_mask: usize,
buffer: Vec<u8>,
len: usize,
_phantom_: PhantomData<T>,
}
impl<T: BinarySerializable> LayerBuilder<T> {
fn written_size(&self) -> usize {
self.buffer.len()
}
fn write(&self, output: &mut dyn Write) -> Result<(), io::Error> {
output.write_all(&self.buffer)?;
Ok(())
}
fn with_period(period: usize) -> LayerBuilder<T> {
assert!(is_power_of_2(period), "The period has to be a power of 2.");
LayerBuilder {
period_mask: (period - 1),
buffer: Vec::new(),
len: 0,
_phantom_: PhantomData,
}
}
fn insert(&mut self, key: u64, value: &T) -> io::Result<Option<(u64, u64)>> {
self.len += 1;
let offset = self.written_size() as u64;
VInt(key).serialize_into_vec(&mut self.buffer);
value.serialize(&mut self.buffer)?;
let emit_skip_info = (self.period_mask & self.len) == 0;
if emit_skip_info {
Ok(Some((key, offset)))
} else {
Ok(None)
}
}
}
pub struct SkipListBuilder<T: BinarySerializable> {
period: usize,
data_layer: LayerBuilder<T>,
skip_layers: Vec<LayerBuilder<u64>>,
}
impl<T: BinarySerializable> SkipListBuilder<T> {
pub fn new(period: usize) -> SkipListBuilder<T> {
SkipListBuilder {
period,
data_layer: LayerBuilder::with_period(period),
skip_layers: Vec::new(),
}
}
fn get_skip_layer(&mut self, layer_id: usize) -> &mut LayerBuilder<u64> {
if layer_id == self.skip_layers.len() {
let layer_builder = LayerBuilder::with_period(self.period);
self.skip_layers.push(layer_builder);
}
&mut self.skip_layers[layer_id]
}
pub fn insert(&mut self, key: u64, dest: &T) -> io::Result<()> {
let mut skip_pointer = self.data_layer.insert(key, dest)?;
for layer_id in 0.. {
if let Some((skip_doc_id, skip_offset)) = skip_pointer {
skip_pointer = self
.get_skip_layer(layer_id)
.insert(skip_doc_id, &skip_offset)?;
} else {
break;
}
}
Ok(())
}
pub fn write<W: Write>(self, output: &mut W) -> io::Result<()> {
let mut size: u64 = self.data_layer.buffer.len() as u64;
let mut layer_sizes = vec![VInt(size)];
for layer in self.skip_layers.iter().rev() {
size += layer.buffer.len() as u64;
layer_sizes.push(VInt(size));
}
layer_sizes.serialize(output)?;
self.data_layer.write(output)?;
for layer in self.skip_layers.iter().rev() {
layer.write(output)?;
}
Ok(())
}
}

View File

@@ -1,12 +1,11 @@
use super::compress;
use super::index::SkipIndexBuilder;
use super::skiplist::SkipListBuilder;
use super::StoreReader;
use crate::common::CountingWriter;
use crate::common::{BinarySerializable, VInt};
use crate::directory::TerminatingWrite;
use crate::directory::WritePtr;
use crate::schema::Document;
use crate::store::index::Checkpoint;
use crate::DocId;
use std::io::{self, Write};
@@ -22,8 +21,7 @@ const BLOCK_SIZE: usize = 16_384;
///
pub struct StoreWriter {
doc: DocId,
first_doc_in_block: DocId,
offset_index_writer: SkipIndexBuilder,
offset_index_writer: SkipListBuilder<u64>,
writer: CountingWriter<WritePtr>,
intermediary_buffer: Vec<u8>,
current_block: Vec<u8>,
@@ -37,8 +35,7 @@ impl StoreWriter {
pub fn new(writer: WritePtr) -> StoreWriter {
StoreWriter {
doc: 0,
first_doc_in_block: 0,
offset_index_writer: SkipIndexBuilder::new(),
offset_index_writer: SkipListBuilder::new(4),
writer: CountingWriter::wrap(writer),
intermediary_buffer: Vec::new(),
current_block: Vec::new(),
@@ -71,9 +68,11 @@ impl StoreWriter {
pub fn stack(&mut self, store_reader: &StoreReader) -> io::Result<()> {
if !self.current_block.is_empty() {
self.write_and_compress_block()?;
self.offset_index_writer
.insert(u64::from(self.doc), &(self.writer.written_bytes() as u64))?;
}
let doc_shift = self.doc;
let start_shift = self.writer.written_bytes() as u64;
let doc_offset = self.doc;
let start_offset = self.writer.written_bytes() as u64;
// just bulk write all of the block of the given reader.
self.writer
@@ -81,33 +80,22 @@ impl StoreWriter {
// concatenate the index of the `store_reader`, after translating
// its start doc id and its start file offset.
for mut checkpoint in store_reader.block_checkpoints() {
checkpoint.start_doc += doc_shift;
checkpoint.end_doc += doc_shift;
checkpoint.start_offset += start_shift;
checkpoint.end_offset += start_shift;
self.offset_index_writer.insert(checkpoint);
self.doc = checkpoint.end_doc;
for (next_doc_id, block_addr) in store_reader.block_index() {
self.doc = doc_offset + next_doc_id as u32;
self.offset_index_writer
.insert(u64::from(self.doc), &(start_offset + block_addr))?;
}
Ok(())
}
fn write_and_compress_block(&mut self) -> io::Result<()> {
assert!(self.doc > 0);
self.intermediary_buffer.clear();
compress(&self.current_block[..], &mut self.intermediary_buffer)?;
let start_offset = self.writer.written_bytes();
(self.intermediary_buffer.len() as u32).serialize(&mut self.writer)?;
self.writer.write_all(&self.intermediary_buffer)?;
let end_offset = self.writer.written_bytes();
let end_doc = self.doc;
self.offset_index_writer.insert(Checkpoint {
start_doc: self.first_doc_in_block,
end_doc,
start_offset,
end_offset,
});
self.offset_index_writer
.insert(u64::from(self.doc), &(self.writer.written_bytes() as u64))?;
self.current_block.clear();
self.first_doc_in_block = self.doc;
Ok(())
}
@@ -122,6 +110,7 @@ impl StoreWriter {
let header_offset: u64 = self.writer.written_bytes() as u64;
self.offset_index_writer.write(&mut self.writer)?;
header_offset.serialize(&mut self.writer)?;
self.doc.serialize(&mut self.writer)?;
self.writer.terminate()
}
}

View File

@@ -60,10 +60,12 @@ impl<'a> TermMerger<'a> {
pub(crate) fn matching_segments<'b: 'a>(
&'b self,
) -> impl 'b + Iterator<Item = (usize, TermOrdinal)> {
self.current_streamers
.iter()
.map(|heap_item| (heap_item.segment_ord, heap_item.streamer.term_ord()))
) -> Box<dyn 'b + Iterator<Item = (usize, TermOrdinal)>> {
Box::new(
self.current_streamers
.iter()
.map(|heap_item| (heap_item.segment_ord, heap_item.streamer.term_ord())),
)
}
fn advance_segments(&mut self) {

View File

@@ -44,13 +44,11 @@ mod tests {
const BLOCK_SIZE: usize = 1_500;
fn make_term_info(term_ord: u64) -> TermInfo {
let offset = |term_ord: u64| term_ord * 100 + term_ord * term_ord;
fn make_term_info(val: u64) -> TermInfo {
TermInfo {
doc_freq: term_ord as u32,
postings_start_offset: offset(term_ord),
postings_stop_offset: offset(term_ord + 1),
positions_idx: offset(term_ord) * 2u64,
doc_freq: val as u32,
positions_idx: val * 2u64,
postings_offset: val * 3u64,
}
}
@@ -199,7 +197,7 @@ mod tests {
// term requires more than 16bits
term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxy", &make_term_info(1))?;
term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxyz", &make_term_info(2))?;
term_dictionary_builder.insert("abr", &make_term_info(3))?;
term_dictionary_builder.insert("abr", &make_term_info(2))?;
term_dictionary_builder.finish()?
};
let term_dict_file = FileSlice::from(buffer);
@@ -213,7 +211,6 @@ mod tests {
assert_eq!(kv_stream.value(), &make_term_info(2));
assert!(kv_stream.advance());
assert_eq!(kv_stream.key(), "abr".as_bytes());
assert_eq!(kv_stream.value(), &make_term_info(3));
assert!(!kv_stream.advance());
Ok(())
}

View File

@@ -55,32 +55,22 @@ impl TermInfoBlockMeta {
self.doc_freq_nbits + self.postings_offset_nbits + self.positions_idx_nbits
}
// Here inner_offset is the offset within the block, WITHOUT the first term_info.
// In other word, term_info #1,#2,#3 gets inner_offset 0,1,2... While term_info #0
// is encoded without bitpacking.
fn deserialize_term_info(&self, data: &[u8], inner_offset: usize) -> TermInfo {
assert!(inner_offset < BLOCK_LEN - 1);
let num_bits = self.num_bits() as usize;
let mut cursor = num_bits * inner_offset;
let posting_start_addr = num_bits * inner_offset;
// the stop offset is the start offset of the next term info.
let posting_stop_addr = posting_start_addr + num_bits;
let doc_freq_addr = posting_start_addr + self.postings_offset_nbits as usize;
let positions_idx_addr = doc_freq_addr + self.doc_freq_nbits as usize;
let doc_freq = extract_bits(data, cursor, self.doc_freq_nbits) as u32;
cursor += self.doc_freq_nbits as usize;
let postings_start_offset = self.ref_term_info.postings_start_offset
+ extract_bits(data, posting_start_addr, self.postings_offset_nbits);
let postings_stop_offset = self.ref_term_info.postings_start_offset
+ extract_bits(data, posting_stop_addr, self.postings_offset_nbits);
let doc_freq = extract_bits(data, doc_freq_addr, self.doc_freq_nbits) as u32;
let positions_idx = self.ref_term_info.positions_idx
+ extract_bits(data, positions_idx_addr, self.positions_idx_nbits);
let postings_offset = extract_bits(data, cursor, self.postings_offset_nbits);
cursor += self.postings_offset_nbits as usize;
let positions_idx = extract_bits(data, cursor, self.positions_idx_nbits);
TermInfo {
doc_freq,
postings_start_offset,
postings_stop_offset,
positions_idx,
postings_offset: postings_offset + self.ref_term_info.postings_offset,
positions_idx: positions_idx + self.ref_term_info.positions_idx,
}
}
}
@@ -162,17 +152,16 @@ fn bitpack_serialize<W: Write>(
term_info_block_meta: &TermInfoBlockMeta,
term_info: &TermInfo,
) -> io::Result<()> {
bit_packer.write(
term_info.postings_start_offset,
term_info_block_meta.postings_offset_nbits,
write,
)?;
bit_packer.write(
u64::from(term_info.doc_freq),
term_info_block_meta.doc_freq_nbits,
write,
)?;
bit_packer.write(
term_info.postings_offset,
term_info_block_meta.postings_offset_nbits,
write,
)?;
bit_packer.write(
term_info.positions_idx,
term_info_block_meta.positions_idx_nbits,
@@ -192,27 +181,23 @@ impl TermInfoStoreWriter {
}
fn flush_block(&mut self) -> io::Result<()> {
if self.term_infos.is_empty() {
return Ok(());
}
let mut bit_packer = BitPacker::new();
let ref_term_info = self.term_infos[0].clone();
let last_term_info = if let Some(last_term_info) = self.term_infos.last().cloned() {
last_term_info
} else {
return Ok(());
};
let postings_stop_offset =
last_term_info.postings_stop_offset - ref_term_info.postings_start_offset;
for term_info in &mut self.term_infos[1..] {
term_info.postings_start_offset -= ref_term_info.postings_start_offset;
term_info.postings_offset -= ref_term_info.postings_offset;
term_info.positions_idx -= ref_term_info.positions_idx;
}
let mut max_doc_freq: u32 = 0u32;
let max_postings_offset: u64 = postings_stop_offset;
let max_positions_idx: u64 = last_term_info.positions_idx;
let mut max_postings_offset: u64 = 0u64;
let mut max_positions_idx: u64 = 0u64;
for term_info in &self.term_infos[1..] {
max_doc_freq = cmp::max(max_doc_freq, term_info.doc_freq);
max_postings_offset = cmp::max(max_postings_offset, term_info.postings_offset);
max_positions_idx = cmp::max(max_positions_idx, term_info.positions_idx);
}
let max_doc_freq_nbits: u8 = compute_num_bits(u64::from(max_doc_freq));
@@ -237,12 +222,6 @@ impl TermInfoStoreWriter {
)?;
}
bit_packer.write(
postings_stop_offset,
term_info_block_meta.postings_offset_nbits,
&mut self.buffer_term_infos,
)?;
// Block need end up at the end of a byte.
bit_packer.flush(&mut self.buffer_term_infos)?;
self.term_infos.clear();
@@ -251,7 +230,6 @@ impl TermInfoStoreWriter {
}
pub fn write_term_info(&mut self, term_info: &TermInfo) -> io::Result<()> {
assert!(term_info.postings_stop_offset >= term_info.postings_start_offset);
self.num_terms += 1u64;
self.term_infos.push(term_info.clone());
if self.term_infos.len() >= BLOCK_LEN {
@@ -311,11 +289,10 @@ mod tests {
#[test]
fn test_term_info_block_meta_serialization() {
let term_info_block_meta = TermInfoBlockMeta {
offset: 2009u64,
offset: 2009,
ref_term_info: TermInfo {
doc_freq: 512,
postings_start_offset: 51,
postings_stop_offset: 57u64,
postings_offset: 51,
positions_idx: 3584,
},
doc_freq_nbits: 10,
@@ -333,12 +310,10 @@ mod tests {
fn test_pack() -> crate::Result<()> {
let mut store_writer = TermInfoStoreWriter::new();
let mut term_infos = vec![];
let offset = |i| (i * 13 + i * i) as u64;
for i in 0..1000 {
let term_info = TermInfo {
doc_freq: i as u32,
postings_start_offset: offset(i),
postings_stop_offset: offset(i + 1),
postings_offset: (i / 10) as u64,
positions_idx: (i * 7) as u64,
};
store_writer.write_term_info(&term_info)?;
@@ -348,12 +323,7 @@ mod tests {
store_writer.serialize(&mut buffer)?;
let term_info_store = TermInfoStore::open(FileSlice::from(buffer))?;
for i in 0..1000 {
assert_eq!(
term_info_store.get(i as u64),
term_infos[i],
"term info {}",
i
);
assert_eq!(term_info_store.get(i as u64), term_infos[i]);
}
Ok(())
}

View File

@@ -18,7 +18,7 @@ fn test_failpoints_managed_directory_gc_if_delete_fails() {
.unwrap()
.terminate()
.unwrap();
assert!(managed_directory.exists(test_path).unwrap());
assert!(managed_directory.exists(test_path));
// triggering gc and setting the delete operation to fail.
//
// We are checking that the gc operation is not removing the
@@ -29,12 +29,12 @@ fn test_failpoints_managed_directory_gc_if_delete_fails() {
// lock file.
fail::cfg("RAMDirectory::delete", "1*off->1*return").unwrap();
assert!(managed_directory.garbage_collect(Default::default).is_ok());
assert!(managed_directory.exists(test_path).unwrap());
assert!(managed_directory.exists(test_path));
// running the gc a second time should remove the file.
assert!(managed_directory.garbage_collect(Default::default).is_ok());
assert!(
!managed_directory.exists(test_path).unwrap(),
!managed_directory.exists(test_path),
"The file should have been deleted"
);
}