Compare commits

..

1 Commits

Author SHA1 Message Date
Paul Masurel
7cb018c640 Add an option to opt out fieldnorms for indexed fields.
Closes #922
2020-11-03 16:15:20 +09:00
85 changed files with 1571 additions and 4149 deletions

View File

@@ -7,8 +7,7 @@ Tantivy 0.14.0
- Added support for Brotli compression in the DocStore. (@ppodolsky)
- Added helper for building intersections and unions in BooleanQuery (@guilload)
- Bugfix in `Query::explain`
- Removed dependency on `notify` #924. Replaced with `FileWatcher` struct that polls meta file every 500ms in background thread. (@halvorboe @guilload)
- Added `FilterCollector`, which wraps another collector and filters docs using a predicate over a fast field (@barrotsteindev)
- Making it possible to opt out the generation of fieldnorms information for indexed fields. This change breaks compatibility as the meta.json file format is slightly changed. (#922, @pmasurel)
Tantivy 0.13.2
===================

View File

@@ -26,11 +26,11 @@ snap = "1"
tempfile = {version="3", optional=true}
log = "0.4"
serde = {version="1", features=["derive"]}
serde_cbor = "0.11"
serde_json = "1"
num_cpus = "1"
fs2={version="0.4", optional=true}
levenshtein_automata = "0.2"
notify = {version="4", optional=true}
uuid = { version = "0.8", features = ["v4", "serde"] }
crossbeam = "0.8"
futures = {version = "0.3", features=["thread-pool"] }
@@ -48,7 +48,6 @@ murmurhash32 = "0.2"
chrono = "0.4"
smallvec = "1"
rayon = "1"
lru = "0.6"
[target.'cfg(windows)'.dependencies]
winapi = "0.3"
@@ -74,7 +73,7 @@ overflow-checks = true
[features]
default = ["mmap"]
mmap = ["fs2", "tempfile", "memmap"]
mmap = ["fs2", "tempfile", "memmap", "notify"]
brotli-compression = ["brotli"]
lz4-compression = ["lz4"]
failpoints = ["fail/failpoints"]

View File

@@ -61,7 +61,7 @@ fn main() -> tantivy::Result<()> {
let query_ords: HashSet<u64> = facets
.iter()
.filter_map(|key| facet_dict.term_ord(key.encoded_str()).unwrap())
.filter_map(|key| facet_dict.term_ord(key.encoded_str()))
.collect();
let mut facet_ords_buffer: Vec<u64> = Vec::with_capacity(20);

View File

@@ -274,7 +274,7 @@ impl Collector for FacetCollector {
let mut collapse_facet_it = self.facets.iter().peekable();
collapse_facet_ords.push(0);
{
let mut facet_streamer = facet_reader.facet_dict().range().into_stream()?;
let mut facet_streamer = facet_reader.facet_dict().range().into_stream();
if facet_streamer.advance() {
'outer: loop {
// at the begining of this loop, facet_streamer
@@ -368,12 +368,9 @@ impl SegmentCollector for FacetSegmentCollector {
}
let mut facet = vec![];
let facet_ord = self.collapse_facet_ords[collapsed_facet_ord];
// TODO handle errors.
if facet_dict.ord_to_term(facet_ord as u64, &mut facet).is_ok() {
if let Ok(facet) = Facet::from_encoded(facet) {
facet_counts.insert(facet, count);
}
}
facet_dict.ord_to_term(facet_ord as u64, &mut facet);
// TODO
facet_counts.insert(Facet::from_encoded(facet).unwrap(), count);
}
FacetCounts { facet_counts }
}

View File

@@ -1,158 +0,0 @@
// # Custom collector example
//
// This example shows how you can implement your own
// collector. As an example, we will compute a collector
// that computes the standard deviation of a given fast field.
//
// Of course, you can have a look at the tantivy's built-in collectors
// such as the `CountCollector` for more examples.
// ---
// Importing tantivy...
use crate::collector::{Collector, SegmentCollector};
use crate::fastfield::FastFieldReader;
use crate::schema::Field;
use crate::{Score, SegmentReader, TantivyError};
/// The `FilterCollector` collector filters docs using a u64 fast field value and a predicate.
/// Only the documents for which the predicate returned "true" will be passed on to the next collector.
///
/// ```rust
/// use tantivy::collector::{TopDocs, FilterCollector};
/// use tantivy::query::QueryParser;
/// use tantivy::schema::{Schema, TEXT, INDEXED, FAST};
/// use tantivy::{doc, DocAddress, Index};
///
/// let mut schema_builder = Schema::builder();
/// let title = schema_builder.add_text_field("title", TEXT);
/// let price = schema_builder.add_u64_field("price", INDEXED | FAST);
/// let schema = schema_builder.build();
/// let index = Index::create_in_ram(schema);
///
/// let mut index_writer = index.writer_with_num_threads(1, 10_000_000).unwrap();
/// index_writer.add_document(doc!(title => "The Name of the Wind", price => 30_200u64));
/// index_writer.add_document(doc!(title => "The Diary of Muadib", price => 29_240u64));
/// index_writer.add_document(doc!(title => "A Dairy Cow", price => 21_240u64));
/// index_writer.add_document(doc!(title => "The Diary of a Young Girl", price => 20_120u64));
/// assert!(index_writer.commit().is_ok());
///
/// let reader = index.reader().unwrap();
/// let searcher = reader.searcher();
///
/// let query_parser = QueryParser::for_index(&index, vec![title]);
/// let query = query_parser.parse_query("diary").unwrap();
/// let no_filter_collector = FilterCollector::new(price, &|value| value > 20_120u64, TopDocs::with_limit(2));
/// let top_docs = searcher.search(&query, &no_filter_collector).unwrap();
///
/// assert_eq!(top_docs.len(), 1);
/// assert_eq!(top_docs[0].1, DocAddress(0, 1));
///
/// let filter_all_collector = FilterCollector::new(price, &|value| value < 5u64, TopDocs::with_limit(2));
/// let filtered_top_docs = searcher.search(&query, &filter_all_collector).unwrap();
///
/// assert_eq!(filtered_top_docs.len(), 0);
/// ```
pub struct FilterCollector<TCollector, TPredicate>
where
TPredicate: 'static,
{
field: Field,
collector: TCollector,
predicate: &'static TPredicate,
}
impl<TCollector, TPredicate> FilterCollector<TCollector, TPredicate>
where
TCollector: Collector + Send + Sync,
TPredicate: Fn(u64) -> bool + Send + Sync,
{
/// Create a new FilterCollector.
pub fn new(
field: Field,
predicate: &'static TPredicate,
collector: TCollector,
) -> FilterCollector<TCollector, TPredicate> {
FilterCollector {
field,
predicate,
collector,
}
}
}
impl<TCollector, TPredicate> Collector for FilterCollector<TCollector, TPredicate>
where
TCollector: Collector + Send + Sync,
TPredicate: 'static + Fn(u64) -> bool + Send + Sync,
{
// That's the type of our result.
// Our standard deviation will be a float.
type Fruit = TCollector::Fruit;
type Child = FilterSegmentCollector<TCollector::Child, TPredicate>;
fn for_segment(
&self,
segment_local_id: u32,
segment_reader: &SegmentReader,
) -> crate::Result<FilterSegmentCollector<TCollector::Child, TPredicate>> {
let fast_field_reader = segment_reader
.fast_fields()
.u64(self.field)
.ok_or_else(|| {
let field_name = segment_reader.schema().get_field_name(self.field);
TantivyError::SchemaError(format!(
"Field {:?} is not a u64 fast field.",
field_name
))
})?;
let segment_collector = self
.collector
.for_segment(segment_local_id, segment_reader)?;
Ok(FilterSegmentCollector {
fast_field_reader,
segment_collector,
predicate: self.predicate,
})
}
fn requires_scoring(&self) -> bool {
self.collector.requires_scoring()
}
fn merge_fruits(
&self,
segment_fruits: Vec<<TCollector::Child as SegmentCollector>::Fruit>,
) -> crate::Result<TCollector::Fruit> {
self.collector.merge_fruits(segment_fruits)
}
}
pub struct FilterSegmentCollector<TSegmentCollector, TPredicate>
where
TPredicate: 'static,
{
fast_field_reader: FastFieldReader<u64>,
segment_collector: TSegmentCollector,
predicate: &'static TPredicate,
}
impl<TSegmentCollector, TPredicate> SegmentCollector
for FilterSegmentCollector<TSegmentCollector, TPredicate>
where
TSegmentCollector: SegmentCollector,
TPredicate: 'static + Fn(u64) -> bool + Send + Sync,
{
type Fruit = TSegmentCollector::Fruit;
fn collect(&mut self, doc: u32, score: Score) {
let value = self.fast_field_reader.get(doc);
if (self.predicate)(value) {
self.segment_collector.collect(doc, score)
}
}
fn harvest(self) -> <TSegmentCollector as SegmentCollector>::Fruit {
self.segment_collector.harvest()
}
}

View File

@@ -114,9 +114,6 @@ use crate::query::Weight;
mod docset_collector;
pub use self::docset_collector::DocSetCollector;
mod filter_collector_wrapper;
pub use self::filter_collector_wrapper::FilterCollector;
/// `Fruit` is the type for the result of our collection.
/// e.g. `usize` for the `Count` collector.
pub trait Fruit: Send + downcast_rs::Downcast {}

View File

@@ -728,7 +728,7 @@ mod tests {
}
#[test]
fn test_top_collector_not_at_capacity_without_offset() {
fn test_top_collector_not_at_capacity() {
let index = make_index();
let field = index.schema().get_field("text").unwrap();
let query_parser = QueryParser::for_index(&index, vec![field]);

View File

@@ -20,10 +20,9 @@ impl<W: Write> CountingWriter<W> {
self.written_bytes
}
/// Returns the underlying write object.
/// Note that this method does not trigger any flushing.
pub fn finish(self) -> W {
self.underlying
pub fn finish(mut self) -> io::Result<(W, u64)> {
self.flush()?;
Ok((self.underlying, self.written_bytes))
}
}
@@ -47,6 +46,7 @@ impl<W: Write> Write for CountingWriter<W> {
impl<W: TerminatingWrite> TerminatingWrite for CountingWriter<W> {
fn terminate_ref(&mut self, token: AntiCallToken) -> io::Result<()> {
self.flush()?;
self.underlying.terminate_ref(token)
}
}
@@ -63,9 +63,8 @@ mod test {
let mut counting_writer = CountingWriter::wrap(buffer);
let bytes = (0u8..10u8).collect::<Vec<u8>>();
counting_writer.write_all(&bytes).unwrap();
let len = counting_writer.written_bytes();
let buffer_restituted: Vec<u8> = counting_writer.finish();
let (w, len): (Vec<u8>, u64) = counting_writer.finish().unwrap();
assert_eq!(len, 10u64);
assert_eq!(buffer_restituted.len(), 10);
assert_eq!(w.len(), 10);
}
}

View File

@@ -66,6 +66,10 @@ pub(crate) fn compute_num_bits(n: u64) -> u8 {
}
}
pub(crate) fn is_power_of_2(n: usize) -> bool {
(n > 0) && (n & (n - 1) == 0)
}
/// Has length trait
pub trait HasLen {
/// Return length

View File

@@ -5,7 +5,6 @@ use crate::core::SegmentId;
use crate::core::SegmentMeta;
use crate::core::SegmentMetaInventory;
use crate::core::META_FILEPATH;
use crate::directory::error::OpenReadError;
use crate::directory::ManagedDirectory;
#[cfg(feature = "mmap")]
use crate::directory::MmapDirectory;
@@ -60,7 +59,7 @@ impl Index {
/// Examines the directory to see if it contains an index.
///
/// Effectively, it only checks for the presence of the `meta.json` file.
pub fn exists<Dir: Directory>(dir: &Dir) -> Result<bool, OpenReadError> {
pub fn exists<Dir: Directory>(dir: &Dir) -> bool {
dir.exists(&META_FILEPATH)
}
@@ -107,7 +106,7 @@ impl Index {
schema: Schema,
) -> crate::Result<Index> {
let mmap_directory = MmapDirectory::open(directory_path)?;
if Index::exists(&mmap_directory)? {
if Index::exists(&mmap_directory) {
return Err(TantivyError::IndexAlreadyExists);
}
Index::create(mmap_directory, schema)
@@ -115,7 +114,7 @@ impl Index {
/// Opens or creates a new index in the provided directory
pub fn open_or_create<Dir: Directory>(dir: Dir, schema: Schema) -> crate::Result<Index> {
if !Index::exists(&dir)? {
if !Index::exists(&dir) {
return Index::create(dir, schema);
}
let index = Index::open(dir)?;
@@ -400,7 +399,7 @@ impl fmt::Debug for Index {
#[cfg(test)]
mod tests {
use crate::directory::{RAMDirectory, WatchCallback};
use crate::directory::RAMDirectory;
use crate::schema::Field;
use crate::schema::{Schema, INDEXED, TEXT};
use crate::IndexReader;
@@ -424,24 +423,24 @@ mod tests {
#[test]
fn test_index_exists() {
let directory = RAMDirectory::create();
assert!(!Index::exists(&directory).unwrap());
assert!(!Index::exists(&directory));
assert!(Index::create(directory.clone(), throw_away_schema()).is_ok());
assert!(Index::exists(&directory).unwrap());
assert!(Index::exists(&directory));
}
#[test]
fn open_or_create_should_create() {
let directory = RAMDirectory::create();
assert!(!Index::exists(&directory).unwrap());
assert!(!Index::exists(&directory));
assert!(Index::open_or_create(directory.clone(), throw_away_schema()).is_ok());
assert!(Index::exists(&directory).unwrap());
assert!(Index::exists(&directory));
}
#[test]
fn open_or_create_should_open() {
let directory = RAMDirectory::create();
assert!(Index::create(directory.clone(), throw_away_schema()).is_ok());
assert!(Index::exists(&directory).unwrap());
assert!(Index::exists(&directory));
assert!(Index::open_or_create(directory, throw_away_schema()).is_ok());
}
@@ -449,7 +448,7 @@ mod tests {
fn create_should_wipeoff_existing() {
let directory = RAMDirectory::create();
assert!(Index::create(directory.clone(), throw_away_schema()).is_ok());
assert!(Index::exists(&directory).unwrap());
assert!(Index::exists(&directory));
assert!(Index::create(directory.clone(), Schema::builder().build()).is_ok());
}
@@ -457,7 +456,7 @@ mod tests {
fn open_or_create_exists_but_schema_does_not_match() {
let directory = RAMDirectory::create();
assert!(Index::create(directory.clone(), throw_away_schema()).is_ok());
assert!(Index::exists(&directory).unwrap());
assert!(Index::exists(&directory));
assert!(Index::open_or_create(directory.clone(), throw_away_schema()).is_ok());
let err = Index::open_or_create(directory, Schema::builder().build());
assert_eq!(
@@ -511,28 +510,28 @@ mod tests {
}
#[test]
fn test_index_manual_policy_mmap() -> crate::Result<()> {
fn test_index_manual_policy_mmap() {
let schema = throw_away_schema();
let field = schema.get_field("num_likes").unwrap();
let mut index = Index::create_from_tempdir(schema)?;
let mut writer = index.writer_for_tests()?;
writer.commit()?;
let mut index = Index::create_from_tempdir(schema).unwrap();
let mut writer = index.writer_for_tests().unwrap();
writer.commit().unwrap();
let reader = index
.reader_builder()
.reload_policy(ReloadPolicy::Manual)
.try_into()?;
.try_into()
.unwrap();
assert_eq!(reader.searcher().num_docs(), 0);
writer.add_document(doc!(field=>1u64));
let (sender, receiver) = crossbeam::channel::unbounded();
let _handle = index.directory_mut().watch(WatchCallback::new(move || {
let _handle = index.directory_mut().watch(Box::new(move || {
let _ = sender.send(());
}));
writer.commit()?;
writer.commit().unwrap();
assert!(receiver.recv().is_ok());
assert_eq!(reader.searcher().num_docs(), 0);
reader.reload()?;
reader.reload().unwrap();
assert_eq!(reader.searcher().num_docs(), 1);
Ok(())
}
#[test]
@@ -555,11 +554,9 @@ mod tests {
fn test_index_on_commit_reload_policy_aux(field: Field, index: &Index, reader: &IndexReader) {
let mut reader_index = reader.index();
let (sender, receiver) = crossbeam::channel::unbounded();
let _watch_handle = reader_index
.directory_mut()
.watch(WatchCallback::new(move || {
let _ = sender.send(());
}));
let _watch_handle = reader_index.directory_mut().watch(Box::new(move || {
let _ = sender.send(());
}));
let mut writer = index.writer_for_tests().unwrap();
assert_eq!(reader.searcher().num_docs(), 0);
writer.add_document(doc!(field=>1u64));
@@ -598,7 +595,7 @@ mod tests {
writer.add_document(doc!(field => i));
}
let (sender, receiver) = crossbeam::channel::unbounded();
let _handle = directory.watch(WatchCallback::new(move || {
let _handle = directory.watch(Box::new(move || {
let _ = sender.send(());
}));
writer.commit().unwrap();

View File

@@ -301,7 +301,7 @@ mod tests {
let json = serde_json::ser::to_string(&index_metas).expect("serialization failed");
assert_eq!(
json,
r#"{"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","tokenizer":"default"},"stored":false}}],"opstamp":0}"#
r#"{"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","tokenizer":"default","fieldnorms":true},"stored":false}}],"opstamp":0}"#
);
}
}

View File

@@ -66,7 +66,7 @@ impl InvertedIndexReader {
}
/// Returns the term info associated with the term.
pub fn get_term_info(&self, term: &Term) -> io::Result<Option<TermInfo>> {
pub fn get_term_info(&self, term: &Term) -> Option<TermInfo> {
self.termdict.get(term.value_bytes())
}
@@ -90,9 +90,9 @@ impl InvertedIndexReader {
term_info: &TermInfo,
block_postings: &mut BlockSegmentPostings,
) -> io::Result<()> {
let start_offset = term_info.postings_start_offset as usize;
let stop_offset = term_info.postings_stop_offset as usize;
let postings_slice = self.postings_file_slice.slice(start_offset, stop_offset);
let postings_slice = self
.postings_file_slice
.slice_from(term_info.postings_offset as usize);
block_postings.reset(term_info.doc_freq, postings_slice.read_bytes()?);
Ok(())
}
@@ -106,9 +106,10 @@ impl InvertedIndexReader {
term: &Term,
option: IndexRecordOption,
) -> io::Result<Option<BlockSegmentPostings>> {
self.get_term_info(term)?
Ok(self
.get_term_info(term)
.map(move |term_info| self.read_block_postings_from_terminfo(&term_info, option))
.transpose()
.transpose()?)
}
/// Returns a block postings given a `term_info`.
@@ -120,10 +121,8 @@ impl InvertedIndexReader {
term_info: &TermInfo,
requested_option: IndexRecordOption,
) -> io::Result<BlockSegmentPostings> {
let postings_data = self.postings_file_slice.slice(
term_info.postings_start_offset as usize,
term_info.postings_stop_offset as usize,
);
let offset = term_info.postings_offset as usize;
let postings_data = self.postings_file_slice.slice_from(offset);
BlockSegmentPostings::open(
term_info.doc_freq,
postings_data,
@@ -180,7 +179,7 @@ impl InvertedIndexReader {
term: &Term,
option: IndexRecordOption,
) -> io::Result<Option<SegmentPostings>> {
self.get_term_info(term)?
self.get_term_info(term)
.map(move |term_info| self.read_postings_from_terminfo(&term_info, option))
.transpose()
}
@@ -190,7 +189,7 @@ impl InvertedIndexReader {
term: &Term,
option: IndexRecordOption,
) -> io::Result<Option<SegmentPostings>> {
self.get_term_info(term)?
self.get_term_info(term)
.map(|term_info| self.read_postings_from_terminfo(&term_info, option))
.transpose()
}
@@ -198,7 +197,7 @@ impl InvertedIndexReader {
/// Returns the number of documents containing the term.
pub fn doc_freq(&self, term: &Term) -> io::Result<u32> {
Ok(self
.get_term_info(term)?
.get_term_info(term)
.map(|term_info| term_info.doc_freq)
.unwrap_or(0u32))
}

View File

@@ -1,16 +1,17 @@
use crate::collector::Collector;
use crate::core::Executor;
use crate::core::InvertedIndexReader;
use crate::core::SegmentReader;
use crate::query::Query;
use crate::schema::Document;
use crate::schema::Schema;
use crate::schema::Term;
use crate::schema::{Field, Term};
use crate::space_usage::SearcherSpaceUsage;
use crate::store::StoreReader;
use crate::termdict::TermMerger;
use crate::DocAddress;
use crate::Index;
use std::sync::Arc;
use std::{fmt, io};
/// Holds a list of `SegmentReader`s ready for search.
@@ -147,6 +148,16 @@ impl Searcher {
collector.merge_fruits(fruits)
}
/// Return the field searcher associated to a `Field`.
pub fn field(&self, field: Field) -> crate::Result<FieldSearcher> {
let inv_index_readers: Vec<Arc<InvertedIndexReader>> = self
.segment_readers
.iter()
.map(|segment_reader| segment_reader.inverted_index(field))
.collect::<crate::Result<Vec<_>>>()?;
Ok(FieldSearcher::new(inv_index_readers))
}
/// Summarize total space usage of this searcher.
pub fn space_usage(&self) -> io::Result<SearcherSpaceUsage> {
let mut space_usage = SearcherSpaceUsage::new();
@@ -157,6 +168,27 @@ impl Searcher {
}
}
pub struct FieldSearcher {
inv_index_readers: Vec<Arc<InvertedIndexReader>>,
}
impl FieldSearcher {
fn new(inv_index_readers: Vec<Arc<InvertedIndexReader>>) -> FieldSearcher {
FieldSearcher { inv_index_readers }
}
/// Returns a Stream over all of the sorted unique terms of
/// for the given field.
pub fn terms(&self) -> TermMerger<'_> {
let term_streamers: Vec<_> = self
.inv_index_readers
.iter()
.map(|inverted_index| inverted_index.terms().stream())
.collect();
TermMerger::new(term_streamers)
}
}
impl fmt::Debug for Searcher {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let segment_ids = self

View File

@@ -1,8 +1,8 @@
use crate::directory::directory_lock::Lock;
use crate::directory::error::LockError;
use crate::directory::error::{DeleteError, OpenReadError, OpenWriteError};
use crate::directory::WatchCallback;
use crate::directory::WatchHandle;
use crate::directory::{FileHandle, WatchCallback};
use crate::directory::{FileSlice, WritePtr};
use std::fmt;
use std::io;
@@ -108,13 +108,10 @@ fn retry_policy(is_blocking: bool) -> RetryPolicy {
/// should be your default choice.
/// - The [`RAMDirectory`](struct.RAMDirectory.html), which
/// should be used mostly for tests.
///
pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
/// Opens a file and returns a boxed `FileHandle`.
/// Opens a virtual file for read.
///
/// Users of `Directory` should typically call `Directory::open_read(...)`,
/// while `Directory` implementor should implement `get_file_handle()`.
fn get_file_handle(&self, path: &Path) -> Result<Box<dyn FileHandle>, OpenReadError>;
/// Once a virtual file is open, its data may not
/// change.
///
@@ -122,10 +119,7 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
/// have no effect on the returned `FileSlice` object.
///
/// You should only use this to read files create with [Directory::open_write].
fn open_read(&self, path: &Path) -> Result<FileSlice, OpenReadError> {
let file_handle = self.get_file_handle(path)?;
Ok(FileSlice::new(file_handle))
}
fn open_read(&self, path: &Path) -> Result<FileSlice, OpenReadError>;
/// Removes a file
///
@@ -137,7 +131,7 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
fn delete(&self, path: &Path) -> Result<(), DeleteError>;
/// Returns true iff the file exists
fn exists(&self, path: &Path) -> Result<bool, OpenReadError>;
fn exists(&self, path: &Path) -> bool;
/// Opens a writer for the *virtual file* associated with
/// a Path.

View File

@@ -2,11 +2,10 @@ use stable_deref_trait::StableDeref;
use crate::common::HasLen;
use crate::directory::OwnedBytes;
use std::sync::{Arc, Weak};
use std::sync::Arc;
use std::{io, ops::Deref};
pub type ArcBytes = Arc<dyn Deref<Target = [u8]> + Send + Sync + 'static>;
pub type WeakArcBytes = Weak<dyn Deref<Target = [u8]> + Send + Sync + 'static>;
pub type BoxedData = Box<dyn Deref<Target = [u8]> + Send + Sync + 'static>;
/// Objects that represents files sections in tantivy.
///
@@ -41,7 +40,7 @@ where
B: StableDeref + Deref<Target = [u8]> + 'static + Send + Sync,
{
fn from(bytes: B) -> FileSlice {
FileSlice::new(Box::new(OwnedBytes::new(bytes)))
FileSlice::new(OwnedBytes::new(bytes))
}
}
@@ -51,25 +50,22 @@ where
///
#[derive(Clone)]
pub struct FileSlice {
data: Arc<dyn FileHandle>,
data: Arc<Box<dyn FileHandle>>,
start: usize,
stop: usize,
}
impl FileSlice {
/// Wraps a FileHandle.
pub fn new(file_handle: Box<dyn FileHandle>) -> Self {
let num_bytes = file_handle.len();
FileSlice::new_with_num_bytes(file_handle, num_bytes)
}
/// Wraps a FileHandle.
#[doc(hidden)]
pub fn new_with_num_bytes(file_handle: Box<dyn FileHandle>, num_bytes: usize) -> Self {
pub fn new<D>(data: D) -> Self
where
D: FileHandle,
{
let len = data.len();
FileSlice {
data: Arc::from(file_handle),
data: Arc::new(Box::new(data)),
start: 0,
stop: num_bytes,
stop: len,
}
}
@@ -150,12 +146,6 @@ impl FileSlice {
}
}
impl FileHandle for FileSlice {
fn read_bytes(&self, from: usize, to: usize) -> io::Result<OwnedBytes> {
self.read_bytes_slice(from, to)
}
}
impl HasLen for FileSlice {
fn len(&self) -> usize {
self.stop - self.start
@@ -170,7 +160,7 @@ mod tests {
#[test]
fn test_file_slice() -> io::Result<()> {
let file_slice = FileSlice::new(Box::new(b"abcdef".as_ref()));
let file_slice = FileSlice::new(b"abcdef".as_ref());
assert_eq!(file_slice.len(), 6);
assert_eq!(file_slice.slice_from(2).read_bytes()?.as_slice(), b"cdef");
assert_eq!(file_slice.slice_to(2).read_bytes()?.as_slice(), b"ab");
@@ -214,7 +204,7 @@ mod tests {
#[test]
fn test_slice_simple_read() -> io::Result<()> {
let slice = FileSlice::new(Box::new(&b"abcdef"[..]));
let slice = FileSlice::new(&b"abcdef"[..]);
assert_eq!(slice.len(), 6);
assert_eq!(slice.read_bytes()?.as_ref(), b"abcdef");
assert_eq!(slice.slice(1, 4).read_bytes()?.as_ref(), b"bcd");
@@ -223,7 +213,7 @@ mod tests {
#[test]
fn test_slice_read_slice() -> io::Result<()> {
let slice_deref = FileSlice::new(Box::new(&b"abcdef"[..]));
let slice_deref = FileSlice::new(&b"abcdef"[..]);
assert_eq!(slice_deref.read_bytes_slice(1, 4)?.as_ref(), b"bcd");
Ok(())
}
@@ -231,14 +221,14 @@ mod tests {
#[test]
#[should_panic(expected = "assertion failed: from <= to")]
fn test_slice_read_slice_invalid_range() {
let slice_deref = FileSlice::new(Box::new(&b"abcdef"[..]));
let slice_deref = FileSlice::new(&b"abcdef"[..]);
assert_eq!(slice_deref.read_bytes_slice(1, 0).unwrap().as_ref(), b"bcd");
}
#[test]
#[should_panic(expected = "`to` exceeds the fileslice length")]
fn test_slice_read_slice_invalid_range_exceeds() {
let slice_deref = FileSlice::new(Box::new(&b"abcdef"[..]));
let slice_deref = FileSlice::new(&b"abcdef"[..]);
assert_eq!(
slice_deref.read_bytes_slice(0, 10).unwrap().as_ref(),
b"bcd"

View File

@@ -1,178 +0,0 @@
use crate::directory::{WatchCallback, WatchCallbackList, WatchHandle};
use crc32fast::Hasher;
use std::fs;
use std::io;
use std::io::BufRead;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
pub const POLLING_INTERVAL: Duration = Duration::from_millis(if cfg!(test) { 1 } else { 500 });
// Watches a file and executes registered callbacks when the file is modified.
pub struct FileWatcher {
path: Arc<PathBuf>,
callbacks: Arc<WatchCallbackList>,
state: Arc<AtomicUsize>, // 0: new, 1: runnable, 2: terminated
}
impl FileWatcher {
pub fn new(path: &PathBuf) -> FileWatcher {
FileWatcher {
path: Arc::new(path.clone()),
callbacks: Default::default(),
state: Default::default(),
}
}
pub fn spawn(&self) {
if self.state.compare_and_swap(0, 1, Ordering::SeqCst) > 0 {
return;
}
let path = self.path.clone();
let callbacks = self.callbacks.clone();
let state = self.state.clone();
thread::Builder::new()
.name("thread-tantivy-meta-file-watcher".to_string())
.spawn(move || {
let mut current_checksum = None;
while state.load(Ordering::SeqCst) == 1 {
if let Ok(checksum) = FileWatcher::compute_checksum(&path) {
// `None.unwrap_or_else(|| !checksum) != checksum` evaluates to `true`
if current_checksum.unwrap_or_else(|| !checksum) != checksum {
info!("Meta file {:?} was modified", path);
current_checksum = Some(checksum);
futures::executor::block_on(callbacks.broadcast());
}
}
thread::sleep(POLLING_INTERVAL);
}
})
.expect("Failed to spawn meta file watcher thread");
}
pub fn watch(&self, callback: WatchCallback) -> WatchHandle {
let handle = self.callbacks.subscribe(callback);
self.spawn();
handle
}
fn compute_checksum(path: &PathBuf) -> Result<u32, io::Error> {
let reader = match fs::File::open(path) {
Ok(f) => io::BufReader::new(f),
Err(e) => {
warn!("Failed to open meta file {:?}: {:?}", path, e);
return Err(e);
}
};
let mut hasher = Hasher::new();
for line in reader.lines() {
hasher.update(line?.as_bytes())
}
Ok(hasher.finalize())
}
}
impl Drop for FileWatcher {
fn drop(&mut self) {
self.state.store(2, Ordering::SeqCst);
}
}
#[cfg(test)]
mod tests {
use std::mem;
use crate::directory::mmap_directory::atomic_write;
use super::*;
#[test]
fn test_file_watcher_drop_watcher() -> crate::Result<()> {
let tmp_dir = tempfile::TempDir::new()?;
let tmp_file = tmp_dir.path().join("watched.txt");
let counter: Arc<AtomicUsize> = Default::default();
let (tx, rx) = crossbeam::channel::unbounded();
let timeout = Duration::from_millis(100);
let watcher = FileWatcher::new(&tmp_file);
let state = watcher.state.clone();
assert_eq!(state.load(Ordering::SeqCst), 0);
let counter_clone = counter.clone();
let _handle = watcher.watch(WatchCallback::new(move || {
let val = counter_clone.fetch_add(1, Ordering::SeqCst);
tx.send(val + 1).unwrap();
}));
assert_eq!(counter.load(Ordering::SeqCst), 0);
assert_eq!(state.load(Ordering::SeqCst), 1);
atomic_write(&tmp_file, b"foo")?;
assert_eq!(rx.recv_timeout(timeout), Ok(1));
atomic_write(&tmp_file, b"foo")?;
assert!(rx.recv_timeout(timeout).is_err());
atomic_write(&tmp_file, b"bar")?;
assert_eq!(rx.recv_timeout(timeout), Ok(2));
mem::drop(watcher);
atomic_write(&tmp_file, b"qux")?;
thread::sleep(Duration::from_millis(10));
assert_eq!(counter.load(Ordering::SeqCst), 2);
assert_eq!(state.load(Ordering::SeqCst), 2);
Ok(())
}
#[test]
fn test_file_watcher_drop_handle() -> crate::Result<()> {
let tmp_dir = tempfile::TempDir::new()?;
let tmp_file = tmp_dir.path().join("watched.txt");
let counter: Arc<AtomicUsize> = Default::default();
let (tx, rx) = crossbeam::channel::unbounded();
let timeout = Duration::from_millis(100);
let watcher = FileWatcher::new(&tmp_file);
let state = watcher.state.clone();
assert_eq!(state.load(Ordering::SeqCst), 0);
let counter_clone = counter.clone();
let handle = watcher.watch(WatchCallback::new(move || {
let val = counter_clone.fetch_add(1, Ordering::SeqCst);
tx.send(val + 1).unwrap();
}));
assert_eq!(counter.load(Ordering::SeqCst), 0);
assert_eq!(state.load(Ordering::SeqCst), 1);
atomic_write(&tmp_file, b"foo")?;
assert_eq!(rx.recv_timeout(timeout), Ok(1));
mem::drop(handle);
atomic_write(&tmp_file, b"qux")?;
assert_eq!(counter.load(Ordering::SeqCst), 1);
assert_eq!(state.load(Ordering::SeqCst), 1);
Ok(())
}
}

View File

@@ -1,10 +1,10 @@
use crate::core::{MANAGED_FILEPATH, META_FILEPATH};
use crate::directory::error::{DeleteError, LockError, OpenReadError, OpenWriteError};
use crate::directory::footer::{Footer, FooterProxy};
use crate::directory::DirectoryLock;
use crate::directory::GarbageCollectionResult;
use crate::directory::Lock;
use crate::directory::META_LOCK;
use crate::directory::{DirectoryLock, FileHandle};
use crate::directory::{FileSlice, WritePtr};
use crate::directory::{WatchCallback, WatchHandle};
use crate::error::DataCorruption;
@@ -274,11 +274,6 @@ impl ManagedDirectory {
}
impl Directory for ManagedDirectory {
fn get_file_handle(&self, path: &Path) -> Result<Box<dyn FileHandle>, OpenReadError> {
let file_slice = self.open_read(path)?;
Ok(Box::new(file_slice))
}
fn open_read(&self, path: &Path) -> result::Result<FileSlice, OpenReadError> {
let file_slice = self.directory.open_read(path)?;
let (footer, reader) = Footer::extract_footer(file_slice)
@@ -312,7 +307,7 @@ impl Directory for ManagedDirectory {
self.directory.delete(path)
}
fn exists(&self, path: &Path) -> Result<bool, OpenReadError> {
fn exists(&self, path: &Path) -> bool {
self.directory.exists(path)
}
@@ -360,22 +355,22 @@ mod tests_mmap_specific {
managed_directory
.atomic_write(test_path2, &[0u8, 1u8])
.unwrap();
assert!(managed_directory.exists(test_path1).unwrap());
assert!(managed_directory.exists(test_path2).unwrap());
assert!(managed_directory.exists(test_path1));
assert!(managed_directory.exists(test_path2));
let living_files: HashSet<PathBuf> = [test_path1.to_owned()].iter().cloned().collect();
assert!(managed_directory.garbage_collect(|| living_files).is_ok());
assert!(managed_directory.exists(test_path1).unwrap());
assert!(!managed_directory.exists(test_path2).unwrap());
assert!(managed_directory.exists(test_path1));
assert!(!managed_directory.exists(test_path2));
}
{
let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap();
let mut managed_directory = ManagedDirectory::wrap(mmap_directory).unwrap();
assert!(managed_directory.exists(test_path1).unwrap());
assert!(!managed_directory.exists(test_path2).unwrap());
assert!(managed_directory.exists(test_path1));
assert!(!managed_directory.exists(test_path2));
let living_files: HashSet<PathBuf> = HashSet::new();
assert!(managed_directory.garbage_collect(|| living_files).is_ok());
assert!(!managed_directory.exists(test_path1).unwrap());
assert!(!managed_directory.exists(test_path2).unwrap());
assert!(!managed_directory.exists(test_path1));
assert!(!managed_directory.exists(test_path2));
}
}
@@ -392,7 +387,7 @@ mod tests_mmap_specific {
let mut write = managed_directory.open_write(test_path1).unwrap();
write.write_all(&[0u8, 1u8]).unwrap();
write.terminate().unwrap();
assert!(managed_directory.exists(test_path1).unwrap());
assert!(managed_directory.exists(test_path1));
let _mmap_read = managed_directory.open_read(test_path1).unwrap();
assert!(managed_directory
@@ -400,15 +395,15 @@ mod tests_mmap_specific {
.is_ok());
if cfg!(target_os = "windows") {
// On Windows, gc should try and fail the file as it is mmapped.
assert!(managed_directory.exists(test_path1).unwrap());
assert!(managed_directory.exists(test_path1));
// unmap should happen here.
drop(_mmap_read);
// The file should still be in the list of managed file and
// eventually be deleted once mmap is released.
assert!(managed_directory.garbage_collect(|| living_files).is_ok());
assert!(!managed_directory.exists(test_path1).unwrap());
assert!(!managed_directory.exists(test_path1));
} else {
assert!(!managed_directory.exists(test_path1).unwrap());
assert!(!managed_directory.exists(test_path1));
}
}

View File

@@ -1,17 +1,21 @@
use crate::core::META_FILEPATH;
use crate::directory::error::LockError;
use crate::directory::error::{DeleteError, OpenDirectoryError, OpenReadError, OpenWriteError};
use crate::directory::file_watcher::FileWatcher;
use crate::directory::AntiCallToken;
use crate::directory::BoxedData;
use crate::directory::Directory;
use crate::directory::DirectoryLock;
use crate::directory::FileSlice;
use crate::directory::Lock;
use crate::directory::WatchCallback;
use crate::directory::WatchCallbackList;
use crate::directory::WatchHandle;
use crate::directory::{AntiCallToken, FileHandle, OwnedBytes};
use crate::directory::{ArcBytes, WeakArcBytes};
use crate::directory::{TerminatingWrite, WritePtr};
use fs2::FileExt;
use memmap::Mmap;
use notify::RawEvent;
use notify::RecursiveMode;
use notify::Watcher;
use serde::{Deserialize, Serialize};
use stable_deref_trait::StableDeref;
use std::convert::From;
@@ -22,8 +26,12 @@ use std::io::{self, Seek, SeekFrom};
use std::io::{BufWriter, Read, Write};
use std::path::{Path, PathBuf};
use std::result;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::RwLock;
use std::sync::Weak;
use std::thread;
use std::{collections::HashMap, ops::Deref};
use tempfile::TempDir;
@@ -76,7 +84,7 @@ pub struct CacheInfo {
struct MmapCache {
counters: CacheCounters,
cache: HashMap<PathBuf, WeakArcBytes>,
cache: HashMap<PathBuf, Weak<BoxedData>>,
}
impl Default for MmapCache {
@@ -110,7 +118,7 @@ impl MmapCache {
}
// Returns None if the file exists but as a len of 0 (and hence is not mmappable).
fn get_mmap(&mut self, full_path: &Path) -> Result<Option<ArcBytes>, OpenReadError> {
fn get_mmap(&mut self, full_path: &Path) -> Result<Option<Arc<BoxedData>>, OpenReadError> {
if let Some(mmap_weak) = self.cache.get(full_path) {
if let Some(mmap_arc) = mmap_weak.upgrade() {
self.counters.hit += 1;
@@ -121,7 +129,7 @@ impl MmapCache {
self.counters.miss += 1;
let mmap_opt = open_mmap(full_path)?;
Ok(mmap_opt.map(|mmap| {
let mmap_arc: ArcBytes = Arc::new(mmap);
let mmap_arc: Arc<BoxedData> = Arc::new(Box::new(mmap));
let mmap_weak = Arc::downgrade(&mmap_arc);
self.cache.insert(full_path.to_owned(), mmap_weak);
mmap_arc
@@ -129,6 +137,67 @@ impl MmapCache {
}
}
struct WatcherWrapper {
_watcher: Mutex<notify::RecommendedWatcher>,
watcher_router: Arc<WatchCallbackList>,
}
impl WatcherWrapper {
pub fn new(path: &Path) -> Result<Self, OpenDirectoryError> {
let (tx, watcher_recv): (Sender<RawEvent>, Receiver<RawEvent>) = channel();
// We need to initialize the
let watcher = notify::raw_watcher(tx)
.and_then(|mut watcher| {
watcher.watch(path, RecursiveMode::Recursive)?;
Ok(watcher)
})
.map_err(|err| match err {
notify::Error::PathNotFound => OpenDirectoryError::DoesNotExist(path.to_owned()),
_ => {
panic!("Unknown error while starting watching directory {:?}", path);
}
})?;
let watcher_router: Arc<WatchCallbackList> = Default::default();
let watcher_router_clone = watcher_router.clone();
thread::Builder::new()
.name("meta-file-watch-thread".to_string())
.spawn(move || {
loop {
match watcher_recv.recv().map(|evt| evt.path) {
Ok(Some(changed_path)) => {
// ... Actually subject to false positive.
// We might want to be more accurate than this at one point.
if let Some(filename) = changed_path.file_name() {
if filename == *META_FILEPATH {
let _ = watcher_router_clone.broadcast();
}
}
}
Ok(None) => {
// not an event we are interested in.
}
Err(_e) => {
// the watch send channel was dropped
break;
}
}
}
})
.map_err(|io_error| OpenDirectoryError::IoError {
io_error,
directory_path: path.to_path_buf(),
})?;
Ok(WatcherWrapper {
_watcher: Mutex::new(watcher),
watcher_router,
})
}
pub fn watch(&mut self, watch_callback: WatchCallback) -> WatchHandle {
self.watcher_router.subscribe(watch_callback)
}
}
/// Directory storing data in files, read via mmap.
///
/// The Mmap object are cached to limit the
@@ -150,21 +219,40 @@ struct MmapDirectoryInner {
root_path: PathBuf,
mmap_cache: RwLock<MmapCache>,
_temp_directory: Option<TempDir>,
watcher: FileWatcher,
watcher: RwLock<Option<WatcherWrapper>>,
}
impl MmapDirectoryInner {
fn new(root_path: PathBuf, temp_directory: Option<TempDir>) -> MmapDirectoryInner {
MmapDirectoryInner {
root_path,
mmap_cache: Default::default(),
_temp_directory: temp_directory,
watcher: FileWatcher::new(&root_path.join(*META_FILEPATH)),
root_path,
watcher: RwLock::new(None),
}
}
fn watch(&self, callback: WatchCallback) -> crate::Result<WatchHandle> {
Ok(self.watcher.watch(callback))
fn watch(&self, watch_callback: WatchCallback) -> crate::Result<WatchHandle> {
// a lot of juggling here, to ensure we don't do anything that panics
// while the rwlock is held. That way we ensure that the rwlock cannot
// be poisoned.
//
// The downside is that we might create a watch wrapper that is not useful.
let need_initialization = self.watcher.read().unwrap().is_none();
if need_initialization {
let watch_wrapper = WatcherWrapper::new(&self.root_path)?;
let mut watch_wlock = self.watcher.write().unwrap();
// the watcher could have been initialized when we released the lock, and
// we do not want to lose the watched files that were set.
if watch_wlock.is_none() {
*watch_wlock = Some(watch_wrapper);
}
}
if let Some(watch_wrapper) = self.watcher.write().unwrap().as_mut() {
Ok(watch_wrapper.watch(watch_callback))
} else {
unreachable!("At this point, watch wrapper is supposed to be initialized");
}
}
}
@@ -314,7 +402,7 @@ impl TerminatingWrite for SafeFileWriter {
}
#[derive(Clone)]
struct MmapArc(Arc<dyn Deref<Target = [u8]> + Send + Sync>);
struct MmapArc(Arc<Box<dyn Deref<Target = [u8]> + Send + Sync>>);
impl Deref for MmapArc {
type Target = [u8];
@@ -325,26 +413,8 @@ impl Deref for MmapArc {
}
unsafe impl StableDeref for MmapArc {}
/// Writes a file in an atomic manner.
pub(crate) fn atomic_write(path: &Path, content: &[u8]) -> io::Result<()> {
// We create the temporary file in the same directory as the target file.
// Indeed the canonical temp directory and the target file might sit in different
// filesystem, in which case the atomic write may actually not work.
let parent_path = path.parent().ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
"Path {:?} does not have parent directory.",
)
})?;
let mut tempfile = tempfile::Builder::new().tempfile_in(&parent_path)?;
tempfile.write_all(content)?;
tempfile.flush()?;
tempfile.into_temp_path().persist(path)?;
Ok(())
}
impl Directory for MmapDirectory {
fn get_file_handle(&self, path: &Path) -> result::Result<Box<dyn FileHandle>, OpenReadError> {
fn open_read(&self, path: &Path) -> result::Result<FileSlice, OpenReadError> {
debug!("Open Read {:?}", path);
let full_path = self.resolve_path(path);
@@ -357,16 +427,11 @@ impl Directory for MmapDirectory {
let io_err = make_io_err(msg);
OpenReadError::wrap_io_error(io_err, path.to_path_buf())
})?;
let owned_bytes = mmap_cache
.get_mmap(&full_path)?
.map(|mmap_arc| {
let mmap_arc_obj = MmapArc(mmap_arc);
OwnedBytes::new(mmap_arc_obj)
})
.unwrap_or_else(OwnedBytes::empty);
Ok(Box::new(owned_bytes))
if let Some(mmap_arc) = mmap_cache.get_mmap(&full_path)? {
Ok(FileSlice::from(MmapArc(mmap_arc)))
} else {
Ok(FileSlice::empty())
}
}
/// Any entry associated to the path in the mmap will be
@@ -391,9 +456,9 @@ impl Directory for MmapDirectory {
}
}
fn exists(&self, path: &Path) -> Result<bool, OpenReadError> {
fn exists(&self, path: &Path) -> bool {
let full_path = self.resolve_path(path);
Ok(full_path.exists())
full_path.exists()
}
fn open_write(&self, path: &Path) -> Result<WritePtr, OpenWriteError> {
@@ -448,9 +513,12 @@ impl Directory for MmapDirectory {
fn atomic_write(&self, path: &Path, content: &[u8]) -> io::Result<()> {
debug!("Atomic Write {:?}", path);
let mut tempfile = tempfile::Builder::new().tempfile_in(&self.inner.root_path)?;
tempfile.write_all(content)?;
tempfile.flush()?;
let full_path = self.resolve_path(path);
atomic_write(&full_path, content)?;
self.sync_directory()
tempfile.into_temp_path().persist(full_path)?;
Ok(())
}
fn acquire_lock(&self, lock: &Lock) -> Result<DirectoryLock, LockError> {
@@ -489,6 +557,8 @@ mod tests {
use crate::Index;
use crate::ReloadPolicy;
use crate::{common::HasLen, indexer::LogMergePolicy};
use std::fs;
use std::sync::atomic::{AtomicUsize, Ordering};
#[test]
fn test_open_non_existent_path() {
@@ -577,6 +647,27 @@ mod tests {
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0);
}
#[test]
fn test_watch_wrapper() {
let counter: Arc<AtomicUsize> = Default::default();
let counter_clone = counter.clone();
let tmp_dir = tempfile::TempDir::new().unwrap();
let tmp_dirpath = tmp_dir.path().to_owned();
let mut watch_wrapper = WatcherWrapper::new(&tmp_dirpath).unwrap();
let tmp_file = tmp_dirpath.join(*META_FILEPATH);
let _handle = watch_wrapper.watch(Box::new(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);
}));
let (sender, receiver) = crossbeam::channel::unbounded();
let _handle2 = watch_wrapper.watch(Box::new(move || {
let _ = sender.send(());
}));
assert_eq!(counter.load(Ordering::SeqCst), 0);
fs::write(&tmp_file, b"whateverwilldo").unwrap();
assert!(receiver.recv().is_ok());
assert!(counter.load(Ordering::SeqCst) >= 1);
}
#[test]
fn test_mmap_released() {
let mmap_directory = MmapDirectory::create_from_tempdir().unwrap();

View File

@@ -10,7 +10,6 @@ mod mmap_directory;
mod directory;
mod directory_lock;
mod file_slice;
mod file_watcher;
mod footer;
mod managed_directory;
mod owned_bytes;
@@ -23,7 +22,7 @@ pub mod error;
pub use self::directory::DirectoryLock;
pub use self::directory::{Directory, DirectoryClone};
pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
pub(crate) use self::file_slice::{ArcBytes, WeakArcBytes};
pub(crate) use self::file_slice::BoxedData;
pub use self::file_slice::{FileHandle, FileSlice};
pub use self::owned_bytes::OwnedBytes;
pub use self::ram_directory::RAMDirectory;

View File

@@ -1,6 +1,5 @@
use crate::directory::FileHandle;
use stable_deref_trait::StableDeref;
use std::convert::TryInto;
use std::mem;
use std::ops::Deref;
use std::sync::Arc;
@@ -96,24 +95,6 @@ impl OwnedBytes {
pub fn advance(&mut self, advance_len: usize) {
self.data = &self.data[advance_len..]
}
/// Reads an `u8` from the `OwnedBytes` and advance by one byte.
pub fn read_u8(&mut self) -> u8 {
assert!(!self.is_empty());
let byte = self.as_slice()[0];
self.advance(1);
byte
}
/// Reads an `u64` encoded as little-endian from the `OwnedBytes` and advance by 8 bytes.
pub fn read_u64(&mut self) -> u64 {
assert!(self.len() > 7);
let octlet: [u8; 8] = self.as_slice()[..8].try_into().unwrap();
self.advance(8);
u64::from_le_bytes(octlet)
}
}
impl fmt::Debug for OwnedBytes {
@@ -249,22 +230,6 @@ mod tests {
Ok(())
}
#[test]
fn test_owned_bytes_read_u8() -> io::Result<()> {
let mut bytes = OwnedBytes::new(b"\xFF".as_ref());
assert_eq!(bytes.read_u8(), 255);
assert_eq!(bytes.len(), 0);
Ok(())
}
#[test]
fn test_owned_bytes_read_u64() -> io::Result<()> {
let mut bytes = OwnedBytes::new(b"\0\xFF\xFF\xFF\xFF\xFF\xFF\xFF".as_ref());
assert_eq!(bytes.read_u64(), u64::MAX - 255);
assert_eq!(bytes.len(), 0);
Ok(())
}
#[test]
fn test_owned_bytes_split() {
let bytes = OwnedBytes::new(b"abcdefghi".as_ref());

View File

@@ -12,8 +12,6 @@ use std::path::{Path, PathBuf};
use std::result;
use std::sync::{Arc, RwLock};
use super::FileHandle;
/// Writer associated with the `RAMDirectory`
///
/// The Writer just writes a buffer.
@@ -165,11 +163,6 @@ impl RAMDirectory {
}
impl Directory for RAMDirectory {
fn get_file_handle(&self, path: &Path) -> Result<Box<dyn FileHandle>, OpenReadError> {
let file_slice = self.open_read(path)?;
Ok(Box::new(file_slice))
}
fn open_read(&self, path: &Path) -> result::Result<FileSlice, OpenReadError> {
self.fs.read().unwrap().open_read(path)
}
@@ -184,15 +177,8 @@ impl Directory for RAMDirectory {
self.fs.write().unwrap().delete(path)
}
fn exists(&self, path: &Path) -> Result<bool, OpenReadError> {
Ok(self
.fs
.read()
.map_err(|e| OpenReadError::IOError {
io_error: io::Error::new(io::ErrorKind::Other, e.to_string()),
filepath: path.to_path_buf(),
})?
.exists(path))
fn exists(&self, path: &Path) -> bool {
self.fs.read().unwrap().exists(path)
}
fn open_write(&self, path: &Path) -> Result<WritePtr, OpenWriteError> {

View File

@@ -130,7 +130,7 @@ fn ram_directory_panics_if_flush_forgotten() {
fn test_simple(directory: &dyn Directory) -> crate::Result<()> {
let test_path: &'static Path = Path::new("some_path_for_test");
let mut write_file = directory.open_write(test_path)?;
assert!(directory.exists(test_path).unwrap());
assert!(directory.exists(test_path));
write_file.write_all(&[4])?;
write_file.write_all(&[3])?;
write_file.write_all(&[7, 3, 5])?;
@@ -139,14 +139,14 @@ fn test_simple(directory: &dyn Directory) -> crate::Result<()> {
assert_eq!(read_file.as_slice(), &[4u8, 3u8, 7u8, 3u8, 5u8]);
mem::drop(read_file);
assert!(directory.delete(test_path).is_ok());
assert!(!directory.exists(test_path).unwrap());
assert!(!directory.exists(test_path));
Ok(())
}
fn test_rewrite_forbidden(directory: &dyn Directory) -> crate::Result<()> {
let test_path: &'static Path = Path::new("some_path_for_test");
directory.open_write(test_path)?;
assert!(directory.exists(test_path).unwrap());
assert!(directory.exists(test_path));
assert!(directory.open_write(test_path).is_err());
assert!(directory.delete(test_path).is_ok());
Ok(())
@@ -157,7 +157,7 @@ fn test_write_create_the_file(directory: &dyn Directory) {
{
assert!(directory.open_read(test_path).is_err());
let _w = directory.open_write(test_path).unwrap();
assert!(directory.exists(test_path).unwrap());
assert!(directory.exists(test_path));
assert!(directory.open_read(test_path).is_ok());
assert!(directory.delete(test_path).is_ok());
}
@@ -190,33 +190,38 @@ fn test_directory_delete(directory: &dyn Directory) -> crate::Result<()> {
}
fn test_watch(directory: &dyn Directory) {
let num_progress: Arc<AtomicUsize> = Default::default();
let counter: Arc<AtomicUsize> = Default::default();
let (tx, rx) = crossbeam::channel::unbounded();
let timeout = Duration::from_millis(500);
let handle = directory
.watch(WatchCallback::new(move || {
let val = counter.fetch_add(1, SeqCst);
tx.send(val + 1).unwrap();
let counter_clone = counter.clone();
let (sender, receiver) = crossbeam::channel::unbounded();
let watch_callback = Box::new(move || {
counter_clone.fetch_add(1, SeqCst);
});
// This callback is used to synchronize watching in our unit test.
// We bind it to a variable because the callback is removed when that
// handle is dropped.
let watch_handle = directory.watch(watch_callback).unwrap();
let _progress_listener = directory
.watch(Box::new(move || {
let val = num_progress.fetch_add(1, SeqCst);
let _ = sender.send(val);
}))
.unwrap();
for i in 0..10 {
assert!(i <= counter.load(SeqCst));
assert!(directory
.atomic_write(Path::new("meta.json"), b"random_test_data_2")
.is_ok());
assert_eq!(receiver.recv_timeout(Duration::from_millis(500)), Ok(i));
assert!(i + 1 <= counter.load(SeqCst)); // notify can trigger more than once.
}
mem::drop(watch_handle);
assert!(directory
.atomic_write(Path::new("meta.json"), b"foo")
.atomic_write(Path::new("meta.json"), b"random_test_data")
.is_ok());
assert_eq!(rx.recv_timeout(timeout), Ok(1));
assert!(directory
.atomic_write(Path::new("meta.json"), b"bar")
.is_ok());
assert_eq!(rx.recv_timeout(timeout), Ok(2));
mem::drop(handle);
assert!(directory
.atomic_write(Path::new("meta.json"), b"qux")
.is_ok());
assert!(rx.recv_timeout(timeout).is_err());
assert!(receiver.recv_timeout(Duration::from_millis(500)).is_ok());
assert!(10 <= counter.load(SeqCst));
}
fn test_lock_non_blocking(directory: &dyn Directory) {

View File

@@ -4,20 +4,8 @@ use std::sync::Arc;
use std::sync::RwLock;
use std::sync::Weak;
/// Cloneable wrapper for callbacks registered when watching files of a `Directory`.
#[derive(Clone)]
pub struct WatchCallback(Arc<dyn Fn() + Sync + Send>);
impl WatchCallback {
/// Wraps a `Fn()` to create a WatchCallback.
pub fn new<F: Fn() + Sync + Send + 'static>(op: F) -> Self {
WatchCallback(Arc::new(op))
}
fn call(&self) {
self.0()
}
}
/// Type alias for callbacks registered when watching files of a `Directory`.
pub type WatchCallback = Box<dyn Fn() + Sync + Send>;
/// Helper struct to implement the watch method in `Directory` implementations.
///
@@ -46,7 +34,7 @@ impl WatchHandle {
///
/// This function is only useful when implementing a readonly directory.
pub fn empty() -> WatchHandle {
WatchHandle::new(Arc::new(WatchCallback::new(|| {})))
WatchHandle::new(Arc::new(Box::new(|| {})))
}
}
@@ -59,13 +47,13 @@ impl WatchCallbackList {
WatchHandle::new(watch_callback_arc)
}
fn list_callback(&self) -> Vec<WatchCallback> {
let mut callbacks: Vec<WatchCallback> = vec![];
fn list_callback(&self) -> Vec<Arc<WatchCallback>> {
let mut callbacks = vec![];
let mut router_wlock = self.router.write().unwrap();
let mut i = 0;
while i < router_wlock.len() {
if let Some(watch) = router_wlock[i].upgrade() {
callbacks.push(watch.as_ref().clone());
callbacks.push(watch);
i += 1;
} else {
router_wlock.swap_remove(i);
@@ -87,7 +75,7 @@ impl WatchCallbackList {
.name("watch-callbacks".to_string())
.spawn(move || {
for callback in callbacks {
callback.call();
callback();
}
let _ = sender.send(());
});
@@ -103,7 +91,7 @@ impl WatchCallbackList {
#[cfg(test)]
mod tests {
use crate::directory::{WatchCallback, WatchCallbackList};
use crate::directory::WatchCallbackList;
use futures::executor::block_on;
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};
@@ -114,7 +102,7 @@ mod tests {
let watch_event_router = WatchCallbackList::default();
let counter: Arc<AtomicUsize> = Default::default();
let counter_clone = counter.clone();
let inc_callback = WatchCallback::new(move || {
let inc_callback = Box::new(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);
});
block_on(watch_event_router.broadcast());
@@ -142,7 +130,7 @@ mod tests {
let counter: Arc<AtomicUsize> = Default::default();
let inc_callback = |inc: usize| {
let counter_clone = counter.clone();
WatchCallback::new(move || {
Box::new(move || {
counter_clone.fetch_add(inc, Ordering::SeqCst);
})
};
@@ -170,7 +158,7 @@ mod tests {
let watch_event_router = WatchCallbackList::default();
let counter: Arc<AtomicUsize> = Default::default();
let counter_clone = counter.clone();
let inc_callback = WatchCallback::new(move || {
let inc_callback = Box::new(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);
});
let handle_a = watch_event_router.subscribe(inc_callback);

View File

@@ -10,7 +10,7 @@ use std::borrow::BorrowMut;
pub const TERMINATED: DocId = std::i32::MAX as u32;
/// Represents an iterable set of sorted doc ids.
pub trait DocSet: Send {
pub trait DocSet {
/// Goes to the next element.
///
/// The DocId of the next element is returned.
@@ -129,14 +129,6 @@ impl<'a> DocSet for &'a mut dyn DocSet {
fn size_hint(&self) -> u32 {
(**self).size_hint()
}
fn count(&mut self, delete_bitset: &DeleteBitSet) -> u32 {
(**self).count(delete_bitset)
}
fn count_including_deleted(&mut self) -> u32 {
(**self).count_including_deleted()
}
}
impl<TDocSet: DocSet + ?Sized> DocSet for Box<TDocSet> {

View File

@@ -86,7 +86,7 @@ mod tests {
let term = Term::from_field_bytes(field, b"lucene".as_ref());
let term_query = TermQuery::new(term, IndexRecordOption::Basic);
let term_weight = term_query.specialized_weight(&searcher, true)?;
let term_scorer = term_weight.specialized_scorer(searcher.segment_reader(0), 1.0)?;
let term_scorer = term_weight.specialized_scorer(searcher.segment_reader(0), 1.0f32)?;
assert_eq!(term_scorer.doc(), 0u32);
Ok(())
}
@@ -98,9 +98,9 @@ mod tests {
let field = searcher.schema().get_field("string_bytes").unwrap();
let term = Term::from_field_bytes(field, b"lucene".as_ref());
let term_query = TermQuery::new(term, IndexRecordOption::Basic);
let term_weight_err = term_query.specialized_weight(&searcher, false);
let term_weight_res = term_query.specialized_weight(&searcher, false);
assert!(matches!(
term_weight_err,
term_weight_res,
Err(crate::TantivyError::SchemaError(_))
));
Ok(())

View File

@@ -1,5 +1,4 @@
use super::MultiValueIntFastFieldReader;
use crate::error::DataCorruption;
use crate::schema::Facet;
use crate::termdict::TermDictionary;
use crate::termdict::TermOrdinal;
@@ -63,13 +62,12 @@ impl FacetReader {
&mut self,
facet_ord: TermOrdinal,
output: &mut Facet,
) -> crate::Result<()> {
) -> Result<(), str::Utf8Error> {
let found_term = self
.term_dict
.ord_to_term(facet_ord as u64, &mut self.buffer)?;
.ord_to_term(facet_ord as u64, &mut self.buffer);
assert!(found_term, "Term ordinal {} no found.", facet_ord);
let facet_str = str::from_utf8(&self.buffer[..])
.map_err(|utf8_err| DataCorruption::comment_only(utf8_err.to_string()))?;
let facet_str = str::from_utf8(&self.buffer[..])?;
output.set_facet_str(facet_str);
Ok(())
}

View File

@@ -629,7 +629,7 @@ mod bench {
{
let fast_fields_composite = CompositeFile::open(&file).unwrap();
let data = fast_fields_composite.open_read(*FIELD).unwrap();
let fast_field_reader = FastFieldReader::<u64>::open(data).unwrap();
let fast_field_reader = FastFieldReader::<u64>::open(data);
b.iter(|| {
let n = test::black_box(7000u32);
@@ -663,7 +663,7 @@ mod bench {
{
let fast_fields_composite = CompositeFile::open(&file).unwrap();
let data = fast_fields_composite.open_read(*FIELD).unwrap();
let fast_field_reader = FastFieldReader::<u64>::open(data).unwrap();
let fast_field_reader = FastFieldReader::<u64>::open(data);
b.iter(|| {
let n = test::black_box(1000u32);

View File

@@ -49,7 +49,7 @@ impl FieldNormReaders {
///
/// This metric is important to compute the score of a
/// document : a document having a query word in one its short fields
/// (e.g. title) is likely to be more relevant than in one of its longer field
/// (e.g. title)is likely to be more relevant than in one of its longer field
/// (e.g. body).
///
/// tantivy encodes `fieldnorm` on one byte with some precision loss,
@@ -61,55 +61,30 @@ impl FieldNormReaders {
/// precompute computationally expensive functions of the fieldnorm
/// in a very short array.
#[derive(Clone)]
pub struct FieldNormReader(ReaderImplEnum);
impl From<ReaderImplEnum> for FieldNormReader {
fn from(reader_enum: ReaderImplEnum) -> FieldNormReader {
FieldNormReader(reader_enum)
}
}
#[derive(Clone)]
enum ReaderImplEnum {
FromData(OwnedBytes),
Const {
num_docs: u32,
fieldnorm_id: u8,
fieldnorm: u32,
},
pub enum FieldNormReader {
ConstFieldNorm { fieldnorm_id: u8, num_docs: u32 },
OneByte(OwnedBytes),
}
impl FieldNormReader {
/// Creates a `FieldNormReader` with a constant fieldnorm.
///
/// The fieldnorm will be subjected to compression as if it was coming
/// from an array-backed fieldnorm reader.
pub fn constant(num_docs: u32, fieldnorm: u32) -> FieldNormReader {
let fieldnorm_id = fieldnorm_to_id(fieldnorm);
let fieldnorm = id_to_fieldnorm(fieldnorm_id);
ReaderImplEnum::Const {
num_docs,
pub fn const_fieldnorm_id(fieldnorm_id: u8, num_docs: u32) -> FieldNormReader {
FieldNormReader::ConstFieldNorm {
fieldnorm_id,
fieldnorm,
num_docs,
}
.into()
}
/// Opens a field norm reader given its file.
pub fn open(fieldnorm_file: FileSlice) -> crate::Result<Self> {
let data = fieldnorm_file.read_bytes()?;
Ok(FieldNormReader::new(data))
}
fn new(data: OwnedBytes) -> Self {
ReaderImplEnum::FromData(data).into()
Ok(FieldNormReader::OneByte(data))
}
/// Returns the number of documents in this segment.
pub fn num_docs(&self) -> u32 {
match &self.0 {
ReaderImplEnum::FromData(data) => data.len() as u32,
ReaderImplEnum::Const { num_docs, .. } => *num_docs,
match self {
Self::ConstFieldNorm { num_docs, .. } => *num_docs,
FieldNormReader::OneByte(vals) => vals.len() as u32,
}
}
@@ -122,25 +97,19 @@ impl FieldNormReader {
///
/// The fieldnorm is effectively decoded from the
/// `fieldnorm_id` by doing a simple table lookup.
#[inline(always)]
pub fn fieldnorm(&self, doc_id: DocId) -> u32 {
match &self.0 {
ReaderImplEnum::FromData(data) => {
let fieldnorm_id = data.as_slice()[doc_id as usize];
id_to_fieldnorm(fieldnorm_id)
}
ReaderImplEnum::Const { fieldnorm, .. } => *fieldnorm,
}
let fieldnorm_id = self.fieldnorm_id(doc_id);
id_to_fieldnorm(fieldnorm_id)
}
/// Returns the `fieldnorm_id` associated to a document.
#[inline(always)]
pub fn fieldnorm_id(&self, doc_id: DocId) -> u8 {
match &self.0 {
ReaderImplEnum::FromData(data) => {
let fieldnorm_id = data.as_slice()[doc_id as usize];
fieldnorm_id
}
ReaderImplEnum::Const { fieldnorm_id, .. } => *fieldnorm_id,
match self {
FieldNormReader::ConstFieldNorm { fieldnorm_id, .. } => *fieldnorm_id,
FieldNormReader::OneByte(data) => data.as_slice()[doc_id as usize],
}
}
@@ -165,7 +134,7 @@ impl FieldNormReader {
.map(FieldNormReader::fieldnorm_to_id)
.collect::<Vec<u8>>();
let field_norms_data = OwnedBytes::new(field_norms_id);
FieldNormReader::new(field_norms_data)
FieldNormReader::OneByte(field_norms_data)
}
}
@@ -184,20 +153,4 @@ mod tests {
assert_eq!(fieldnorm_reader.fieldnorm(3), 4);
assert_eq!(fieldnorm_reader.fieldnorm(4), 983_064);
}
#[test]
fn test_const_fieldnorm_reader_small_fieldnorm_id() {
let fieldnorm_reader = FieldNormReader::constant(1_000_000u32, 10u32);
assert_eq!(fieldnorm_reader.num_docs(), 1_000_000u32);
assert_eq!(fieldnorm_reader.fieldnorm(0u32), 10u32);
assert_eq!(fieldnorm_reader.fieldnorm_id(0u32), 10u8);
}
#[test]
fn test_const_fieldnorm_reader_large_fieldnorm_id() {
let fieldnorm_reader = FieldNormReader::constant(1_000_000u32, 300u32);
assert_eq!(fieldnorm_reader.num_docs(), 1_000_000u32);
assert_eq!(fieldnorm_reader.fieldnorm(0u32), 280u32);
assert_eq!(fieldnorm_reader.fieldnorm_id(0u32), 72u8);
}
}

View File

@@ -4,7 +4,7 @@ use super::fieldnorm_to_id;
use super::FieldNormsSerializer;
use crate::schema::Field;
use crate::schema::Schema;
use std::{io, iter};
use std::io;
/// The `FieldNormsWriter` is in charge of tracking the fieldnorm byte
/// of each document for each field with field norms.
@@ -13,7 +13,7 @@ use std::{io, iter};
/// byte per document per field.
pub struct FieldNormsWriter {
fields: Vec<Field>,
fieldnorms_buffer: Vec<Vec<u8>>,
fieldnorms_buffer: Vec<Option<Vec<u8>>>,
}
impl FieldNormsWriter {
@@ -23,7 +23,7 @@ impl FieldNormsWriter {
schema
.fields()
.filter_map(|(field, field_entry)| {
if field_entry.is_indexed() {
if field_entry.has_fieldnorms() {
Some(field)
} else {
None
@@ -36,17 +36,14 @@ impl FieldNormsWriter {
/// specified in the schema.
pub fn for_schema(schema: &Schema) -> FieldNormsWriter {
let fields = FieldNormsWriter::fields_with_fieldnorm(schema);
let max_field = fields
.iter()
.map(Field::field_id)
.max()
.map(|max_field_id| max_field_id as usize + 1)
.unwrap_or(0);
let num_fields = schema.num_fields();
let mut fieldnorms_buffer: Vec<Option<Vec<u8>>> = vec![None; num_fields];
for field in &fields {
fieldnorms_buffer[field.field_id() as usize] = Some(Vec::new());
}
FieldNormsWriter {
fields,
fieldnorms_buffer: iter::repeat_with(Vec::new)
.take(max_field)
.collect::<Vec<_>>(),
fieldnorms_buffer,
}
}
@@ -55,8 +52,10 @@ impl FieldNormsWriter {
///
/// Will extend with 0-bytes for documents that have not been seen.
pub fn fill_up_to_max_doc(&mut self, max_doc: DocId) {
for field in self.fields.iter() {
self.fieldnorms_buffer[field.field_id() as usize].resize(max_doc as usize, 0u8);
for buffer_opt in self.fieldnorms_buffer.iter_mut() {
if let Some(buffer) = buffer_opt {
buffer.resize(max_doc as usize, 0u8);
}
}
}
@@ -69,21 +68,22 @@ impl FieldNormsWriter {
/// * field - the field being set
/// * fieldnorm - the number of terms present in document `doc` in field `field`
pub fn record(&mut self, doc: DocId, field: Field, fieldnorm: u32) {
let fieldnorm_buffer: &mut Vec<u8> = &mut self.fieldnorms_buffer[field.field_id() as usize];
assert!(
fieldnorm_buffer.len() <= doc as usize,
"Cannot register a given fieldnorm twice"
);
// we fill intermediary `DocId` as having a fieldnorm of 0.
fieldnorm_buffer.resize(doc as usize + 1, 0u8);
fieldnorm_buffer[doc as usize] = fieldnorm_to_id(fieldnorm);
if let Some(fieldnorm_buffer) = self.fieldnorms_buffer[field.field_id() as usize].as_mut() {
assert!(
fieldnorm_buffer.len() <= doc as usize,
"Cannot register a given fieldnorm twice" // we fill intermediary `DocId` as having a fieldnorm of 0.
);
fieldnorm_buffer.resize(doc as usize + 1, 0u8);
fieldnorm_buffer[doc as usize] = fieldnorm_to_id(fieldnorm);
}
}
/// Serialize the seen fieldnorm values to the serializer for all fields.
pub fn serialize(&self, mut fieldnorms_serializer: FieldNormsSerializer) -> io::Result<()> {
for &field in self.fields.iter() {
let fieldnorm_values: &[u8] = &self.fieldnorms_buffer[field.field_id() as usize][..];
fieldnorms_serializer.serialize_field(field, fieldnorm_values)?;
if let Some(buffer) = self.fieldnorms_buffer[field.field_id() as usize].as_ref() {
fieldnorms_serializer.serialize_field(field, &buffer[..])?;
}
}
fieldnorms_serializer.close()?;
Ok(())

View File

@@ -53,7 +53,7 @@ impl DeleteQueue {
return block;
}
let block = Arc::new(Block {
operations: Arc::new([]),
operations: Arc::default(),
next: NextBlock::from(self.clone()),
});
wlock.last_block = Arc::downgrade(&block);
@@ -108,7 +108,7 @@ impl DeleteQueue {
let delete_operations = mem::replace(&mut self_wlock.writer, vec![]);
let new_block = Arc::new(Block {
operations: Arc::from(delete_operations.into_boxed_slice()),
operations: Arc::new(delete_operations.into_boxed_slice()),
next: NextBlock::from(self.clone()),
});
@@ -167,7 +167,7 @@ impl NextBlock {
}
struct Block {
operations: Arc<[DeleteOperation]>,
operations: Arc<Box<[DeleteOperation]>>,
next: NextBlock,
}

View File

@@ -449,7 +449,7 @@ impl IndexWriter {
}
/// Accessor to the merge policy.
pub fn get_merge_policy(&self) -> Arc<dyn MergePolicy> {
pub fn get_merge_policy(&self) -> Arc<Box<dyn MergePolicy>> {
self.segment_updater.get_merge_policy()
}

View File

@@ -503,6 +503,7 @@ impl IndexMerger {
let mut positions_buffer: Vec<u32> = Vec::with_capacity(1_000);
let mut delta_computer = DeltaComputer::new();
let mut field_term_streams = Vec::new();
let mut max_term_ords: Vec<TermOrdinal> = Vec::new();
let field_readers: Vec<Arc<InvertedIndexReader>> = self
@@ -511,13 +512,10 @@ impl IndexMerger {
.map(|reader| reader.inverted_index(indexed_field))
.collect::<crate::Result<Vec<_>>>()?;
let mut field_term_streams = Vec::new();
for field_reader in &field_readers {
let terms = field_reader.terms();
field_term_streams.push(terms.stream());
max_term_ords.push(terms.num_terms() as u64);
let term_stream = terms.stream()?;
field_term_streams.push(term_stream);
}
let mut term_ord_mapping_opt = if *field_type == FieldType::HierarchicalFacet {

View File

@@ -9,15 +9,6 @@ pub struct DeleteOperation {
pub term: Term,
}
impl Default for DeleteOperation {
fn default() -> Self {
DeleteOperation {
opstamp: 0u64,
term: Term::new(),
}
}
}
/// Timestamped Add operation.
#[derive(Eq, PartialEq, Debug)]
pub struct AddOperation {

View File

@@ -154,7 +154,7 @@ pub(crate) struct InnerSegmentUpdater {
index: Index,
segment_manager: SegmentManager,
merge_policy: RwLock<Arc<dyn MergePolicy>>,
merge_policy: RwLock<Arc<Box<dyn MergePolicy>>>,
killed: AtomicBool,
stamper: Stamper,
merge_operations: MergeOperationInventory,
@@ -193,19 +193,19 @@ impl SegmentUpdater {
merge_thread_pool,
index,
segment_manager,
merge_policy: RwLock::new(Arc::new(DefaultMergePolicy::default())),
merge_policy: RwLock::new(Arc::new(Box::new(DefaultMergePolicy::default()))),
killed: AtomicBool::new(false),
stamper,
merge_operations: Default::default(),
})))
}
pub fn get_merge_policy(&self) -> Arc<dyn MergePolicy> {
pub fn get_merge_policy(&self) -> Arc<Box<dyn MergePolicy>> {
self.merge_policy.read().unwrap().clone()
}
pub fn set_merge_policy(&self, merge_policy: Box<dyn MergePolicy>) {
let arc_merge_policy = Arc::from(merge_policy);
let arc_merge_policy = Arc::new(merge_policy);
*self.merge_policy.write().unwrap() = arc_merge_policy;
}

View File

@@ -8,7 +8,7 @@ use std::io::{self, Write};
pub struct PositionSerializer<W: io::Write> {
bit_packer: BitPacker4x,
write_stream: CountingWriter<W>,
write_skip_index: W,
write_skiplist: W,
block: Vec<u32>,
buffer: Vec<u8>,
num_ints: u64,
@@ -16,11 +16,11 @@ pub struct PositionSerializer<W: io::Write> {
}
impl<W: io::Write> PositionSerializer<W> {
pub fn new(write_stream: W, write_skip_index: W) -> PositionSerializer<W> {
pub fn new(write_stream: W, write_skiplist: W) -> PositionSerializer<W> {
PositionSerializer {
bit_packer: BitPacker4x::new(),
write_stream: CountingWriter::wrap(write_stream),
write_skip_index,
write_skiplist,
block: Vec::with_capacity(128),
buffer: vec![0u8; 128 * 4],
num_ints: 0u64,
@@ -52,7 +52,7 @@ impl<W: io::Write> PositionSerializer<W> {
fn flush_block(&mut self) -> io::Result<()> {
let num_bits = self.bit_packer.num_bits(&self.block[..]);
self.write_skip_index.write_all(&[num_bits])?;
self.write_skiplist.write_all(&[num_bits])?;
let written_len = self
.bit_packer
.compress(&self.block[..], &mut self.buffer, num_bits);
@@ -70,10 +70,10 @@ impl<W: io::Write> PositionSerializer<W> {
self.flush_block()?;
}
for &long_skip in &self.long_skips {
long_skip.serialize(&mut self.write_skip_index)?;
long_skip.serialize(&mut self.write_skiplist)?;
}
(self.long_skips.len() as u32).serialize(&mut self.write_skip_index)?;
self.write_skip_index.flush()?;
(self.long_skips.len() as u32).serialize(&mut self.write_skiplist)?;
self.write_skiplist.flush()?;
self.write_stream.flush()?;
Ok(())
}

View File

@@ -469,7 +469,7 @@ mod tests {
let segment_reader = searcher.segment_reader(0);
let inverted_index = segment_reader.inverted_index(int_field).unwrap();
let term = Term::from_field_u64(int_field, 0u64);
let term_info = inverted_index.get_term_info(&term).unwrap().unwrap();
let term_info = inverted_index.get_term_info(&term).unwrap();
inverted_index
.read_block_postings_from_terminfo(&term_info, IndexRecordOption::Basic)
.unwrap()
@@ -513,7 +513,7 @@ mod tests {
{
let term = Term::from_field_u64(int_field, 0u64);
let inverted_index = segment_reader.inverted_index(int_field)?;
let term_info = inverted_index.get_term_info(&term)?.unwrap();
let term_info = inverted_index.get_term_info(&term).unwrap();
block_segments = inverted_index
.read_block_postings_from_terminfo(&term_info, IndexRecordOption::Basic)?;
}
@@ -521,7 +521,7 @@ mod tests {
{
let term = Term::from_field_u64(int_field, 1u64);
let inverted_index = segment_reader.inverted_index(int_field)?;
let term_info = inverted_index.get_term_info(&term)?.unwrap();
let term_info = inverted_index.get_term_info(&term).unwrap();
inverted_index.reset_block_postings_from_terminfo(&term_info, &mut block_segments)?;
}
assert_eq!(block_segments.docs(), &[1, 3, 5]);

View File

@@ -15,15 +15,19 @@ mod stacker;
mod term_info;
pub(crate) use self::block_search::BlockSearcher;
pub use self::block_segment_postings::BlockSegmentPostings;
pub use self::postings::Postings;
pub(crate) use self::postings_writer::MultiFieldPostingsWriter;
pub use self::segment_postings::SegmentPostings;
pub use self::serializer::{FieldSerializer, InvertedIndexSerializer};
pub use self::postings::Postings;
pub(crate) use self::skip::{BlockInfo, SkipReader};
pub(crate) use self::stacker::compute_table_size;
pub use self::term_info::TermInfo;
pub use self::block_segment_postings::BlockSegmentPostings;
pub use self::segment_postings::SegmentPostings;
pub(crate) use self::stacker::compute_table_size;
pub(crate) type UnorderedTermId = u64;
#[cfg_attr(feature = "cargo-clippy", allow(clippy::enum_variant_names))]
@@ -47,14 +51,17 @@ pub mod tests {
use crate::indexer::SegmentWriter;
use crate::merge_policy::NoMergePolicy;
use crate::query::Scorer;
use crate::schema::{Document, Schema, Term, INDEXED, STRING, TEXT};
use crate::schema::{Field, TextOptions};
use crate::schema::{IndexRecordOption, TextFieldIndexing};
use crate::schema::{Schema, Term, INDEXED, TEXT};
use crate::tokenizer::{SimpleTokenizer, MAX_TOKEN_LEN};
use crate::DocId;
use crate::HasLen;
use crate::Score;
use std::{iter, mem};
use once_cell::sync::Lazy;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use std::iter;
#[test]
pub fn test_position_write() -> crate::Result<()> {
@@ -71,7 +78,6 @@ pub mod tests {
field_serializer.write_doc(doc_id, 4, &delta_positions)?;
}
field_serializer.close_term()?;
mem::drop(field_serializer);
posting_serializer.close()?;
let read = segment.open_read(SegmentComponent::POSITIONS)?;
assert!(read.len() <= 140);
@@ -180,7 +186,7 @@ pub mod tests {
let inverted_index = segment_reader.inverted_index(text_field)?;
assert_eq!(inverted_index.terms().num_terms(), 1);
let mut bytes = vec![];
assert!(inverted_index.terms().ord_to_term(0, &mut bytes)?);
assert!(inverted_index.terms().ord_to_term(0, &mut bytes));
assert_eq!(&bytes, b"hello");
}
{
@@ -192,7 +198,7 @@ pub mod tests {
let inverted_index = segment_reader.inverted_index(text_field)?;
assert_eq!(inverted_index.terms().num_terms(), 1);
let mut bytes = vec![];
assert!(inverted_index.terms().ord_to_term(0, &mut bytes)?);
assert!(inverted_index.terms().ord_to_term(0, &mut bytes));
assert_eq!(&bytes[..], ok_token_text.as_bytes());
}
Ok(())
@@ -485,6 +491,53 @@ pub mod tests {
Ok(())
}
pub static TERM_A: Lazy<Term> = Lazy::new(|| {
let field = Field::from_field_id(0);
Term::from_field_text(field, "a")
});
pub static TERM_B: Lazy<Term> = Lazy::new(|| {
let field = Field::from_field_id(0);
Term::from_field_text(field, "b")
});
pub static TERM_C: Lazy<Term> = Lazy::new(|| {
let field = Field::from_field_id(0);
Term::from_field_text(field, "c")
});
pub static TERM_D: Lazy<Term> = Lazy::new(|| {
let field = Field::from_field_id(0);
Term::from_field_text(field, "d")
});
pub static INDEX: Lazy<Index> = Lazy::new(|| {
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", STRING);
let schema = schema_builder.build();
let mut rng: StdRng = StdRng::from_seed([1u8; 32]);
let index = Index::create_in_ram(schema);
let posting_list_size = 1_000_000;
{
let mut index_writer = index.writer_for_tests().unwrap();
for _ in 0..posting_list_size {
let mut doc = Document::default();
if rng.gen_bool(1f64 / 15f64) {
doc.add_text(text_field, "a");
}
if rng.gen_bool(1f64 / 10f64) {
doc.add_text(text_field, "b");
}
if rng.gen_bool(1f64 / 5f64) {
doc.add_text(text_field, "c");
}
doc.add_text(text_field, "d");
index_writer.add_document(doc);
}
assert!(index_writer.commit().is_ok());
}
index
});
/// Wraps a given docset, and forward alls call but the
/// `.skip_next(...)`. This is useful to test that a specialized
/// implementation of `.skip_next(...)` is consistent
@@ -549,65 +602,15 @@ pub mod tests {
#[cfg(all(test, feature = "unstable"))]
mod bench {
use super::tests::*;
use crate::docset::TERMINATED;
use crate::query::Intersection;
use crate::schema::IndexRecordOption;
use crate::schema::{Document, Field, Schema, Term, STRING};
use crate::tests;
use crate::DocSet;
use crate::Index;
use once_cell::sync::Lazy;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use test::{self, Bencher};
pub static TERM_A: Lazy<Term> = Lazy::new(|| {
let field = Field::from_field_id(0);
Term::from_field_text(field, "a")
});
pub static TERM_B: Lazy<Term> = Lazy::new(|| {
let field = Field::from_field_id(0);
Term::from_field_text(field, "b")
});
pub static TERM_C: Lazy<Term> = Lazy::new(|| {
let field = Field::from_field_id(0);
Term::from_field_text(field, "c")
});
pub static TERM_D: Lazy<Term> = Lazy::new(|| {
let field = Field::from_field_id(0);
Term::from_field_text(field, "d")
});
pub static INDEX: Lazy<Index> = Lazy::new(|| {
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", STRING);
let schema = schema_builder.build();
let mut rng: StdRng = StdRng::from_seed([1u8; 32]);
let index = Index::create_in_ram(schema);
let posting_list_size = 1_000_000;
{
let mut index_writer = index.writer_for_tests().unwrap();
for _ in 0..posting_list_size {
let mut doc = Document::default();
if rng.gen_bool(1f64 / 15f64) {
doc.add_text(text_field, "a");
}
if rng.gen_bool(1f64 / 10f64) {
doc.add_text(text_field, "b");
}
if rng.gen_bool(1f64 / 5f64) {
doc.add_text(text_field, "c");
}
doc.add_text(text_field, "d");
index_writer.add_document(doc);
}
assert!(index_writer.commit().is_ok());
}
index
});
#[bench]
fn bench_segment_postings(b: &mut Bencher) {
let reader = INDEX.reader().unwrap();
@@ -617,9 +620,7 @@ mod bench {
b.iter(|| {
let mut segment_postings = segment_reader
.inverted_index(TERM_A.field())
.unwrap()
.read_postings(&*TERM_A, IndexRecordOption::Basic)
.unwrap()
.read_postings(&*TERM_A, IndexRecordOption::Basic)?
.unwrap();
while segment_postings.advance() != TERMINATED {}
});
@@ -633,25 +634,21 @@ mod bench {
b.iter(|| {
let segment_postings_a = segment_reader
.inverted_index(TERM_A.field())
.unwrap()
.read_postings(&*TERM_A, IndexRecordOption::Basic)
.unwrap()
.unwrap();
let segment_postings_b = segment_reader
.inverted_index(TERM_B.field())
.unwrap()
.read_postings(&*TERM_B, IndexRecordOption::Basic)
.unwrap()
.unwrap();
let segment_postings_c = segment_reader
.inverted_index(TERM_C.field())
.unwrap()
.read_postings(&*TERM_C, IndexRecordOption::Basic)
.unwrap()
.unwrap();
let segment_postings_d = segment_reader
.inverted_index(TERM_D.field())
.unwrap()
.read_postings(&*TERM_D, IndexRecordOption::Basic)
.unwrap()
.unwrap();
@@ -673,7 +670,6 @@ mod bench {
let mut segment_postings = segment_reader
.inverted_index(TERM_A.field())
.unwrap()
.read_postings(&*TERM_A, IndexRecordOption::Basic)
.unwrap()
.unwrap();
@@ -691,9 +687,7 @@ mod bench {
b.iter(|| {
let mut segment_postings = segment_reader
.inverted_index(TERM_A.field())
.unwrap()
.read_postings(&*TERM_A, IndexRecordOption::Basic)
.unwrap()
.unwrap();
for doc in &existing_docs {
if segment_postings.seek(*doc) == TERMINATED {
@@ -732,9 +726,7 @@ mod bench {
let n: u32 = test::black_box(17);
let mut segment_postings = segment_reader
.inverted_index(TERM_A.field())
.unwrap()
.read_postings(&*TERM_A, IndexRecordOption::Basic)
.unwrap()
.unwrap();
let mut s = 0u32;
while segment_postings.doc() != TERMINATED {

View File

@@ -177,16 +177,14 @@ impl<'a> FieldSerializer<'a> {
}
fn current_term_info(&self) -> TermInfo {
let positions_idx =
if let Some(positions_serializer) = self.positions_serializer_opt.as_ref() {
positions_serializer.positions_idx()
} else {
0u64
};
let positions_idx = self
.positions_serializer_opt
.as_ref()
.map(PositionSerializer::positions_idx)
.unwrap_or(0u64);
TermInfo {
doc_freq: 0,
postings_start_offset: self.postings_serializer.addr(),
postings_stop_offset: 0u64,
postings_offset: self.postings_serializer.addr(),
positions_idx,
}
}
@@ -240,11 +238,10 @@ impl<'a> FieldSerializer<'a> {
/// using `VInt` encoding.
pub fn close_term(&mut self) -> io::Result<()> {
if self.term_open {
self.postings_serializer
.close_term(self.current_term_info.doc_freq)?;
self.current_term_info.postings_stop_offset = self.postings_serializer.addr();
self.term_dictionary_builder
.insert_value(&self.current_term_info)?;
self.postings_serializer
.close_term(self.current_term_info.doc_freq)?;
self.term_open = false;
}
Ok(())
@@ -325,9 +322,8 @@ pub struct PostingsSerializer<W: Write> {
bm25_weight: Option<BM25Weight>,
num_docs: u32, // Number of docs in the segment
avg_fieldnorm: Score, // Average number of term in the field for that segment.
// this value is used to compute the block wand information.
// this value is used to compute the block wand information.
}
impl<W: Write> PostingsSerializer<W> {
@@ -337,10 +333,6 @@ impl<W: Write> PostingsSerializer<W> {
mode: IndexRecordOption,
fieldnorm_reader: Option<FieldNormReader>,
) -> PostingsSerializer<W> {
let num_docs = fieldnorm_reader
.as_ref()
.map(|fieldnorm_reader| fieldnorm_reader.num_docs())
.unwrap_or(0u32);
PostingsSerializer {
output_write: CountingWriter::wrap(write),
@@ -356,20 +348,25 @@ impl<W: Write> PostingsSerializer<W> {
fieldnorm_reader,
bm25_weight: None,
num_docs,
avg_fieldnorm,
}
}
/// Returns the number of documents in the segment currently being serialized.
/// This function may return `None` if there are no fieldnorm for that field.
fn num_docs_in_segment(&self) -> Option<u32> {
self.fieldnorm_reader
.as_ref()
.map(|reader| reader.num_docs())
}
pub fn new_term(&mut self, term_doc_freq: u32) {
if self.mode.has_freq() && self.num_docs > 0 {
let bm25_weight = BM25Weight::for_one_term(
term_doc_freq as u64,
self.num_docs as u64,
self.avg_fieldnorm,
);
self.bm25_weight = Some(bm25_weight);
if self.mode.has_freq() {
return;
}
self.bm25_weight = self.num_docs_in_segment().map(|num_docs| {
BM25Weight::for_one_term(term_doc_freq as u64, num_docs as u64, self.avg_fieldnorm)
});
}
fn write_block(&mut self) {

View File

@@ -7,50 +7,35 @@ use std::io;
pub struct TermInfo {
/// Number of documents in the segment containing the term
pub doc_freq: u32,
/// Start offset of the posting list within the postings (`.idx`) file.
pub postings_start_offset: u64,
/// Stop offset of the posting list within the postings (`.idx`) file.
/// The byte range is `[start_offset..stop_offset)`.
pub postings_stop_offset: u64,
/// Start offset within the postings (`.idx`) file.
pub postings_offset: u64,
/// Start offset of the first block within the position (`.pos`) file.
pub positions_idx: u64,
}
impl TermInfo {
pub(crate) fn posting_num_bytes(&self) -> u32 {
let num_bytes = self.postings_stop_offset - self.postings_start_offset;
assert!(num_bytes <= std::u32::MAX as u64);
num_bytes as u32
}
}
impl FixedSize for TermInfo {
/// Size required for the binary serialization of a `TermInfo` object.
/// This is large, but in practise, `TermInfo` are encoded in blocks and
/// only the first `TermInfo` of a block is serialized uncompressed.
/// The subsequent `TermInfo` are delta encoded and bitpacked.
const SIZE_IN_BYTES: usize = 2 * u32::SIZE_IN_BYTES + 2 * u64::SIZE_IN_BYTES;
const SIZE_IN_BYTES: usize = u32::SIZE_IN_BYTES + 2 * u64::SIZE_IN_BYTES;
}
impl BinarySerializable for TermInfo {
fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
self.doc_freq.serialize(writer)?;
self.postings_start_offset.serialize(writer)?;
self.posting_num_bytes().serialize(writer)?;
self.postings_offset.serialize(writer)?;
self.positions_idx.serialize(writer)?;
Ok(())
}
fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
let doc_freq = u32::deserialize(reader)?;
let postings_start_offset = u64::deserialize(reader)?;
let postings_num_bytes = u32::deserialize(reader)?;
let postings_stop_offset = postings_start_offset + u64::from(postings_num_bytes);
let postings_offset = u64::deserialize(reader)?;
let positions_idx = u64::deserialize(reader)?;
Ok(TermInfo {
doc_freq,
postings_start_offset,
postings_stop_offset,
postings_offset,
positions_idx,
})
}

View File

@@ -7,7 +7,6 @@ use crate::schema::{Field, IndexRecordOption};
use crate::termdict::{TermDictionary, TermStreamer};
use crate::TantivyError;
use crate::{DocId, Score};
use std::io;
use std::sync::Arc;
use tantivy_fst::Automaton;
@@ -20,7 +19,6 @@ pub struct AutomatonWeight<A> {
impl<A> AutomatonWeight<A>
where
A: Automaton + Send + Sync + 'static,
A::State: Clone,
{
/// Create a new AutomationWeight
pub fn new<IntoArcA: Into<Arc<A>>>(field: Field, automaton: IntoArcA) -> AutomatonWeight<A> {
@@ -30,10 +28,7 @@ where
}
}
fn automaton_stream<'a>(
&'a self,
term_dict: &'a TermDictionary,
) -> io::Result<TermStreamer<'a, &'a A>> {
fn automaton_stream<'a>(&'a self, term_dict: &'a TermDictionary) -> TermStreamer<'a, &'a A> {
let automaton: &A = &*self.automaton;
let term_stream_builder = term_dict.search(automaton);
term_stream_builder.into_stream()
@@ -43,14 +38,13 @@ where
impl<A> Weight for AutomatonWeight<A>
where
A: Automaton + Send + Sync + 'static,
A::State: Clone,
{
fn scorer(&self, reader: &SegmentReader, boost: Score) -> crate::Result<Box<dyn Scorer>> {
let max_doc = reader.max_doc();
let mut doc_bitset = BitSet::with_max_value(max_doc);
let inverted_index = reader.inverted_index(self.field)?;
let term_dict = inverted_index.terms();
let mut term_stream = self.automaton_stream(term_dict)?;
let mut term_stream = self.automaton_stream(term_dict);
while term_stream.advance() {
let term_info = term_stream.value();
let mut block_segment_postings = inverted_index
@@ -104,7 +98,6 @@ mod tests {
index
}
#[derive(Clone, Copy)]
enum State {
Start,
NotMatching,

View File

@@ -106,7 +106,7 @@ impl BM25Weight {
BM25Weight::new(idf_explain, avg_fieldnorm)
}
pub(crate) fn new(idf_explain: Explanation, average_fieldnorm: Score) -> BM25Weight {
fn new(idf_explain: Explanation, average_fieldnorm: Score) -> BM25Weight {
let weight = idf_explain.value() * (1.0 + K1);
BM25Weight {
idf_explain,

View File

@@ -268,7 +268,7 @@ mod tests {
}
fn nearly_equals(left: Score, right: Score) -> bool {
(left - right).abs() < 0.0001 * (left + right).abs()
(left - right).abs() < 0.000001 * (left + right).abs()
}
fn compute_checkpoints_for_each_pruning(
@@ -424,116 +424,9 @@ mod tests {
}
}
#[test]
fn test_fn_reproduce_proptest() {
let postings_lists = &[
vec![
(0, 1),
(1, 1),
(2, 1),
(3, 1),
(4, 1),
(6, 1),
(7, 7),
(8, 1),
(10, 1),
(12, 1),
(13, 1),
(14, 1),
(15, 1),
(16, 1),
(19, 1),
(20, 1),
(21, 1),
(22, 1),
(24, 1),
(25, 1),
(26, 1),
(28, 1),
(30, 1),
(31, 1),
(33, 1),
(34, 1),
(35, 1),
(36, 95),
(37, 1),
(39, 1),
(41, 1),
(44, 1),
(46, 1),
],
vec![
(0, 5),
(2, 1),
(4, 1),
(5, 84),
(6, 47),
(7, 26),
(8, 50),
(9, 34),
(11, 73),
(12, 11),
(13, 51),
(14, 45),
(15, 18),
(18, 60),
(19, 80),
(20, 63),
(23, 79),
(24, 69),
(26, 35),
(28, 82),
(29, 19),
(30, 2),
(31, 7),
(33, 40),
(34, 1),
(35, 33),
(36, 27),
(37, 24),
(38, 65),
(39, 32),
(40, 85),
(41, 1),
(42, 69),
(43, 11),
(45, 45),
(47, 97),
],
vec![
(2, 1),
(4, 1),
(7, 94),
(8, 1),
(9, 1),
(10, 1),
(12, 1),
(15, 1),
(22, 1),
(23, 1),
(26, 1),
(27, 1),
(32, 1),
(33, 1),
(34, 1),
(36, 96),
(39, 1),
(41, 1),
],
];
let fieldnorms = &[
685, 239, 780, 564, 664, 827, 5, 56, 930, 887, 263, 665, 167, 127, 120, 919, 292, 92,
489, 734, 814, 724, 700, 304, 128, 779, 311, 877, 774, 15, 866, 368, 894, 371, 982,
502, 507, 669, 680, 76, 594, 626, 578, 331, 170, 639, 665, 186,
][..];
test_block_wand_aux(postings_lists, fieldnorms);
}
proptest! {
#![proptest_config(ProptestConfig::with_cases(500))]
#[ignore]
#[test]
#[ignore]
fn test_block_wand_three_term_scorers((posting_lists, fieldnorms) in gen_term_scorers(3)) {
test_block_wand_aux(&posting_lists[..], &fieldnorms[..]);
}

View File

@@ -1,5 +1,3 @@
use rayon::iter::IntoParallelRefIterator;
use crate::core::SegmentReader;
use crate::postings::FreqReadingOption;
use crate::query::explanation::does_not_match;
@@ -24,7 +22,7 @@ enum SpecializedScorer {
fn scorer_union<TScoreCombiner>(scorers: Vec<Box<dyn Scorer>>) -> SpecializedScorer
where
TScoreCombiner: ScoreCombiner + Send,
TScoreCombiner: ScoreCombiner,
{
assert!(!scorers.is_empty());
if scorers.len() == 1 {
@@ -54,7 +52,7 @@ where
SpecializedScorer::Other(Box::new(Union::<_, TScoreCombiner>::from(scorers)))
}
fn into_box_scorer<TScoreCombiner: ScoreCombiner + Send>(scorer: SpecializedScorer) -> Box<dyn Scorer> {
fn into_box_scorer<TScoreCombiner: ScoreCombiner>(scorer: SpecializedScorer) -> Box<dyn Scorer> {
match scorer {
SpecializedScorer::TermUnion(term_scorers) => {
let union_scorer = Union::<TermScorer, TScoreCombiner>::from(term_scorers);
@@ -82,32 +80,18 @@ impl BooleanWeight {
reader: &SegmentReader,
boost: Score,
) -> crate::Result<HashMap<Occur, Vec<Box<dyn Scorer>>>> {
use rayon::iter::ParallelIterator;
use rayon::iter::IndexedParallelIterator;
let mut per_occur_scorers: HashMap<Occur, Vec<Box<dyn Scorer>>> = HashMap::new();
let mut items_res: Vec<crate::Result<(Occur, Box<dyn Scorer>)>> = Vec::new();
let pool = rayon::ThreadPoolBuilder::new().num_threads(self.weights.len()).build().unwrap();
pool.install(|| {
self.weights.iter()
.collect::<Vec<_>>()
.par_iter()
.map(|(occur, subweight)| {
let sub_scorer: Box<dyn Scorer> = subweight.scorer(reader, boost)?;
Ok((*occur, sub_scorer))
})
.collect_into_vec(&mut items_res);
});
for item_res in items_res {
let (occur, sub_scorer) = item_res?;
for &(ref occur, ref subweight) in &self.weights {
let sub_scorer: Box<dyn Scorer> = subweight.scorer(reader, boost)?;
per_occur_scorers
.entry(occur)
.entry(*occur)
.or_insert_with(Vec::new)
.push(sub_scorer);
}
Ok(per_occur_scorers)
}
fn complex_scorer<TScoreCombiner: ScoreCombiner >(
fn complex_scorer<TScoreCombiner: ScoreCombiner>(
&self,
reader: &SegmentReader,
boost: Score,

View File

@@ -310,7 +310,7 @@ mod tests {
));
let query = BooleanQuery::from(vec![(Occur::Should, term_a), (Occur::Should, term_b)]);
let explanation = query.explain(&searcher, DocAddress(0, 0u32))?;
assert_nearly_equals!(explanation.value(), 0.6931472);
assert_nearly_equals!(explanation.value(), 0.6931472f32);
Ok(())
}
}

View File

@@ -11,7 +11,6 @@ use crate::schema::{Field, IndexRecordOption, Term};
use crate::termdict::{TermDictionary, TermStreamer};
use crate::{DocId, Score};
use std::collections::Bound;
use std::io;
use std::ops::Range;
fn map_bound<TFrom, TTo, Transform: Fn(&TFrom) -> TTo>(
@@ -275,7 +274,7 @@ pub struct RangeWeight {
}
impl RangeWeight {
fn term_range<'a>(&self, term_dict: &'a TermDictionary) -> io::Result<TermStreamer<'a>> {
fn term_range<'a>(&self, term_dict: &'a TermDictionary) -> TermStreamer<'a> {
use std::collections::Bound::*;
let mut term_stream_builder = term_dict.range();
term_stream_builder = match self.left_bound {
@@ -299,7 +298,7 @@ impl Weight for RangeWeight {
let inverted_index = reader.inverted_index(self.field)?;
let term_dict = inverted_index.terms();
let mut term_range = self.term_range(term_dict)?;
let mut term_range = self.term_range(term_dict);
while term_range.advance() {
let term_info = term_range.value();
let mut block_segment_postings = inverted_index

View File

@@ -12,7 +12,7 @@ use std::marker::PhantomData;
/// This is useful for queries like `+somethingrequired somethingoptional`.
///
/// Note that `somethingoptional` has no impact on the `DocSet`.
pub struct RequiredOptionalScorer<TReqScorer, TOptScorer, TScoreCombiner: ScoreCombiner> {
pub struct RequiredOptionalScorer<TReqScorer, TOptScorer, TScoreCombiner> {
req_scorer: TReqScorer,
opt_scorer: TOptScorer,
score_cache: Option<Score>,
@@ -23,7 +23,6 @@ impl<TReqScorer, TOptScorer, TScoreCombiner>
RequiredOptionalScorer<TReqScorer, TOptScorer, TScoreCombiner>
where
TOptScorer: DocSet,
TScoreCombiner: ScoreCombiner,
{
/// Creates a new `RequiredOptionalScorer`.
pub fn new(
@@ -44,7 +43,6 @@ impl<TReqScorer, TOptScorer, TScoreCombiner> DocSet
where
TReqScorer: DocSet,
TOptScorer: DocSet,
TScoreCombiner: ScoreCombiner,
{
fn advance(&mut self) -> DocId {
self.score_cache = None;

View File

@@ -3,7 +3,7 @@ use crate::Score;
/// The `ScoreCombiner` trait defines how to compute
/// an overall score given a list of scores.
pub trait ScoreCombiner: Default + Clone + Send + Copy + 'static {
pub trait ScoreCombiner: Default + Clone + Copy + 'static {
/// Aggregates the score combiner with the given scorer.
///
/// The `ScoreCombiner` may decide to call `.scorer.score()`

View File

@@ -197,7 +197,7 @@ mod tests {
let searcher = index.reader()?.searcher();
{
let explanation = term_query.explain(&searcher, DocAddress(0u32, 1u32))?;
assert_nearly_equals!(explanation.value(), 0.6931472);
assert_nearly_equals!(explanation.value(), 0.6931472f32);
}
{
let explanation_err = term_query.explain(&searcher, DocAddress(0u32, 0u32));

View File

@@ -1,7 +1,7 @@
use super::term_weight::TermWeight;
use crate::query::bm25::BM25Weight;
use crate::query::Query;
use crate::query::Weight;
use crate::query::{Explanation, Query};
use crate::schema::IndexRecordOption;
use crate::Searcher;
use crate::Term;
@@ -92,21 +92,19 @@ impl TermQuery {
searcher: &Searcher,
scoring_enabled: bool,
) -> crate::Result<TermWeight> {
let term = self.term.clone();
let field_entry = searcher.schema().get_field_entry(term.field());
let field_entry = searcher
.schema()
.get_field_entry(self.term.field());
if !field_entry.is_indexed() {
return Err(crate::TantivyError::SchemaError(format!(
"Field {:?} is not indexed",
field_entry.name()
)));
}
let bm25_weight;
if scoring_enabled {
bm25_weight = BM25Weight::for_terms(searcher, &[term])?;
} else {
bm25_weight =
BM25Weight::new(Explanation::new("<no score>".to_string(), 1.0f32), 1.0f32);
let error_msg = format!("Field {:?} is not indexed.", field_entry.name());
return Err(crate::TantivyError::SchemaError(error_msg));
}
let has_fieldnorms = searcher
.schema()
.get_field_entry(self.term.field())
.has_fieldnorms();
let term = self.term.clone();
let bm25_weight = BM25Weight::for_terms(searcher, &[term])?;
let index_record_option = if scoring_enabled {
self.index_record_option
} else {
@@ -116,7 +114,7 @@ impl TermQuery {
self.term.clone(),
index_record_option,
bm25_weight,
scoring_enabled,
has_fieldnorms,
))
}
}

View File

@@ -16,7 +16,7 @@ pub struct TermWeight {
term: Term,
index_record_option: IndexRecordOption,
similarity_weight: BM25Weight,
scoring_enabled: bool,
has_fieldnorms: bool,
}
impl Weight for TermWeight {
@@ -45,7 +45,7 @@ impl Weight for TermWeight {
} else {
let field = self.term.field();
let inv_index = reader.inverted_index(field)?;
let term_info = inv_index.get_term_info(&self.term)?;
let term_info = inv_index.get_term_info(&self.term);
Ok(term_info.map(|term_info| term_info.doc_freq).unwrap_or(0))
}
}
@@ -89,13 +89,13 @@ impl TermWeight {
term: Term,
index_record_option: IndexRecordOption,
similarity_weight: BM25Weight,
scoring_enabled: bool,
has_fieldnorms: bool,
) -> TermWeight {
TermWeight {
term,
index_record_option,
similarity_weight,
scoring_enabled,
has_fieldnorms,
}
}
@@ -106,10 +106,10 @@ impl TermWeight {
) -> crate::Result<TermScorer> {
let field = self.term.field();
let inverted_index = reader.inverted_index(field)?;
let fieldnorm_reader = if self.scoring_enabled {
let fieldnorm_reader = if self.has_fieldnorms {
reader.get_fieldnorms_reader(field)?
} else {
FieldNormReader::constant(reader.max_doc(), 1)
FieldNormReader::const_fieldnorm_id(1u8, reader.num_docs())
};
let similarity_weight = self.similarity_weight.boost_by(boost);
let postings_opt: Option<SegmentPostings> =

View File

@@ -3,9 +3,9 @@ mod pool;
pub use self::pool::LeasedItem;
use self::pool::Pool;
use crate::core::Segment;
use crate::directory::Directory;
use crate::directory::WatchHandle;
use crate::directory::META_LOCK;
use crate::directory::{Directory, WatchCallback};
use crate::Index;
use crate::Searcher;
use crate::SegmentReader;
@@ -88,7 +88,7 @@ impl IndexReaderBuilder {
let watch_handle = inner_reader_arc
.index
.directory()
.watch(WatchCallback::new(callback))?;
.watch(Box::new(callback))?;
watch_handle_opt = Some(watch_handle);
}
}

View File

@@ -233,7 +233,6 @@ mod tests {
assert_eq!(Facet::root(), Facet::from("/"));
assert_eq!(format!("{}", Facet::root()), "/");
assert!(Facet::root().is_root());
assert_eq!(Facet::root().encoded_str(), "");
}
#[test]

View File

@@ -112,6 +112,21 @@ impl FieldEntry {
}
}
pub fn has_fieldnorms(&self) -> bool {
match self.field_type {
FieldType::Str(ref options) => options
.get_indexing_options()
.map(|options| options.fieldnorms())
.unwrap_or(false),
FieldType::U64(ref options)
| FieldType::I64(ref options)
| FieldType::F64(ref options)
| FieldType::Date(ref options) => options.index_option().has_fieldnorms(),
FieldType::HierarchicalFacet => false,
FieldType::Bytes(ref _options) => false,
}
}
/// Returns true iff the field is a int (signed or unsigned) fast field
pub fn is_fast(&self) -> bool {
match self.field_type {
@@ -272,7 +287,8 @@ impl<'de> Deserialize<'de> for FieldEntry {
#[cfg(test)]
mod tests {
use super::*;
use crate::schema::TEXT;
use crate::schema::{Schema, STRING, TEXT};
use crate::Index;
use serde_json;
#[test]
@@ -291,7 +307,8 @@ mod tests {
"options": {
"indexing": {
"record": "position",
"tokenizer": "default"
"tokenizer": "default",
"fieldnorms": true
},
"stored": false
}
@@ -309,4 +326,19 @@ mod tests {
_ => panic!("expected FieldType::Str"),
}
}
#[test]
fn test_fieldnorms() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let text = schema_builder.add_text_field("text", STRING);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_for_tests()?;
index_writer.add_document(doc!(text=>"abc"));
index_writer.commit()?;
let searcher = index.reader()?.searcher();
let err = searcher.segment_reader(0u32).get_fieldnorms_reader(text);
assert!(matches!(err, Err(crate::TantivyError::SchemaError(_))));
Ok(())
}
}

View File

@@ -14,10 +14,50 @@ pub enum Cardinality {
MultiValues,
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum IntOptionIndex {
#[serde(rename = "no_index")]
NoIndex,
#[serde(rename = "index_no_fieldnorms")]
IndexNoFieldnorms,
#[serde(rename = "index_with_fieldnorms")]
IndexWithFieldnorms,
}
impl BitOr<IntOptionIndex> for IntOptionIndex {
type Output = IntOptionIndex;
fn bitor(self, other: IntOptionIndex) -> IntOptionIndex {
match (self, other) {
(_, Self::IndexWithFieldnorms) | (Self::IndexWithFieldnorms, _) => {
Self::IndexWithFieldnorms
}
(_, Self::IndexNoFieldnorms) | (Self::IndexNoFieldnorms, _) => Self::IndexNoFieldnorms,
(Self::NoIndex, Self::NoIndex) => Self::NoIndex,
}
}
}
impl IntOptionIndex {
pub fn is_indexed(&self) -> bool {
match *self {
Self::NoIndex => false,
Self::IndexNoFieldnorms | Self::IndexWithFieldnorms => true,
}
}
pub fn has_fieldnorms(&self) -> bool {
match *self {
Self::NoIndex | Self::IndexNoFieldnorms => false,
Self::IndexWithFieldnorms => true,
}
}
}
/// Define how an u64, i64, of f64 field should be handled by tantivy.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct IntOptions {
indexed: bool,
indexed: IntOptionIndex,
#[serde(skip_serializing_if = "Option::is_none")]
fast: Option<Cardinality>,
stored: bool,
@@ -31,7 +71,7 @@ impl IntOptions {
/// Returns true iff the value is indexed.
pub fn is_indexed(&self) -> bool {
self.indexed
self.indexed.is_indexed()
}
/// Returns true iff the value is a fast field.
@@ -48,12 +88,21 @@ impl IntOptions {
self
}
pub fn index_option(&self) -> &IntOptionIndex {
&self.indexed
}
pub fn set_indexed(mut self) -> IntOptions {
self.indexed = IntOptionIndex::IndexWithFieldnorms;
self
}
/// Set the field as indexed.
///
/// Setting an integer as indexed will generate
/// a posting list for each value taken by the integer.
pub fn set_indexed(mut self) -> IntOptions {
self.indexed = true;
pub fn set_index_option(mut self, int_option_index: IntOptionIndex) -> IntOptions {
self.indexed = int_option_index;
self
}
@@ -80,7 +129,7 @@ impl IntOptions {
impl Default for IntOptions {
fn default() -> IntOptions {
IntOptions {
indexed: false,
indexed: IntOptionIndex::NoIndex,
stored: false,
fast: None,
}
@@ -96,7 +145,7 @@ impl From<()> for IntOptions {
impl From<FastFlag> for IntOptions {
fn from(_: FastFlag) -> Self {
IntOptions {
indexed: false,
indexed: IntOptionIndex::NoIndex,
stored: false,
fast: Some(Cardinality::SingleValue),
}
@@ -106,7 +155,7 @@ impl From<FastFlag> for IntOptions {
impl From<StoredFlag> for IntOptions {
fn from(_: StoredFlag) -> Self {
IntOptions {
indexed: false,
indexed: IntOptionIndex::NoIndex,
stored: true,
fast: None,
}
@@ -116,7 +165,7 @@ impl From<StoredFlag> for IntOptions {
impl From<IndexedFlag> for IntOptions {
fn from(_: IndexedFlag) -> Self {
IntOptions {
indexed: true,
indexed: IntOptionIndex::IndexWithFieldnorms,
stored: false,
fast: None,
}

View File

@@ -231,6 +231,10 @@ impl Schema {
&self.0.fields[field.field_id() as usize]
}
pub fn num_fields(&self) -> usize {
self.0.fields.len()
}
/// Return the field name for a given `Field`.
pub fn get_field_name(&self, field: Field) -> &str {
self.get_field_entry(field).name()
@@ -444,7 +448,8 @@ mod tests {
"options": {
"indexing": {
"record": "position",
"tokenizer": "default"
"tokenizer": "default",
"fieldnorms": true
},
"stored": false
}
@@ -455,7 +460,8 @@ mod tests {
"options": {
"indexing": {
"record": "basic",
"tokenizer": "raw"
"tokenizer": "raw",
"fieldnorms": false
},
"stored": false
}
@@ -464,7 +470,7 @@ mod tests {
"name": "count",
"type": "u64",
"options": {
"indexed": false,
"indexed": "no_index",
"fast": "single",
"stored": true
}
@@ -473,7 +479,7 @@ mod tests {
"name": "popularity",
"type": "i64",
"options": {
"indexed": false,
"indexed": "no_index",
"fast": "single",
"stored": true
}
@@ -482,7 +488,7 @@ mod tests {
"name": "score",
"type": "f64",
"options": {
"indexed": true,
"indexed": "index_with_fieldnorms",
"fast": "single",
"stored": false
}
@@ -747,7 +753,8 @@ mod tests {
"options": {
"indexing": {
"record": "position",
"tokenizer": "default"
"tokenizer": "default",
"fieldnorms": true
},
"stored": false
}
@@ -756,7 +763,7 @@ mod tests {
"name": "popularity",
"type": "i64",
"options": {
"indexed": false,
"indexed": "no_index",
"fast": "single",
"stored": true
}
@@ -777,7 +784,8 @@ mod tests {
"options": {
"indexing": {
"record": "basic",
"tokenizer": "raw"
"tokenizer": "raw",
"fieldnorms": false
},
"stored": true
}
@@ -786,7 +794,7 @@ mod tests {
"name": "_timestamp",
"type": "date",
"options": {
"indexed": true,
"indexed": "index_with_fieldnorms",
"fast": "single",
"stored": true
}
@@ -797,7 +805,8 @@ mod tests {
"options": {
"indexing": {
"record": "position",
"tokenizer": "default"
"tokenizer": "default",
"fieldnorms": true
},
"stored": false
}
@@ -806,7 +815,7 @@ mod tests {
"name": "popularity",
"type": "i64",
"options": {
"indexed": false,
"indexed": "no_index",
"fast": "single",
"stored": true
}

View File

@@ -55,6 +55,7 @@ impl Default for TextOptions {
pub struct TextFieldIndexing {
record: IndexRecordOption,
tokenizer: Cow<'static, str>,
fieldnorms: bool,
}
impl Default for TextFieldIndexing {
@@ -62,6 +63,7 @@ impl Default for TextFieldIndexing {
TextFieldIndexing {
tokenizer: Cow::Borrowed("default"),
record: IndexRecordOption::Basic,
fieldnorms: false,
}
}
}
@@ -78,6 +80,15 @@ impl TextFieldIndexing {
&self.tokenizer
}
pub fn set_fieldnorms(mut self, fieldnorms: bool) -> TextFieldIndexing {
self.fieldnorms = fieldnorms;
self
}
pub fn fieldnorms(&self) -> bool {
self.fieldnorms
}
/// Sets which information should be indexed with the tokens.
///
/// See [IndexRecordOption](./enum.IndexRecordOption.html) for more detail.
@@ -99,6 +110,7 @@ pub const STRING: TextOptions = TextOptions {
indexing: Some(TextFieldIndexing {
tokenizer: Cow::Borrowed("raw"),
record: IndexRecordOption::Basic,
fieldnorms: false,
}),
stored: false,
};
@@ -108,6 +120,7 @@ pub const TEXT: TextOptions = TextOptions {
indexing: Some(TextFieldIndexing {
tokenizer: Cow::Borrowed("default"),
record: IndexRecordOption::WithFreqsAndPositions,
fieldnorms: true,
}),
stored: false,
};

View File

@@ -1,165 +0,0 @@
use crate::common::VInt;
use crate::store::index::{Checkpoint, CHECKPOINT_PERIOD};
use crate::DocId;
use std::io;
/// Represents a block of checkpoints.
///
/// The DocStore index checkpoints are organized into block
/// for code-readability and compression purpose.
///
/// A block can be of any size.
pub struct CheckpointBlock {
pub checkpoints: Vec<Checkpoint>,
}
impl Default for CheckpointBlock {
fn default() -> CheckpointBlock {
CheckpointBlock {
checkpoints: Vec::with_capacity(2 * CHECKPOINT_PERIOD),
}
}
}
impl CheckpointBlock {
/// If non-empty returns [start_doc, end_doc)
/// for the overall block.
pub fn doc_interval(&self) -> Option<(DocId, DocId)> {
let start_doc_opt = self
.checkpoints
.first()
.cloned()
.map(|checkpoint| checkpoint.start_doc);
let end_doc_opt = self
.checkpoints
.last()
.cloned()
.map(|checkpoint| checkpoint.end_doc);
match (start_doc_opt, end_doc_opt) {
(Some(start_doc), Some(end_doc)) => Some((start_doc, end_doc)),
_ => None,
}
}
/// Adding another checkpoint in the block.
pub fn push(&mut self, checkpoint: Checkpoint) {
self.checkpoints.push(checkpoint);
}
/// Returns the number of checkpoints in the block.
pub fn len(&self) -> usize {
self.checkpoints.len()
}
pub fn get(&self, idx: usize) -> Checkpoint {
self.checkpoints[idx]
}
pub fn clear(&mut self) {
self.checkpoints.clear();
}
pub fn serialize(&mut self, buffer: &mut Vec<u8>) {
VInt(self.checkpoints.len() as u64).serialize_into_vec(buffer);
if self.checkpoints.is_empty() {
return;
}
VInt(self.checkpoints[0].start_doc as u64).serialize_into_vec(buffer);
VInt(self.checkpoints[0].start_offset as u64).serialize_into_vec(buffer);
for checkpoint in &self.checkpoints {
let delta_doc = checkpoint.end_doc - checkpoint.start_doc;
VInt(delta_doc as u64).serialize_into_vec(buffer);
VInt(checkpoint.end_offset - checkpoint.start_offset).serialize_into_vec(buffer);
}
}
pub fn deserialize(&mut self, data: &mut &[u8]) -> io::Result<()> {
if data.is_empty() {
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, ""));
}
self.checkpoints.clear();
let len = VInt::deserialize_u64(data)? as usize;
if len == 0 {
return Ok(());
}
let mut doc = VInt::deserialize_u64(data)? as DocId;
let mut start_offset = VInt::deserialize_u64(data)?;
for _ in 0..len {
let num_docs = VInt::deserialize_u64(data)? as DocId;
let block_num_bytes = VInt::deserialize_u64(data)?;
self.checkpoints.push(Checkpoint {
start_doc: doc,
end_doc: doc + num_docs,
start_offset,
end_offset: start_offset + block_num_bytes,
});
doc += num_docs;
start_offset += block_num_bytes;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use crate::store::index::block::CheckpointBlock;
use crate::store::index::Checkpoint;
use crate::DocId;
use std::io;
fn test_aux_ser_deser(checkpoints: &[Checkpoint]) -> io::Result<()> {
let mut block = CheckpointBlock::default();
for &checkpoint in checkpoints {
block.push(checkpoint);
}
let mut buffer = Vec::new();
block.serialize(&mut buffer);
let mut block_deser = CheckpointBlock::default();
let checkpoint = Checkpoint {
start_doc: 0,
end_doc: 1,
start_offset: 2,
end_offset: 3,
};
block_deser.push(checkpoint); // < check that value is erased before deser
let mut data = &buffer[..];
block_deser.deserialize(&mut data)?;
assert!(data.is_empty());
assert_eq!(checkpoints, &block_deser.checkpoints[..]);
Ok(())
}
#[test]
fn test_block_serialize_empty() -> io::Result<()> {
test_aux_ser_deser(&[])
}
#[test]
fn test_block_serialize_simple() -> io::Result<()> {
let checkpoints = vec![Checkpoint {
start_doc: 10,
end_doc: 12,
start_offset: 100,
end_offset: 120,
}];
test_aux_ser_deser(&checkpoints)
}
#[test]
fn test_block_serialize() -> io::Result<()> {
let offsets: Vec<u64> = (0..11).map(|i| i * i * i).collect();
let mut checkpoints = vec![];
let mut start_doc = 0;
for i in 0..10 {
let end_doc = (i * i) as DocId;
checkpoints.push(Checkpoint {
start_doc,
end_doc,
start_offset: offsets[i],
end_offset: offsets[i + 1],
});
start_doc = end_doc;
}
test_aux_ser_deser(&checkpoints)
}
}

View File

@@ -1,230 +0,0 @@
const CHECKPOINT_PERIOD: usize = 8;
use std::fmt;
mod block;
mod skip_index;
mod skip_index_builder;
use crate::DocId;
pub use self::skip_index::SkipIndex;
pub use self::skip_index_builder::SkipIndexBuilder;
/// A checkpoint contains meta-information about
/// a block. Either a block of documents, or another block
/// of checkpoints.
///
/// All of the intervals here defined are semi-open.
/// The checkpoint describes that the block within the bytes
/// `[start_offset..end_offset)` spans over the docs
/// `[start_doc..end_doc)`.
#[derive(Clone, Copy, Eq, PartialEq)]
pub struct Checkpoint {
pub start_doc: DocId,
pub end_doc: DocId,
pub start_offset: u64,
pub end_offset: u64,
}
impl fmt::Debug for Checkpoint {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"(doc=[{}..{}), bytes=[{}..{}))",
self.start_doc, self.end_doc, self.start_offset, self.end_offset
)
}
}
#[cfg(test)]
mod tests {
use std::io;
use proptest::strategy::{BoxedStrategy, Strategy};
use crate::directory::OwnedBytes;
use crate::store::index::Checkpoint;
use crate::DocId;
use super::{SkipIndex, SkipIndexBuilder};
#[test]
fn test_skip_index_empty() -> io::Result<()> {
let mut output: Vec<u8> = Vec::new();
let skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new();
skip_index_builder.write(&mut output)?;
let skip_index: SkipIndex = SkipIndex::from(OwnedBytes::new(output));
let mut skip_cursor = skip_index.checkpoints();
assert!(skip_cursor.next().is_none());
Ok(())
}
#[test]
fn test_skip_index_single_el() -> io::Result<()> {
let mut output: Vec<u8> = Vec::new();
let mut skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new();
let checkpoint = Checkpoint {
start_doc: 0,
end_doc: 2,
start_offset: 0,
end_offset: 3,
};
skip_index_builder.insert(checkpoint);
skip_index_builder.write(&mut output)?;
let skip_index: SkipIndex = SkipIndex::from(OwnedBytes::new(output));
let mut skip_cursor = skip_index.checkpoints();
assert_eq!(skip_cursor.next(), Some(checkpoint));
assert_eq!(skip_cursor.next(), None);
Ok(())
}
#[test]
fn test_skip_index() -> io::Result<()> {
let mut output: Vec<u8> = Vec::new();
let checkpoints = vec![
Checkpoint {
start_doc: 0,
end_doc: 3,
start_offset: 4,
end_offset: 9,
},
Checkpoint {
start_doc: 3,
end_doc: 4,
start_offset: 9,
end_offset: 25,
},
Checkpoint {
start_doc: 4,
end_doc: 6,
start_offset: 25,
end_offset: 49,
},
Checkpoint {
start_doc: 6,
end_doc: 8,
start_offset: 49,
end_offset: 81,
},
Checkpoint {
start_doc: 8,
end_doc: 10,
start_offset: 81,
end_offset: 100,
},
];
let mut skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new();
for &checkpoint in &checkpoints {
skip_index_builder.insert(checkpoint);
}
skip_index_builder.write(&mut output)?;
let skip_index: SkipIndex = SkipIndex::from(OwnedBytes::new(output));
assert_eq!(
&skip_index.checkpoints().collect::<Vec<_>>()[..],
&checkpoints[..]
);
Ok(())
}
fn offset_test(doc: DocId) -> u64 {
(doc as u64) * (doc as u64)
}
#[test]
fn test_skip_index_long() -> io::Result<()> {
let mut output: Vec<u8> = Vec::new();
let checkpoints: Vec<Checkpoint> = (0..1000)
.map(|i| Checkpoint {
start_doc: i,
end_doc: i + 1,
start_offset: offset_test(i),
end_offset: offset_test(i + 1),
})
.collect();
let mut skip_index_builder = SkipIndexBuilder::new();
for checkpoint in &checkpoints {
skip_index_builder.insert(*checkpoint);
}
skip_index_builder.write(&mut output)?;
assert_eq!(output.len(), 4035);
let resulting_checkpoints: Vec<Checkpoint> = SkipIndex::from(OwnedBytes::new(output))
.checkpoints()
.collect();
assert_eq!(&resulting_checkpoints, &checkpoints);
Ok(())
}
fn integrate_delta(mut vals: Vec<u64>) -> Vec<u64> {
let mut prev = 0u64;
for val in vals.iter_mut() {
let new_val = *val + prev;
prev = new_val;
*val = new_val;
}
vals
}
// Generates a sequence of n valid checkpoints, with n < max_len.
fn monotonic_checkpoints(max_len: usize) -> BoxedStrategy<Vec<Checkpoint>> {
(1..max_len)
.prop_flat_map(move |len: usize| {
(
proptest::collection::vec(1u64..20u64, len as usize).prop_map(integrate_delta),
proptest::collection::vec(1u64..26u64, len as usize).prop_map(integrate_delta),
)
.prop_map(|(docs, offsets)| {
(0..docs.len() - 1)
.map(move |i| Checkpoint {
start_doc: docs[i] as DocId,
end_doc: docs[i + 1] as DocId,
start_offset: offsets[i],
end_offset: offsets[i + 1],
})
.collect::<Vec<Checkpoint>>()
})
})
.boxed()
}
fn seek_manual<I: Iterator<Item = Checkpoint>>(
checkpoints: I,
target: DocId,
) -> Option<Checkpoint> {
checkpoints
.into_iter()
.filter(|checkpoint| checkpoint.end_doc > target)
.next()
}
fn test_skip_index_aux(skip_index: SkipIndex, checkpoints: &[Checkpoint]) {
if let Some(last_checkpoint) = checkpoints.last() {
for doc in 0u32..last_checkpoint.end_doc {
let expected = seek_manual(skip_index.checkpoints(), doc);
assert_eq!(expected, skip_index.seek(doc), "Doc {}", doc);
}
assert!(skip_index.seek(last_checkpoint.end_doc).is_none());
}
}
use proptest::prelude::*;
proptest! {
#![proptest_config(ProptestConfig::with_cases(20))]
#[test]
fn test_proptest_skip(checkpoints in monotonic_checkpoints(100)) {
let mut skip_index_builder = SkipIndexBuilder::new();
for checkpoint in checkpoints.iter().cloned() {
skip_index_builder.insert(checkpoint);
}
let mut buffer = Vec::new();
skip_index_builder.write(&mut buffer).unwrap();
let skip_index = SkipIndex::from(OwnedBytes::new(buffer));
let iter_checkpoints: Vec<Checkpoint> = skip_index.checkpoints().collect();
assert_eq!(&checkpoints[..], &iter_checkpoints[..]);
test_skip_index_aux(skip_index, &checkpoints[..]);
}
}
}

View File

@@ -1,111 +0,0 @@
use crate::common::{BinarySerializable, VInt};
use crate::directory::OwnedBytes;
use crate::store::index::block::CheckpointBlock;
use crate::store::index::Checkpoint;
use crate::DocId;
pub struct LayerCursor<'a> {
remaining: &'a [u8],
block: CheckpointBlock,
cursor: usize,
}
impl<'a> Iterator for LayerCursor<'a> {
type Item = Checkpoint;
fn next(&mut self) -> Option<Checkpoint> {
if self.cursor == self.block.len() {
if self.remaining.is_empty() {
return None;
}
let (block_mut, remaining_mut) = (&mut self.block, &mut self.remaining);
if block_mut.deserialize(remaining_mut).is_err() {
return None;
}
self.cursor = 0;
}
let res = Some(self.block.get(self.cursor));
self.cursor += 1;
res
}
}
struct Layer {
data: OwnedBytes,
}
impl Layer {
fn cursor<'a>(&'a self) -> impl Iterator<Item = Checkpoint> + 'a {
self.cursor_at_offset(0u64)
}
fn cursor_at_offset<'a>(&'a self, start_offset: u64) -> impl Iterator<Item = Checkpoint> + 'a {
let data = &self.data.as_slice();
LayerCursor {
remaining: &data[start_offset as usize..],
block: CheckpointBlock::default(),
cursor: 0,
}
}
fn seek_start_at_offset(&self, target: DocId, offset: u64) -> Option<Checkpoint> {
self.cursor_at_offset(offset)
.find(|checkpoint| checkpoint.end_doc > target)
}
}
pub struct SkipIndex {
layers: Vec<Layer>,
}
impl SkipIndex {
pub(crate) fn checkpoints<'a>(&'a self) -> impl Iterator<Item = Checkpoint> + 'a {
self.layers
.last()
.into_iter()
.flat_map(|layer| layer.cursor())
}
pub fn seek(&self, target: DocId) -> Option<Checkpoint> {
let first_layer_len = self
.layers
.first()
.map(|layer| layer.data.len() as u64)
.unwrap_or(0u64);
let mut cur_checkpoint = Checkpoint {
start_doc: 0u32,
end_doc: 1u32,
start_offset: 0u64,
end_offset: first_layer_len,
};
for layer in &self.layers {
if let Some(checkpoint) =
layer.seek_start_at_offset(target, cur_checkpoint.start_offset)
{
cur_checkpoint = checkpoint;
} else {
return None;
}
}
Some(cur_checkpoint)
}
}
impl From<OwnedBytes> for SkipIndex {
fn from(mut data: OwnedBytes) -> SkipIndex {
let offsets: Vec<u64> = Vec::<VInt>::deserialize(&mut data)
.unwrap()
.into_iter()
.map(|el| el.0)
.collect();
let mut start_offset = 0;
let mut layers = Vec::new();
for end_offset in offsets {
layers.push(Layer {
data: data.slice(start_offset as usize, end_offset as usize),
});
start_offset = end_offset;
}
SkipIndex { layers }
}
}

View File

@@ -1,115 +0,0 @@
use crate::common::{BinarySerializable, VInt};
use crate::store::index::block::CheckpointBlock;
use crate::store::index::{Checkpoint, CHECKPOINT_PERIOD};
use std::io;
use std::io::Write;
// Each skip contains iterator over pairs (last doc in block, offset to start of block).
struct LayerBuilder {
buffer: Vec<u8>,
pub block: CheckpointBlock,
}
impl LayerBuilder {
fn finish(self) -> Vec<u8> {
self.buffer
}
fn new() -> LayerBuilder {
LayerBuilder {
buffer: Vec::new(),
block: CheckpointBlock::default(),
}
}
/// Serializes the block, and return a checkpoint representing
/// the entire block.
///
/// If the block was empty to begin with, simply return None.
fn flush_block(&mut self) -> Option<Checkpoint> {
self.block.doc_interval().map(|(start_doc, end_doc)| {
let start_offset = self.buffer.len() as u64;
self.block.serialize(&mut self.buffer);
let end_offset = self.buffer.len() as u64;
self.block.clear();
Checkpoint {
start_doc,
end_doc,
start_offset,
end_offset,
}
})
}
fn push(&mut self, checkpoint: Checkpoint) {
self.block.push(checkpoint);
}
fn insert(&mut self, checkpoint: Checkpoint) -> Option<Checkpoint> {
self.push(checkpoint);
let emit_skip_info = (self.block.len() % CHECKPOINT_PERIOD) == 0;
if emit_skip_info {
self.flush_block()
} else {
None
}
}
}
pub struct SkipIndexBuilder {
layers: Vec<LayerBuilder>,
}
impl SkipIndexBuilder {
pub fn new() -> SkipIndexBuilder {
SkipIndexBuilder { layers: Vec::new() }
}
fn get_layer(&mut self, layer_id: usize) -> &mut LayerBuilder {
if layer_id == self.layers.len() {
let layer_builder = LayerBuilder::new();
self.layers.push(layer_builder);
}
&mut self.layers[layer_id]
}
pub fn insert(&mut self, checkpoint: Checkpoint) {
let mut skip_pointer = Some(checkpoint);
for layer_id in 0.. {
if let Some(checkpoint) = skip_pointer {
skip_pointer = self.get_layer(layer_id).insert(checkpoint);
} else {
break;
}
}
}
pub fn write<W: Write>(mut self, output: &mut W) -> io::Result<()> {
let mut last_pointer = None;
for skip_layer in self.layers.iter_mut() {
if let Some(checkpoint) = last_pointer {
skip_layer.push(checkpoint);
}
last_pointer = skip_layer.flush_block();
}
let layer_buffers: Vec<Vec<u8>> = self
.layers
.into_iter()
.rev()
.map(|layer| layer.finish())
.collect();
let mut layer_offset = 0;
let mut layer_sizes = Vec::new();
for layer_buffer in &layer_buffers {
layer_offset += layer_buffer.len() as u64;
layer_sizes.push(VInt(layer_offset));
}
layer_sizes.serialize(output)?;
for layer_buffer in layer_buffers {
output.write_all(&layer_buffer[..])?;
}
Ok(())
}
}

View File

@@ -33,8 +33,8 @@ and should rely on either
!*/
mod index;
mod reader;
mod skiplist;
mod writer;
pub use self::reader::StoreReader;
pub use self::writer::StoreWriter;

View File

@@ -1,91 +1,69 @@
use super::decompress;
use super::index::SkipIndex;
use super::skiplist::SkipList;
use crate::common::VInt;
use crate::common::{BinarySerializable, HasLen};
use crate::directory::{FileSlice, OwnedBytes};
use crate::schema::Document;
use crate::space_usage::StoreSpaceUsage;
use crate::store::index::Checkpoint;
use crate::DocId;
use lru::LruCache;
use std::cell::RefCell;
use std::io;
use std::mem::size_of;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
const LRU_CACHE_CAPACITY: usize = 100;
type Block = Arc<Vec<u8>>;
type BlockCache = Arc<Mutex<LruCache<u64, Block>>>;
/// Reads document off tantivy's [`Store`](./index.html)
#[derive(Clone)]
pub struct StoreReader {
data: FileSlice,
cache: BlockCache,
cache_hits: Arc<AtomicUsize>,
cache_misses: Arc<AtomicUsize>,
skip_index: Arc<SkipIndex>,
space_usage: StoreSpaceUsage,
offset_index_file: OwnedBytes,
current_block_offset: RefCell<usize>,
current_block: RefCell<Vec<u8>>,
max_doc: DocId,
}
impl StoreReader {
/// Opens a store reader
// TODO rename open
pub fn open(store_file: FileSlice) -> io::Result<StoreReader> {
let (data_file, offset_index_file) = split_file(store_file)?;
let index_data = offset_index_file.read_bytes()?;
let space_usage = StoreSpaceUsage::new(data_file.len(), offset_index_file.len());
let skip_index = SkipIndex::from(index_data);
let (data_file, offset_index_file, max_doc) = split_file(store_file)?;
Ok(StoreReader {
data: data_file,
cache: Arc::new(Mutex::new(LruCache::new(LRU_CACHE_CAPACITY))),
cache_hits: Default::default(),
cache_misses: Default::default(),
skip_index: Arc::new(skip_index),
space_usage,
offset_index_file: offset_index_file.read_bytes()?,
current_block_offset: RefCell::new(usize::max_value()),
current_block: RefCell::new(Vec::new()),
max_doc,
})
}
pub(crate) fn block_checkpoints<'a>(&'a self) -> impl Iterator<Item = Checkpoint> + 'a {
self.skip_index.checkpoints()
pub(crate) fn block_index(&self) -> SkipList<'_, u64> {
SkipList::from(self.offset_index_file.as_slice())
}
fn block_checkpoint(&self, doc_id: DocId) -> Option<Checkpoint> {
self.skip_index.seek(doc_id)
fn block_offset(&self, doc_id: DocId) -> (DocId, u64) {
self.block_index()
.seek(u64::from(doc_id) + 1)
.map(|(doc, offset)| (doc as DocId, offset))
.unwrap_or((0u32, 0u64))
}
pub(crate) fn block_data(&self) -> io::Result<OwnedBytes> {
self.data.read_bytes()
}
fn compressed_block(&self, checkpoint: &Checkpoint) -> io::Result<OwnedBytes> {
self.data
.slice(
checkpoint.start_offset as usize,
checkpoint.end_offset as usize,
)
.read_bytes()
fn compressed_block(&self, addr: usize) -> io::Result<OwnedBytes> {
let (block_len_bytes, block_body) = self.data.slice_from(addr).split(4);
let block_len = u32::deserialize(&mut block_len_bytes.read_bytes()?)?;
block_body.slice_to(block_len as usize).read_bytes()
}
fn read_block(&self, checkpoint: &Checkpoint) -> io::Result<Block> {
if let Some(block) = self.cache.lock().unwrap().get(&checkpoint.start_offset) {
self.cache_hits.fetch_add(1, Ordering::SeqCst);
return Ok(block.clone());
fn read_block(&self, block_offset: usize) -> io::Result<()> {
if block_offset != *self.current_block_offset.borrow() {
let mut current_block_mut = self.current_block.borrow_mut();
current_block_mut.clear();
let compressed_block = self.compressed_block(block_offset)?;
decompress(compressed_block.as_slice(), &mut current_block_mut)?;
*self.current_block_offset.borrow_mut() = block_offset;
}
self.cache_misses.fetch_add(1, Ordering::SeqCst);
let compressed_block = self.compressed_block(checkpoint)?;
let mut decompressed_block = vec![];
decompress(compressed_block.as_slice(), &mut decompressed_block)?;
let block = Arc::new(decompressed_block);
self.cache
.lock()
.unwrap()
.put(checkpoint.start_offset, block.clone());
Ok(block)
Ok(())
}
/// Reads a given document.
@@ -96,15 +74,14 @@ impl StoreReader {
/// It should not be called to score documents
/// for instance.
pub fn get(&self, doc_id: DocId) -> crate::Result<Document> {
let checkpoint = self.block_checkpoint(doc_id).ok_or_else(|| {
crate::TantivyError::InvalidArgument(format!("Failed to lookup Doc #{}.", doc_id))
})?;
let mut cursor = &self.read_block(&checkpoint)?[..];
for _ in checkpoint.start_doc..doc_id {
let (first_doc_id, block_offset) = self.block_offset(doc_id);
self.read_block(block_offset as usize)?;
let current_block_mut = self.current_block.borrow_mut();
let mut cursor = &current_block_mut[..];
for _ in first_doc_id..doc_id {
let doc_length = VInt::deserialize(&mut cursor)?.val() as usize;
cursor = &cursor[doc_length..];
}
let doc_length = VInt::deserialize(&mut cursor)?.val() as usize;
cursor = &cursor[..doc_length];
Ok(Document::deserialize(&mut cursor)?)
@@ -112,93 +89,21 @@ impl StoreReader {
/// Summarize total space usage of this store reader.
pub fn space_usage(&self) -> StoreSpaceUsage {
self.space_usage.clone()
StoreSpaceUsage::new(self.data.len(), self.offset_index_file.len())
}
}
fn split_file(data: FileSlice) -> io::Result<(FileSlice, FileSlice)> {
let (data, footer_len_bytes) = data.split_from_end(size_of::<u64>());
let serialized_offset: OwnedBytes = footer_len_bytes.read_bytes()?;
fn split_file(data: FileSlice) -> io::Result<(FileSlice, FileSlice, DocId)> {
let data_len = data.len();
let footer_offset = data_len - size_of::<u64>() - size_of::<u32>();
let serialized_offset: OwnedBytes = data.slice(footer_offset, data_len).read_bytes()?;
let mut serialized_offset_buf = serialized_offset.as_slice();
let offset = u64::deserialize(&mut serialized_offset_buf)? as usize;
Ok(data.split(offset))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::schema::Document;
use crate::schema::Field;
use crate::{directory::RAMDirectory, store::tests::write_lorem_ipsum_store, Directory};
use std::path::Path;
fn get_text_field<'a>(doc: &'a Document, field: &'a Field) -> Option<&'a str> {
doc.get_first(*field).and_then(|f| f.text())
}
#[test]
fn test_store_lru_cache() -> crate::Result<()> {
let directory = RAMDirectory::create();
let path = Path::new("store");
let writer = directory.open_write(path)?;
let schema = write_lorem_ipsum_store(writer, 500);
let title = schema.get_field("title").unwrap();
let store_file = directory.open_read(path)?;
let store = StoreReader::open(store_file)?;
assert_eq!(store.cache.lock().unwrap().len(), 0);
assert_eq!(store.cache_hits.load(Ordering::SeqCst), 0);
assert_eq!(store.cache_misses.load(Ordering::SeqCst), 0);
let doc = store.get(0)?;
assert_eq!(get_text_field(&doc, &title), Some("Doc 0"));
assert_eq!(store.cache.lock().unwrap().len(), 1);
assert_eq!(store.cache_hits.load(Ordering::SeqCst), 0);
assert_eq!(store.cache_misses.load(Ordering::SeqCst), 1);
assert_eq!(
store
.cache
.lock()
.unwrap()
.peek_lru()
.map(|(&k, _)| k as usize),
Some(0)
);
let doc = store.get(499)?;
assert_eq!(get_text_field(&doc, &title), Some("Doc 499"));
assert_eq!(store.cache.lock().unwrap().len(), 2);
assert_eq!(store.cache_hits.load(Ordering::SeqCst), 0);
assert_eq!(store.cache_misses.load(Ordering::SeqCst), 2);
assert_eq!(
store
.cache
.lock()
.unwrap()
.peek_lru()
.map(|(&k, _)| k as usize),
Some(0)
);
let doc = store.get(0)?;
assert_eq!(get_text_field(&doc, &title), Some("Doc 0"));
assert_eq!(store.cache.lock().unwrap().len(), 2);
assert_eq!(store.cache_hits.load(Ordering::SeqCst), 1);
assert_eq!(store.cache_misses.load(Ordering::SeqCst), 2);
assert_eq!(
store
.cache
.lock()
.unwrap()
.peek_lru()
.map(|(&k, _)| k as usize),
Some(18806)
);
Ok(())
}
let offset = u64::deserialize(&mut serialized_offset_buf)?;
let offset = offset as usize;
let max_doc = u32::deserialize(&mut serialized_offset_buf)?;
Ok((
data.slice(0, offset),
data.slice(offset, footer_offset),
max_doc,
))
}

168
src/store/skiplist/mod.rs Normal file
View File

@@ -0,0 +1,168 @@
#![allow(dead_code)]
mod skiplist;
mod skiplist_builder;
pub use self::skiplist::SkipList;
pub use self::skiplist_builder::SkipListBuilder;
#[cfg(test)]
mod tests {
use super::{SkipList, SkipListBuilder};
#[test]
fn test_skiplist() {
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<u32> = SkipListBuilder::new(8);
skip_list_builder.insert(2, &3).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<'_, u32> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next(), Some((2, 3)));
}
#[test]
fn test_skiplist2() {
let mut output: Vec<u8> = Vec::new();
let skip_list_builder: SkipListBuilder<u32> = SkipListBuilder::new(8);
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<'_, u32> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next(), None);
}
#[test]
fn test_skiplist3() {
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(2);
skip_list_builder.insert(2, &()).unwrap();
skip_list_builder.insert(3, &()).unwrap();
skip_list_builder.insert(5, &()).unwrap();
skip_list_builder.insert(7, &()).unwrap();
skip_list_builder.insert(9, &()).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next().unwrap(), (2, ()));
assert_eq!(skip_list.next().unwrap(), (3, ()));
assert_eq!(skip_list.next().unwrap(), (5, ()));
assert_eq!(skip_list.next().unwrap(), (7, ()));
assert_eq!(skip_list.next().unwrap(), (9, ()));
assert_eq!(skip_list.next(), None);
}
#[test]
fn test_skiplist4() {
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(2);
skip_list_builder.insert(2, &()).unwrap();
skip_list_builder.insert(3, &()).unwrap();
skip_list_builder.insert(5, &()).unwrap();
skip_list_builder.insert(7, &()).unwrap();
skip_list_builder.insert(9, &()).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next().unwrap(), (2, ()));
skip_list.seek(5);
assert_eq!(skip_list.next().unwrap(), (5, ()));
assert_eq!(skip_list.next().unwrap(), (7, ()));
assert_eq!(skip_list.next().unwrap(), (9, ()));
assert_eq!(skip_list.next(), None);
}
#[test]
fn test_skiplist5() {
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(4);
skip_list_builder.insert(2, &()).unwrap();
skip_list_builder.insert(3, &()).unwrap();
skip_list_builder.insert(5, &()).unwrap();
skip_list_builder.insert(6, &()).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next().unwrap(), (2, ()));
skip_list.seek(6);
assert_eq!(skip_list.next().unwrap(), (6, ()));
assert_eq!(skip_list.next(), None);
}
#[test]
fn test_skiplist6() {
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(2);
skip_list_builder.insert(2, &()).unwrap();
skip_list_builder.insert(3, &()).unwrap();
skip_list_builder.insert(5, &()).unwrap();
skip_list_builder.insert(7, &()).unwrap();
skip_list_builder.insert(9, &()).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next().unwrap(), (2, ()));
skip_list.seek(10);
assert_eq!(skip_list.next(), None);
}
#[test]
fn test_skiplist7() {
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(4);
for i in 0..1000 {
skip_list_builder.insert(i, &()).unwrap();
}
skip_list_builder.insert(1004, &()).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
let mut skip_list: SkipList<'_, ()> = SkipList::from(output.as_slice());
assert_eq!(skip_list.next().unwrap(), (0, ()));
skip_list.seek(431);
assert_eq!(skip_list.next().unwrap(), (431, ()));
skip_list.seek(1003);
assert_eq!(skip_list.next().unwrap(), (1004, ()));
assert_eq!(skip_list.next(), None);
}
#[test]
fn test_skiplist8() {
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<u64> = SkipListBuilder::new(8);
skip_list_builder.insert(2, &3).unwrap();
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
assert_eq!(output.len(), 11);
assert_eq!(output[0], 1u8 + 128u8);
}
#[test]
fn test_skiplist9() {
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<u64> = SkipListBuilder::new(4);
for i in 0..4 * 4 * 4 {
skip_list_builder.insert(i, &i).unwrap();
}
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
assert_eq!(output.len(), 774);
assert_eq!(output[0], 4u8 + 128u8);
}
#[test]
fn test_skiplist10() {
// checking that void gets serialized to nothing.
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(4);
for i in 0..((4 * 4 * 4) - 1) {
skip_list_builder.insert(i, &()).unwrap();
}
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
assert_eq!(output.len(), 230);
assert_eq!(output[0], 128u8 + 3u8);
}
#[test]
fn test_skiplist11() {
// checking that void gets serialized to nothing.
let mut output: Vec<u8> = Vec::new();
let mut skip_list_builder: SkipListBuilder<()> = SkipListBuilder::new(4);
for i in 0..(4 * 4) {
skip_list_builder.insert(i, &()).unwrap();
}
skip_list_builder.write::<Vec<u8>>(&mut output).unwrap();
assert_eq!(output.len(), 65);
assert_eq!(output[0], 128u8 + 3u8);
}
}

View File

@@ -0,0 +1,133 @@
use crate::common::{BinarySerializable, VInt};
use std::cmp::max;
use std::marker::PhantomData;
static EMPTY: [u8; 0] = [];
struct Layer<'a, T> {
data: &'a [u8],
cursor: &'a [u8],
next_id: Option<u64>,
_phantom_: PhantomData<T>,
}
impl<'a, T: BinarySerializable> Iterator for Layer<'a, T> {
type Item = (u64, T);
fn next(&mut self) -> Option<(u64, T)> {
if let Some(cur_id) = self.next_id {
let cur_val = T::deserialize(&mut self.cursor).unwrap();
self.next_id = VInt::deserialize_u64(&mut self.cursor).ok();
Some((cur_id, cur_val))
} else {
None
}
}
}
impl<'a, T: BinarySerializable> From<&'a [u8]> for Layer<'a, T> {
fn from(data: &'a [u8]) -> Layer<'a, T> {
let mut cursor = data;
let next_id = VInt::deserialize_u64(&mut cursor).ok();
Layer {
data,
cursor,
next_id,
_phantom_: PhantomData,
}
}
}
impl<'a, T: BinarySerializable> Layer<'a, T> {
fn empty() -> Layer<'a, T> {
Layer {
data: &EMPTY,
cursor: &EMPTY,
next_id: None,
_phantom_: PhantomData,
}
}
fn seek_offset(&mut self, offset: usize) {
self.cursor = &self.data[offset..];
self.next_id = VInt::deserialize_u64(&mut self.cursor).ok();
}
// Returns the last element (key, val)
// such that (key < doc_id)
//
// If there is no such element anymore,
// returns None.
//
// If the element exists, it will be returned
// at the next call to `.next()`.
fn seek(&mut self, key: u64) -> Option<(u64, T)> {
let mut result: Option<(u64, T)> = None;
loop {
if let Some(next_id) = self.next_id {
if next_id < key {
if let Some(v) = self.next() {
result = Some(v);
continue;
}
}
}
return result;
}
}
}
pub struct SkipList<'a, T: BinarySerializable> {
data_layer: Layer<'a, T>,
skip_layers: Vec<Layer<'a, u64>>,
}
impl<'a, T: BinarySerializable> Iterator for SkipList<'a, T> {
type Item = (u64, T);
fn next(&mut self) -> Option<(u64, T)> {
self.data_layer.next()
}
}
impl<'a, T: BinarySerializable> SkipList<'a, T> {
pub fn seek(&mut self, key: u64) -> Option<(u64, T)> {
let mut next_layer_skip: Option<(u64, u64)> = None;
for skip_layer in &mut self.skip_layers {
if let Some((_, offset)) = next_layer_skip {
skip_layer.seek_offset(offset as usize);
}
next_layer_skip = skip_layer.seek(key);
}
if let Some((_, offset)) = next_layer_skip {
self.data_layer.seek_offset(offset as usize);
}
self.data_layer.seek(key)
}
}
impl<'a, T: BinarySerializable> From<&'a [u8]> for SkipList<'a, T> {
fn from(mut data: &'a [u8]) -> SkipList<'a, T> {
let offsets: Vec<u64> = Vec::<VInt>::deserialize(&mut data)
.unwrap()
.into_iter()
.map(|el| el.0)
.collect();
let num_layers = offsets.len();
let layers_data: &[u8] = data;
let data_layer: Layer<'a, T> = if num_layers == 0 {
Layer::empty()
} else {
let first_layer_data: &[u8] = &layers_data[..offsets[0] as usize];
Layer::from(first_layer_data)
};
let skip_layers = (0..max(1, num_layers) - 1)
.map(|i| (offsets[i] as usize, offsets[i + 1] as usize))
.map(|(start, stop)| Layer::from(&layers_data[start..stop]))
.collect();
SkipList {
skip_layers,
data_layer,
}
}
}

View File

@@ -0,0 +1,98 @@
use crate::common::{is_power_of_2, BinarySerializable, VInt};
use std::io;
use std::io::Write;
use std::marker::PhantomData;
struct LayerBuilder<T: BinarySerializable> {
period_mask: usize,
buffer: Vec<u8>,
len: usize,
_phantom_: PhantomData<T>,
}
impl<T: BinarySerializable> LayerBuilder<T> {
fn written_size(&self) -> usize {
self.buffer.len()
}
fn write(&self, output: &mut dyn Write) -> Result<(), io::Error> {
output.write_all(&self.buffer)?;
Ok(())
}
fn with_period(period: usize) -> LayerBuilder<T> {
assert!(is_power_of_2(period), "The period has to be a power of 2.");
LayerBuilder {
period_mask: (period - 1),
buffer: Vec::new(),
len: 0,
_phantom_: PhantomData,
}
}
fn insert(&mut self, key: u64, value: &T) -> io::Result<Option<(u64, u64)>> {
self.len += 1;
let offset = self.written_size() as u64;
VInt(key).serialize_into_vec(&mut self.buffer);
value.serialize(&mut self.buffer)?;
let emit_skip_info = (self.period_mask & self.len) == 0;
if emit_skip_info {
Ok(Some((key, offset)))
} else {
Ok(None)
}
}
}
pub struct SkipListBuilder<T: BinarySerializable> {
period: usize,
data_layer: LayerBuilder<T>,
skip_layers: Vec<LayerBuilder<u64>>,
}
impl<T: BinarySerializable> SkipListBuilder<T> {
pub fn new(period: usize) -> SkipListBuilder<T> {
SkipListBuilder {
period,
data_layer: LayerBuilder::with_period(period),
skip_layers: Vec::new(),
}
}
fn get_skip_layer(&mut self, layer_id: usize) -> &mut LayerBuilder<u64> {
if layer_id == self.skip_layers.len() {
let layer_builder = LayerBuilder::with_period(self.period);
self.skip_layers.push(layer_builder);
}
&mut self.skip_layers[layer_id]
}
pub fn insert(&mut self, key: u64, dest: &T) -> io::Result<()> {
let mut skip_pointer = self.data_layer.insert(key, dest)?;
for layer_id in 0.. {
if let Some((skip_doc_id, skip_offset)) = skip_pointer {
skip_pointer = self
.get_skip_layer(layer_id)
.insert(skip_doc_id, &skip_offset)?;
} else {
break;
}
}
Ok(())
}
pub fn write<W: Write>(self, output: &mut W) -> io::Result<()> {
let mut size: u64 = self.data_layer.buffer.len() as u64;
let mut layer_sizes = vec![VInt(size)];
for layer in self.skip_layers.iter().rev() {
size += layer.buffer.len() as u64;
layer_sizes.push(VInt(size));
}
layer_sizes.serialize(output)?;
self.data_layer.write(output)?;
for layer in self.skip_layers.iter().rev() {
layer.write(output)?;
}
Ok(())
}
}

View File

@@ -1,12 +1,11 @@
use super::compress;
use super::index::SkipIndexBuilder;
use super::skiplist::SkipListBuilder;
use super::StoreReader;
use crate::common::CountingWriter;
use crate::common::{BinarySerializable, VInt};
use crate::directory::TerminatingWrite;
use crate::directory::WritePtr;
use crate::schema::Document;
use crate::store::index::Checkpoint;
use crate::DocId;
use std::io::{self, Write};
@@ -22,8 +21,7 @@ const BLOCK_SIZE: usize = 16_384;
///
pub struct StoreWriter {
doc: DocId,
first_doc_in_block: DocId,
offset_index_writer: SkipIndexBuilder,
offset_index_writer: SkipListBuilder<u64>,
writer: CountingWriter<WritePtr>,
intermediary_buffer: Vec<u8>,
current_block: Vec<u8>,
@@ -37,8 +35,7 @@ impl StoreWriter {
pub fn new(writer: WritePtr) -> StoreWriter {
StoreWriter {
doc: 0,
first_doc_in_block: 0,
offset_index_writer: SkipIndexBuilder::new(),
offset_index_writer: SkipListBuilder::new(4),
writer: CountingWriter::wrap(writer),
intermediary_buffer: Vec::new(),
current_block: Vec::new(),
@@ -71,9 +68,11 @@ impl StoreWriter {
pub fn stack(&mut self, store_reader: &StoreReader) -> io::Result<()> {
if !self.current_block.is_empty() {
self.write_and_compress_block()?;
self.offset_index_writer
.insert(u64::from(self.doc), &(self.writer.written_bytes() as u64))?;
}
let doc_shift = self.doc;
let start_shift = self.writer.written_bytes() as u64;
let doc_offset = self.doc;
let start_offset = self.writer.written_bytes() as u64;
// just bulk write all of the block of the given reader.
self.writer
@@ -81,33 +80,22 @@ impl StoreWriter {
// concatenate the index of the `store_reader`, after translating
// its start doc id and its start file offset.
for mut checkpoint in store_reader.block_checkpoints() {
checkpoint.start_doc += doc_shift;
checkpoint.end_doc += doc_shift;
checkpoint.start_offset += start_shift;
checkpoint.end_offset += start_shift;
self.offset_index_writer.insert(checkpoint);
self.doc = checkpoint.end_doc;
for (next_doc_id, block_addr) in store_reader.block_index() {
self.doc = doc_offset + next_doc_id as u32;
self.offset_index_writer
.insert(u64::from(self.doc), &(start_offset + block_addr))?;
}
Ok(())
}
fn write_and_compress_block(&mut self) -> io::Result<()> {
assert!(self.doc > 0);
self.intermediary_buffer.clear();
compress(&self.current_block[..], &mut self.intermediary_buffer)?;
let start_offset = self.writer.written_bytes();
(self.intermediary_buffer.len() as u32).serialize(&mut self.writer)?;
self.writer.write_all(&self.intermediary_buffer)?;
let end_offset = self.writer.written_bytes();
let end_doc = self.doc;
self.offset_index_writer.insert(Checkpoint {
start_doc: self.first_doc_in_block,
end_doc,
start_offset,
end_offset,
});
self.offset_index_writer
.insert(u64::from(self.doc), &(self.writer.written_bytes() as u64))?;
self.current_block.clear();
self.first_doc_in_block = self.doc;
Ok(())
}
@@ -122,6 +110,7 @@ impl StoreWriter {
let header_offset: u64 = self.writer.written_bytes() as u64;
self.offset_index_writer.write(&mut self.writer)?;
header_offset.serialize(&mut self.writer)?;
self.doc.serialize(&mut self.writer)?;
self.writer.terminate()
}
}

View File

@@ -1,27 +0,0 @@
/*!
The term dictionary main role is to associate the sorted [`Term`s](../struct.Term.html) to
a [`TermInfo`](../postings/struct.TermInfo.html) struct that contains some meta-information
about the term.
Internally, the term dictionary relies on the `fst` crate to store
a sorted mapping that associate each term to its rank in the lexicographical order.
For instance, in a dictionary containing the sorted terms "abba", "bjork", "blur" and "donovan",
the `TermOrdinal` are respectively `0`, `1`, `2`, and `3`.
For `u64`-terms, tantivy explicitely uses a `BigEndian` representation to ensure that the
lexicographical order matches the natural order of integers.
`i64`-terms are transformed to `u64` using a continuous mapping `val ⟶ val - i64::min_value()`
and then treated as a `u64`.
`f64`-terms are transformed to `u64` using a mapping that preserve order, and are then treated
as `u64`.
A second datastructure makes it possible to access a [`TermInfo`](../postings/struct.TermInfo.html).
*/
mod streamer;
mod term_info_store;
mod termdict;
pub use self::streamer::{TermStreamer, TermStreamerBuilder};
pub use self::termdict::{TermDictionary, TermDictionaryBuilder};

View File

@@ -60,10 +60,12 @@ impl<'a> TermMerger<'a> {
pub(crate) fn matching_segments<'b: 'a>(
&'b self,
) -> impl 'b + Iterator<Item = (usize, TermOrdinal)> {
self.current_streamers
.iter()
.map(|heap_item| (heap_item.segment_ord, heap_item.streamer.term_ord()))
) -> Box<dyn 'b + Iterator<Item = (usize, TermOrdinal)>> {
Box::new(
self.current_streamers
.iter()
.map(|heap_item| (heap_item.segment_ord, heap_item.streamer.term_ord())),
)
}
fn advance_segments(&mut self) {

View File

@@ -20,39 +20,435 @@ as `u64`.
A second datastructure makes it possible to access a [`TermInfo`](../postings/struct.TermInfo.html).
*/
use tantivy_fst::automaton::AlwaysMatch;
// mod fst_termdict;
// use fst_termdict as termdict;
mod sstable_termdict;
use sstable_termdict as termdict;
mod merger;
#[cfg(test)]
mod tests;
/// Position of the term in the sorted list of terms.
pub type TermOrdinal = u64;
/// The term dictionary contains all of the terms in
/// `tantivy index` in a sorted manner.
pub type TermDictionary = self::termdict::TermDictionary;
mod merger;
mod streamer;
mod term_info_store;
mod termdict;
/// Builder for the new term dictionary.
///
/// Inserting must be done in the order of the `keys`.
pub type TermDictionaryBuilder<W> = self::termdict::TermDictionaryBuilder<W>;
pub use self::merger::TermMerger;
pub use self::streamer::{TermStreamer, TermStreamerBuilder};
pub use self::termdict::{TermDictionary, TermDictionaryBuilder};
/// Given a list of sorted term streams,
/// returns an iterator over sorted unique terms.
///
/// The item yield is actually a pair with
/// - the term
/// - a slice with the ordinal of the segments containing
/// the terms.
pub type TermMerger<'a> = self::merger::TermMerger<'a>;
#[cfg(test)]
mod tests {
use super::{TermDictionary, TermDictionaryBuilder, TermStreamer};
use crate::core::Index;
use crate::directory::{Directory, FileSlice, RAMDirectory};
use crate::postings::TermInfo;
use crate::schema::{Schema, TEXT};
use std::path::PathBuf;
use std::str;
/// `TermStreamer` acts as a cursor over a range of terms of a segment.
/// Terms are guaranteed to be sorted.
pub type TermStreamer<'a, A = AlwaysMatch> = self::termdict::TermStreamer<'a, A>;
const BLOCK_SIZE: usize = 1_500;
fn make_term_info(val: u64) -> TermInfo {
TermInfo {
doc_freq: val as u32,
positions_idx: val * 2u64,
postings_offset: val * 3u64,
}
}
#[test]
fn test_empty_term_dictionary() {
let empty = TermDictionary::empty();
assert!(empty.stream().next().is_none());
}
#[test]
fn test_term_ordinals() -> crate::Result<()> {
const COUNTRIES: [&'static str; 7] = [
"San Marino",
"Serbia",
"Slovakia",
"Slovenia",
"Spain",
"Sweden",
"Switzerland",
];
let directory = RAMDirectory::create();
let path = PathBuf::from("TermDictionary");
{
let write = directory.open_write(&path)?;
let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
for term in COUNTRIES.iter() {
term_dictionary_builder.insert(term.as_bytes(), &make_term_info(0u64))?;
}
term_dictionary_builder.finish()?;
}
let term_file = directory.open_read(&path)?;
let term_dict: TermDictionary = TermDictionary::open(term_file)?;
for (term_ord, term) in COUNTRIES.iter().enumerate() {
assert_eq!(term_dict.term_ord(term).unwrap(), term_ord as u64);
let mut bytes = vec![];
assert!(term_dict.ord_to_term(term_ord as u64, &mut bytes));
assert_eq!(bytes, term.as_bytes());
}
Ok(())
}
#[test]
fn test_term_dictionary_simple() -> crate::Result<()> {
let directory = RAMDirectory::create();
let path = PathBuf::from("TermDictionary");
{
let write = directory.open_write(&path)?;
let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
term_dictionary_builder.insert("abc".as_bytes(), &make_term_info(34u64))?;
term_dictionary_builder.insert("abcd".as_bytes(), &make_term_info(346u64))?;
term_dictionary_builder.finish()?;
}
let file = directory.open_read(&path)?;
let term_dict: TermDictionary = TermDictionary::open(file)?;
assert_eq!(term_dict.get("abc").unwrap().doc_freq, 34u32);
assert_eq!(term_dict.get("abcd").unwrap().doc_freq, 346u32);
let mut stream = term_dict.stream();
{
{
let (k, v) = stream.next().unwrap();
assert_eq!(k.as_ref(), "abc".as_bytes());
assert_eq!(v.doc_freq, 34u32);
}
assert_eq!(stream.key(), "abc".as_bytes());
assert_eq!(stream.value().doc_freq, 34u32);
}
{
{
let (k, v) = stream.next().unwrap();
assert_eq!(k, "abcd".as_bytes());
assert_eq!(v.doc_freq, 346u32);
}
assert_eq!(stream.key(), "abcd".as_bytes());
assert_eq!(stream.value().doc_freq, 346u32);
}
assert!(!stream.advance());
Ok(())
}
#[test]
fn test_term_iterator() -> crate::Result<()> {
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT);
let index = Index::create_in_ram(schema_builder.build());
{
let mut index_writer = index.writer_for_tests()?;
index_writer.add_document(doc!(text_field=>"a b d f"));
index_writer.commit()?;
index_writer.add_document(doc!(text_field=>"a b c d f"));
index_writer.commit()?;
index_writer.add_document(doc!(text_field => "e f"));
index_writer.commit()?;
}
let searcher = index.reader()?.searcher();
let field_searcher = searcher.field(text_field)?;
let mut term_it = field_searcher.terms();
let mut term_string = String::new();
while term_it.advance() {
//let term = Term::from_bytes(term_it.key());
term_string.push_str(str::from_utf8(term_it.key()).expect("test"));
}
assert_eq!(&*term_string, "abcdef");
Ok(())
}
#[test]
fn test_term_dictionary_stream() -> crate::Result<()> {
let ids: Vec<_> = (0u32..10_000u32)
.map(|i| (format!("doc{:0>6}", i), i))
.collect();
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
for &(ref id, ref i) in &ids {
term_dictionary_builder
.insert(id.as_bytes(), &make_term_info(*i as u64))
.unwrap();
}
term_dictionary_builder.finish().unwrap()
};
let term_file = FileSlice::from(buffer);
let term_dictionary: TermDictionary = TermDictionary::open(term_file)?;
{
let mut streamer = term_dictionary.stream();
let mut i = 0;
while let Some((streamer_k, streamer_v)) = streamer.next() {
let &(ref key, ref v) = &ids[i];
assert_eq!(streamer_k.as_ref(), key.as_bytes());
assert_eq!(streamer_v, &make_term_info(*v as u64));
i += 1;
}
}
let &(ref key, ref val) = &ids[2047];
assert_eq!(
term_dictionary.get(key.as_bytes()),
Some(make_term_info(*val as u64))
);
Ok(())
}
#[test]
fn test_stream_high_range_prefix_suffix() -> crate::Result<()> {
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
// term requires more than 16bits
term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxy", &make_term_info(1))?;
term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxyz", &make_term_info(2))?;
term_dictionary_builder.insert("abr", &make_term_info(2))?;
term_dictionary_builder.finish()?
};
let term_dict_file = FileSlice::from(buffer);
let term_dictionary: TermDictionary = TermDictionary::open(term_dict_file)?;
let mut kv_stream = term_dictionary.stream();
assert!(kv_stream.advance());
assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxy".as_bytes());
assert_eq!(kv_stream.value(), &make_term_info(1));
assert!(kv_stream.advance());
assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxyz".as_bytes());
assert_eq!(kv_stream.value(), &make_term_info(2));
assert!(kv_stream.advance());
assert_eq!(kv_stream.key(), "abr".as_bytes());
assert!(!kv_stream.advance());
Ok(())
}
#[test]
fn test_stream_range() -> crate::Result<()> {
let ids: Vec<_> = (0u32..10_000u32)
.map(|i| (format!("doc{:0>6}", i), i))
.collect();
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
for &(ref id, ref i) in &ids {
term_dictionary_builder
.insert(id.as_bytes(), &make_term_info(*i as u64))
.unwrap();
}
term_dictionary_builder.finish().unwrap()
};
let file = FileSlice::from(buffer);
let term_dictionary: TermDictionary = TermDictionary::open(file)?;
{
for i in (0..20).chain(6000..8_000) {
let &(ref target_key, _) = &ids[i];
let mut streamer = term_dictionary
.range()
.ge(target_key.as_bytes())
.into_stream();
for j in 0..3 {
let (streamer_k, streamer_v) = streamer.next().unwrap();
let &(ref key, ref v) = &ids[i + j];
assert_eq!(str::from_utf8(streamer_k.as_ref()).unwrap(), key);
assert_eq!(streamer_v.doc_freq, *v);
assert_eq!(streamer_v, &make_term_info(*v as u64));
}
}
}
{
for i in (0..20).chain(BLOCK_SIZE - 10..BLOCK_SIZE + 10) {
let &(ref target_key, _) = &ids[i];
let mut streamer = term_dictionary
.range()
.gt(target_key.as_bytes())
.into_stream();
for j in 0..3 {
let (streamer_k, streamer_v) = streamer.next().unwrap();
let &(ref key, ref v) = &ids[i + j + 1];
assert_eq!(streamer_k.as_ref(), key.as_bytes());
assert_eq!(streamer_v.doc_freq, *v);
}
}
}
{
for i in (0..20).chain(BLOCK_SIZE - 10..BLOCK_SIZE + 10) {
for j in 0..3 {
let &(ref fst_key, _) = &ids[i];
let &(ref last_key, _) = &ids[i + j];
let mut streamer = term_dictionary
.range()
.ge(fst_key.as_bytes())
.lt(last_key.as_bytes())
.into_stream();
for _ in 0..j {
assert!(streamer.next().is_some());
}
assert!(streamer.next().is_none());
}
}
}
Ok(())
}
#[test]
fn test_empty_string() -> crate::Result<()> {
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
term_dictionary_builder
.insert(&[], &make_term_info(1 as u64))
.unwrap();
term_dictionary_builder
.insert(&[1u8], &make_term_info(2 as u64))
.unwrap();
term_dictionary_builder.finish().unwrap()
};
let file = FileSlice::from(buffer);
let term_dictionary: TermDictionary = TermDictionary::open(file)?;
let mut stream = term_dictionary.stream();
assert!(stream.advance());
assert!(stream.key().is_empty());
assert!(stream.advance());
assert_eq!(stream.key(), &[1u8]);
assert!(!stream.advance());
Ok(())
}
#[test]
fn test_stream_range_boundaries() -> crate::Result<()> {
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(Vec::new())?;
for i in 0u8..10u8 {
let number_arr = [i; 1];
term_dictionary_builder.insert(&number_arr, &make_term_info(i as u64))?;
}
term_dictionary_builder.finish()?
};
let file = FileSlice::from(buffer);
let term_dictionary: TermDictionary = TermDictionary::open(file)?;
let value_list = |mut streamer: TermStreamer<'_>, backwards: bool| {
let mut res: Vec<u32> = vec![];
while let Some((_, ref v)) = streamer.next() {
res.push(v.doc_freq);
}
if backwards {
res.reverse();
}
res
};
{
let range = term_dictionary.range().backward().into_stream();
assert_eq!(
value_list(range, true),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().ge([2u8]).into_stream();
assert_eq!(
value_list(range, false),
vec![2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().ge([2u8]).backward().into_stream();
assert_eq!(
value_list(range, true),
vec![2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().gt([2u8]).into_stream();
assert_eq!(
value_list(range, false),
vec![3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().gt([2u8]).backward().into_stream();
assert_eq!(
value_list(range, true),
vec![3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().lt([6u8]).into_stream();
assert_eq!(
value_list(range, false),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32]
);
}
{
let range = term_dictionary.range().lt([6u8]).backward().into_stream();
assert_eq!(
value_list(range, true),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32]
);
}
{
let range = term_dictionary.range().le([6u8]).into_stream();
assert_eq!(
value_list(range, false),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32]
);
}
{
let range = term_dictionary.range().le([6u8]).backward().into_stream();
assert_eq!(
value_list(range, true),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32]
);
}
{
let range = term_dictionary.range().ge([0u8]).lt([5u8]).into_stream();
assert_eq!(value_list(range, false), vec![0u32, 1u32, 2u32, 3u32, 4u32]);
}
{
let range = term_dictionary
.range()
.ge([0u8])
.lt([5u8])
.backward()
.into_stream();
assert_eq!(value_list(range, true), vec![0u32, 1u32, 2u32, 3u32, 4u32]);
}
Ok(())
}
#[test]
fn test_automaton_search() -> crate::Result<()> {
use crate::query::DFAWrapper;
use levenshtein_automata::LevenshteinAutomatonBuilder;
const COUNTRIES: [&'static str; 7] = [
"San Marino",
"Serbia",
"Slovakia",
"Slovenia",
"Spain",
"Sweden",
"Switzerland",
];
let directory = RAMDirectory::create();
let path = PathBuf::from("TermDictionary");
{
let write = directory.open_write(&path)?;
let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
for term in COUNTRIES.iter() {
term_dictionary_builder.insert(term.as_bytes(), &make_term_info(0u64))?;
}
term_dictionary_builder.finish()?;
}
let file = directory.open_read(&path)?;
let term_dict: TermDictionary = TermDictionary::open(file)?;
// We can now build an entire dfa.
let lev_automaton_builder = LevenshteinAutomatonBuilder::new(2, true);
let automaton = DFAWrapper(lev_automaton_builder.build_dfa("Spaen"));
let mut range = term_dict.search(automaton).into_stream();
// get the first finding
assert!(range.advance());
assert_eq!("Spain".as_bytes(), range.key());
assert!(!range.advance());
Ok(())
}
}

View File

@@ -1,148 +0,0 @@
use std::io;
mod sstable;
mod streamer;
mod termdict;
use self::sstable::value::{ValueReader, ValueWriter};
use self::sstable::{BlockReader, SSTable};
use crate::common::VInt;
use crate::postings::TermInfo;
pub use self::streamer::{TermStreamer, TermStreamerBuilder};
pub use self::termdict::{TermDictionary, TermDictionaryBuilder};
pub struct TermSSTable;
impl SSTable for TermSSTable {
type Value = TermInfo;
type Reader = TermInfoReader;
type Writer = TermInfoWriter;
}
#[derive(Default)]
pub struct TermInfoReader {
term_infos: Vec<TermInfo>,
}
impl ValueReader for TermInfoReader {
type Value = TermInfo;
fn value(&self, idx: usize) -> &TermInfo {
&self.term_infos[idx]
}
fn read(&mut self, reader: &mut BlockReader) -> io::Result<()> {
self.term_infos.clear();
let num_els = VInt::deserialize_u64(reader)?;
let mut start_offset = VInt::deserialize_u64(reader)?;
let mut positions_idx = 0;
for _ in 0..num_els {
let doc_freq = VInt::deserialize_u64(reader)? as u32;
let posting_num_bytes = VInt::deserialize_u64(reader)?;
let stop_offset = start_offset + posting_num_bytes;
let delta_positions_idx = VInt::deserialize_u64(reader)?;
positions_idx += delta_positions_idx;
let term_info = TermInfo {
doc_freq,
postings_start_offset: start_offset,
postings_stop_offset: stop_offset,
positions_idx,
};
self.term_infos.push(term_info);
start_offset = stop_offset;
}
Ok(())
}
}
#[derive(Default)]
pub struct TermInfoWriter {
term_infos: Vec<TermInfo>,
}
impl ValueWriter for TermInfoWriter {
type Value = TermInfo;
fn write(&mut self, term_info: &TermInfo) {
self.term_infos.push(term_info.clone());
}
fn write_block(&mut self, buffer: &mut Vec<u8>) {
VInt(self.term_infos.len() as u64).serialize_into_vec(buffer);
if self.term_infos.is_empty() {
return;
}
let mut prev_position_idx = 0u64;
VInt(self.term_infos[0].postings_start_offset).serialize_into_vec(buffer);
for term_info in &self.term_infos {
VInt(term_info.doc_freq as u64).serialize_into_vec(buffer);
VInt(term_info.postings_stop_offset - term_info.postings_start_offset)
.serialize_into_vec(buffer);
VInt(term_info.positions_idx - prev_position_idx).serialize_into_vec(buffer);
prev_position_idx = term_info.positions_idx;
}
self.term_infos.clear();
}
}
#[cfg(test)]
mod tests {
use std::io;
use super::BlockReader;
use crate::directory::OwnedBytes;
use crate::postings::TermInfo;
use crate::termdict::sstable_termdict::sstable::value::{ValueReader, ValueWriter};
use crate::termdict::sstable_termdict::TermInfoReader;
#[test]
fn test_block_terminfos() -> io::Result<()> {
let mut term_info_writer = super::TermInfoWriter::default();
term_info_writer.write(&TermInfo {
doc_freq: 120u32,
postings_start_offset: 17u64,
postings_stop_offset: 45u64,
positions_idx: 10u64,
});
term_info_writer.write(&TermInfo {
doc_freq: 10u32,
postings_start_offset: 45u64,
postings_stop_offset: 450u64,
positions_idx: 104u64,
});
term_info_writer.write(&TermInfo {
doc_freq: 17u32,
postings_start_offset: 450u64,
postings_stop_offset: 462u64,
positions_idx: 210u64,
});
let mut buffer = Vec::new();
term_info_writer.write_block(&mut buffer);
let mut block_reader = make_block_reader(&buffer[..]);
let mut term_info_reader = TermInfoReader::default();
term_info_reader.read(&mut block_reader)?;
assert_eq!(
term_info_reader.value(0),
&TermInfo {
doc_freq: 120u32,
postings_start_offset: 17u64,
postings_stop_offset: 45u64,
positions_idx: 10u64
}
);
assert!(block_reader.buffer().is_empty());
Ok(())
}
fn make_block_reader(data: &[u8]) -> BlockReader {
let mut buffer = (data.len() as u32).to_le_bytes().to_vec();
buffer.extend_from_slice(data);
let owned_bytes = OwnedBytes::new(buffer);
let mut block_reader = BlockReader::new(Box::new(owned_bytes));
block_reader.read_block().unwrap();
block_reader
}
}

View File

@@ -1,84 +0,0 @@
use byteorder::{LittleEndian, ReadBytesExt};
use std::io::{self, Read};
pub struct BlockReader<'a> {
buffer: Vec<u8>,
reader: Box<dyn io::Read + 'a>,
offset: usize,
}
impl<'a> BlockReader<'a> {
pub fn new(reader: Box<dyn io::Read + 'a>) -> BlockReader<'a> {
BlockReader {
buffer: Vec::new(),
reader,
offset: 0,
}
}
pub fn deserialize_u64(&mut self) -> u64 {
let (num_bytes, val) = super::vint::deserialize_read(self.buffer());
self.advance(num_bytes);
val
}
#[inline(always)]
pub fn buffer_from_to(&self, start: usize, end: usize) -> &[u8] {
&self.buffer[start..end]
}
pub fn buffer_from(&self, start: usize) -> &[u8] {
&self.buffer[start..]
}
pub fn read_block(&mut self) -> io::Result<bool> {
self.offset = 0;
let block_len_res = self.reader.read_u32::<LittleEndian>();
if let Err(err) = &block_len_res {
if err.kind() == io::ErrorKind::UnexpectedEof {
return Ok(false);
}
}
let block_len = block_len_res?;
if block_len == 0u32 {
self.buffer.clear();
return Ok(false);
}
self.buffer.resize(block_len as usize, 0u8);
self.reader.read_exact(&mut self.buffer[..])?;
Ok(true)
}
pub fn offset(&self) -> usize {
self.offset
}
pub fn advance(&mut self, num_bytes: usize) {
self.offset += num_bytes;
}
pub fn buffer(&self) -> &[u8] {
&self.buffer[self.offset..]
}
}
impl<'a> io::Read for BlockReader<'a> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let len = self.buffer().read(buf)?;
self.advance(len);
Ok(len)
}
fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
let len = self.buffer.len();
buf.extend_from_slice(self.buffer());
self.advance(len);
Ok(len)
}
fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
self.buffer().read_exact(buf)?;
self.advance(buf.len());
Ok(())
}
}

View File

@@ -1,203 +0,0 @@
use std::io::{self, BufWriter, Write};
use crate::common::CountingWriter;
use super::value::ValueWriter;
use super::{value, vint, BlockReader};
const FOUR_BIT_LIMITS: usize = 1 << 4;
const VINT_MODE: u8 = 1u8;
const BLOCK_LEN: usize = 256_000;
pub struct DeltaWriter<W, TValueWriter>
where
W: io::Write,
{
block: Vec<u8>,
write: CountingWriter<BufWriter<W>>,
value_writer: TValueWriter,
}
impl<W, TValueWriter> DeltaWriter<W, TValueWriter>
where
W: io::Write,
TValueWriter: ValueWriter,
{
pub fn new(wrt: W) -> Self {
DeltaWriter {
block: Vec::with_capacity(BLOCK_LEN * 2),
write: CountingWriter::wrap(BufWriter::new(wrt)),
value_writer: TValueWriter::default(),
}
}
}
impl<W, TValueWriter> DeltaWriter<W, TValueWriter>
where
W: io::Write,
TValueWriter: value::ValueWriter,
{
pub fn flush_block(&mut self) -> io::Result<Option<(u64, u64)>> {
if self.block.is_empty() {
return Ok(None);
}
let start_offset = self.write.written_bytes();
// TODO avoid buffer allocation
let mut buffer = Vec::new();
self.value_writer.write_block(&mut buffer);
let block_len = buffer.len() + self.block.len();
self.write.write_all(&(block_len as u32).to_le_bytes())?;
self.write.write_all(&buffer[..])?;
self.write.write_all(&mut self.block[..])?;
let end_offset = self.write.written_bytes();
self.block.clear();
Ok(Some((start_offset, end_offset)))
}
fn encode_keep_add(&mut self, keep_len: usize, add_len: usize) {
if keep_len < FOUR_BIT_LIMITS && add_len < FOUR_BIT_LIMITS {
let b = (keep_len | add_len << 4) as u8;
self.block.extend_from_slice(&[b])
} else {
let mut buf = [VINT_MODE; 20];
let mut len = 1 + vint::serialize(keep_len as u64, &mut buf[1..]);
len += vint::serialize(add_len as u64, &mut buf[len..]);
self.block.extend_from_slice(&mut buf[..len])
}
}
pub(crate) fn write_suffix(&mut self, common_prefix_len: usize, suffix: &[u8]) {
let keep_len = common_prefix_len;
let add_len = suffix.len();
self.encode_keep_add(keep_len, add_len);
self.block.extend_from_slice(suffix);
}
pub(crate) fn write_value(&mut self, value: &TValueWriter::Value) {
self.value_writer.write(value);
}
pub fn write_delta(
&mut self,
common_prefix_len: usize,
suffix: &[u8],
value: &TValueWriter::Value,
) {
self.write_suffix(common_prefix_len, suffix);
self.write_value(value);
}
pub fn flush_block_if_required(&mut self) -> io::Result<Option<(u64, u64)>> {
if self.block.len() > BLOCK_LEN {
return self.flush_block();
}
Ok(None)
}
pub fn finalize(mut self) -> CountingWriter<BufWriter<W>> {
self.write
}
}
pub struct DeltaReader<'a, TValueReader> {
common_prefix_len: usize,
suffix_start: usize,
suffix_end: usize,
value_reader: TValueReader,
block_reader: BlockReader<'a>,
idx: usize,
}
impl<'a, TValueReader> DeltaReader<'a, TValueReader>
where
TValueReader: value::ValueReader,
{
pub fn new<R: io::Read + 'a>(reader: R) -> Self {
DeltaReader {
idx: 0,
common_prefix_len: 0,
suffix_start: 0,
suffix_end: 0,
value_reader: TValueReader::default(),
block_reader: BlockReader::new(Box::new(reader)),
}
}
fn deserialize_vint(&mut self) -> u64 {
self.block_reader.deserialize_u64()
}
fn read_keep_add(&mut self) -> Option<(usize, usize)> {
let b = {
let buf = &self.block_reader.buffer();
if buf.is_empty() {
return None;
}
buf[0]
};
self.block_reader.advance(1);
match b {
VINT_MODE => {
let keep = self.deserialize_vint() as usize;
let add = self.deserialize_vint() as usize;
Some((keep, add))
}
b => {
let keep = (b & 0b1111) as usize;
let add = (b >> 4) as usize;
Some((keep, add))
}
}
}
fn read_delta_key(&mut self) -> bool {
if let Some((keep, add)) = self.read_keep_add() {
self.common_prefix_len = keep;
self.suffix_start = self.block_reader.offset();
self.suffix_end = self.suffix_start + add;
self.block_reader.advance(add);
true
} else {
false
}
}
pub fn advance(&mut self) -> io::Result<bool> {
if self.block_reader.buffer().is_empty() {
if !self.block_reader.read_block()? {
return Ok(false);
}
self.value_reader.read(&mut self.block_reader)?;
self.idx = 0;
} else {
self.idx += 1;
}
if !self.read_delta_key() {
return Ok(false);
}
Ok(true)
}
pub fn common_prefix_len(&self) -> usize {
self.common_prefix_len
}
pub fn suffix(&self) -> &[u8] {
&self
.block_reader
.buffer_from_to(self.suffix_start, self.suffix_end)
}
pub fn suffix_from(&self, offset: usize) -> &[u8] {
&self.block_reader.buffer_from_to(
self.suffix_start
.wrapping_add(offset)
.wrapping_sub(self.common_prefix_len),
self.suffix_end,
)
}
pub fn value(&self) -> &TValueReader::Value {
self.value_reader.value(self.idx)
}
}

View File

@@ -1,72 +0,0 @@
use crate::termdict::sstable_termdict::sstable::{Reader, SSTable, Writer};
use super::SingleValueMerger;
use super::ValueMerger;
use std::cmp::Ordering;
use std::collections::binary_heap::PeekMut;
use std::collections::BinaryHeap;
use std::io;
struct HeapItem<B: AsRef<[u8]>>(B);
impl<B: AsRef<[u8]>> Ord for HeapItem<B> {
fn cmp(&self, other: &Self) -> Ordering {
other.0.as_ref().cmp(self.0.as_ref())
}
}
impl<B: AsRef<[u8]>> PartialOrd for HeapItem<B> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(other.0.as_ref().cmp(self.0.as_ref()))
}
}
impl<B: AsRef<[u8]>> Eq for HeapItem<B> {}
impl<B: AsRef<[u8]>> PartialEq for HeapItem<B> {
fn eq(&self, other: &Self) -> bool {
self.0.as_ref() == other.0.as_ref()
}
}
pub fn merge_sstable<SST: SSTable, W: io::Write, M: ValueMerger<SST::Value>>(
readers: Vec<Reader<SST::Reader>>,
mut writer: Writer<W, SST::Writer>,
mut merger: M,
) -> io::Result<()> {
let mut heap: BinaryHeap<HeapItem<Reader<SST::Reader>>> =
BinaryHeap::with_capacity(readers.len());
for mut reader in readers {
if reader.advance()? {
heap.push(HeapItem(reader));
}
}
loop {
let len = heap.len();
let mut value_merger;
if let Some(mut head) = heap.peek_mut() {
writer.write_key(head.0.key());
value_merger = merger.new_value(head.0.value());
if !head.0.advance()? {
PeekMut::pop(head);
}
} else {
break;
}
for _ in 0..len - 1 {
if let Some(mut head) = heap.peek_mut() {
if head.0.key() == writer.current_key() {
value_merger.add(head.0.value());
if !head.0.advance()? {
PeekMut::pop(head);
}
continue;
}
}
break;
}
let value = value_merger.finish();
writer.write_value(&value);
writer.flush_block_if_required()?;
}
writer.finalize()?;
Ok(())
}

View File

@@ -1,184 +0,0 @@
mod heap_merge;
pub use self::heap_merge::merge_sstable;
pub trait SingleValueMerger<V> {
fn add(&mut self, v: &V);
fn finish(self) -> V;
}
pub trait ValueMerger<V> {
type TSingleValueMerger: SingleValueMerger<V>;
fn new_value(&mut self, v: &V) -> Self::TSingleValueMerger;
}
#[derive(Default)]
pub struct KeepFirst;
pub struct FirstVal<V>(V);
impl<V: Clone> ValueMerger<V> for KeepFirst {
type TSingleValueMerger = FirstVal<V>;
fn new_value(&mut self, v: &V) -> FirstVal<V> {
FirstVal(v.clone())
}
}
impl<V> SingleValueMerger<V> for FirstVal<V> {
fn add(&mut self, _: &V) {}
fn finish(self) -> V {
self.0
}
}
pub struct VoidMerge;
impl ValueMerger<()> for VoidMerge {
type TSingleValueMerger = ();
fn new_value(&mut self, _: &()) -> () {
()
}
}
pub struct U64Merge;
impl ValueMerger<u64> for U64Merge {
type TSingleValueMerger = u64;
fn new_value(&mut self, val: &u64) -> u64 {
*val
}
}
impl SingleValueMerger<u64> for u64 {
fn add(&mut self, val: &u64) {
*self += *val;
}
fn finish(self) -> u64 {
self
}
}
impl SingleValueMerger<()> for () {
fn add(&mut self, _: &()) {}
fn finish(self) -> () {
()
}
}
#[cfg(test)]
mod tests {
use super::super::SSTable;
use super::super::{SSTableMonotonicU64, VoidSSTable};
use super::U64Merge;
use super::VoidMerge;
use std::collections::{BTreeMap, BTreeSet};
use std::str;
fn write_sstable(keys: &[&'static str]) -> Vec<u8> {
let mut buffer: Vec<u8> = vec![];
{
let mut sstable_writer = VoidSSTable::writer(&mut buffer);
for &key in keys {
assert!(sstable_writer.write(key.as_bytes(), &()).is_ok());
}
assert!(sstable_writer.finalize().is_ok());
}
dbg!(&buffer);
buffer
}
fn write_sstable_u64(keys: &[(&'static str, u64)]) -> Vec<u8> {
let mut buffer: Vec<u8> = vec![];
{
let mut sstable_writer = SSTableMonotonicU64::writer(&mut buffer);
for (key, val) in keys {
assert!(sstable_writer.write(key.as_bytes(), val).is_ok());
}
assert!(sstable_writer.finalize().is_ok());
}
buffer
}
fn merge_test_aux(arrs: &[&[&'static str]]) {
let sstables = arrs.iter().cloned().map(write_sstable).collect::<Vec<_>>();
let sstables_ref: Vec<&[u8]> = sstables.iter().map(|s| s.as_ref()).collect();
let mut merged = BTreeSet::new();
for &arr in arrs.iter() {
for &s in arr {
merged.insert(s.to_string());
}
}
let mut w = Vec::new();
assert!(VoidSSTable::merge(sstables_ref, &mut w, VoidMerge).is_ok());
let mut reader = VoidSSTable::reader(&w[..]);
for k in merged {
assert!(reader.advance().unwrap());
assert_eq!(reader.key(), k.as_bytes());
}
assert!(!reader.advance().unwrap());
}
fn merge_test_u64_monotonic_aux(arrs: &[&[(&'static str, u64)]]) {
let sstables = arrs
.iter()
.cloned()
.map(write_sstable_u64)
.collect::<Vec<_>>();
let sstables_ref: Vec<&[u8]> = sstables.iter().map(|s| s.as_ref()).collect();
let mut merged = BTreeMap::new();
for &arr in arrs.iter() {
for (key, val) in arr {
let entry = merged.entry(key.to_string()).or_insert(0u64);
*entry += val;
}
}
let mut w = Vec::new();
assert!(SSTableMonotonicU64::merge(sstables_ref, &mut w, U64Merge).is_ok());
let mut reader = SSTableMonotonicU64::reader(&w[..]);
for (k, v) in merged {
assert!(reader.advance().unwrap());
assert_eq!(reader.key(), k.as_bytes());
assert_eq!(reader.value(), &v);
}
assert!(!reader.advance().unwrap());
}
#[test]
fn test_merge_simple_reproduce() {
let sstable_data = write_sstable(&["a"]);
let mut reader = VoidSSTable::reader(&sstable_data[..]);
assert!(reader.advance().unwrap());
assert_eq!(reader.key(), b"a");
assert!(!reader.advance().unwrap());
}
#[test]
fn test_merge() {
merge_test_aux(&[]);
merge_test_aux(&[&["a"]]);
merge_test_aux(&[&["a", "b"], &["ab"]]); // a, ab, b
merge_test_aux(&[&["a", "b"], &["a", "b"]]);
merge_test_aux(&[
&["happy", "hello", "payer", "tax"],
&["habitat", "hello", "zoo"],
&[],
&["a"],
]);
merge_test_aux(&[&["a"]]);
merge_test_aux(&[&["a", "b"], &["ab"]]);
merge_test_aux(&[&["a", "b"], &["a", "b"]]);
}
#[test]
fn test_merge_u64() {
merge_test_u64_monotonic_aux(&[]);
merge_test_u64_monotonic_aux(&[&[("a", 1u64)]]);
merge_test_u64_monotonic_aux(&[&[("a", 1u64), ("b", 3u64)], &[("ab", 2u64)]]); // a, ab, b
merge_test_u64_monotonic_aux(&[&[("a", 1u64), ("b", 2u64)], &[("a", 16u64), ("b", 23u64)]]);
}
}

View File

@@ -1,365 +0,0 @@
use merge::ValueMerger;
use std::io::{self, Write};
use std::usize;
mod delta;
pub mod merge;
pub mod value;
pub(crate) mod sstable_index;
pub(crate) use self::sstable_index::{SSTableIndex, SSTableIndexBuilder};
pub(crate) mod vint;
mod block_reader;
pub use self::delta::DeltaReader;
use self::delta::DeltaWriter;
use self::value::{U64MonotonicReader, U64MonotonicWriter, ValueReader, ValueWriter};
pub use self::block_reader::BlockReader;
pub use self::merge::VoidMerge;
const DEFAULT_KEY_CAPACITY: usize = 50;
pub(crate) fn common_prefix_len(left: &[u8], right: &[u8]) -> usize {
left.iter()
.cloned()
.zip(right.iter().cloned())
.take_while(|(left, right)| left == right)
.count()
}
pub trait SSTable: Sized {
type Value;
type Reader: ValueReader<Value = Self::Value>;
type Writer: ValueWriter<Value = Self::Value>;
fn delta_writer<W: io::Write>(write: W) -> DeltaWriter<W, Self::Writer> {
DeltaWriter::new(write)
}
fn writer<W: io::Write>(write: W) -> Writer<W, Self::Writer> {
Writer {
previous_key: Vec::with_capacity(DEFAULT_KEY_CAPACITY),
num_terms: 0u64,
index_builder: SSTableIndexBuilder::default(),
delta_writer: Self::delta_writer(write),
first_ordinal_of_the_block: 0u64,
}
}
fn delta_reader<'a, R: io::Read + 'a>(reader: R) -> DeltaReader<'a, Self::Reader> {
DeltaReader::new(reader)
}
fn reader<'a, R: io::Read + 'a>(reader: R) -> Reader<'a, Self::Reader> {
Reader {
key: Vec::with_capacity(DEFAULT_KEY_CAPACITY),
delta_reader: Self::delta_reader(reader),
}
}
fn merge<R: io::Read, W: io::Write, M: ValueMerger<Self::Value>>(
io_readers: Vec<R>,
w: W,
merger: M,
) -> io::Result<()> {
let readers: Vec<_> = io_readers.into_iter().map(Self::reader).collect();
let writer = Self::writer(w);
merge::merge_sstable::<Self, _, _>(readers, writer, merger)
}
}
pub struct VoidSSTable;
impl SSTable for VoidSSTable {
type Value = ();
type Reader = value::VoidReader;
type Writer = value::VoidWriter;
}
pub struct SSTableMonotonicU64;
impl SSTable for SSTableMonotonicU64 {
type Value = u64;
type Reader = U64MonotonicReader;
type Writer = U64MonotonicWriter;
}
pub struct Reader<'a, TValueReader> {
key: Vec<u8>,
delta_reader: DeltaReader<'a, TValueReader>,
}
impl<'a, TValueReader> Reader<'a, TValueReader>
where
TValueReader: ValueReader,
{
pub fn advance(&mut self) -> io::Result<bool> {
if !self.delta_reader.advance()? {
return Ok(false);
}
let common_prefix_len = self.delta_reader.common_prefix_len();
let suffix = self.delta_reader.suffix();
let new_len = self.delta_reader.common_prefix_len() + suffix.len();
self.key.resize(new_len, 0u8);
self.key[common_prefix_len..].copy_from_slice(suffix);
Ok(true)
}
pub fn key(&self) -> &[u8] {
&self.key
}
pub fn value(&self) -> &TValueReader::Value {
self.delta_reader.value()
}
pub(crate) fn into_delta_reader(self) -> DeltaReader<'a, TValueReader> {
assert!(self.key.is_empty());
self.delta_reader
}
}
impl<'a, TValueReader> AsRef<[u8]> for Reader<'a, TValueReader> {
fn as_ref(&self) -> &[u8] {
&self.key
}
}
pub struct Writer<W, TValueWriter>
where
W: io::Write,
{
previous_key: Vec<u8>,
index_builder: SSTableIndexBuilder,
delta_writer: DeltaWriter<W, TValueWriter>,
num_terms: u64,
first_ordinal_of_the_block: u64,
}
impl<W, TValueWriter> Writer<W, TValueWriter>
where
W: io::Write,
TValueWriter: value::ValueWriter,
{
pub(crate) fn current_key(&self) -> &[u8] {
&self.previous_key[..]
}
pub fn write_key(&mut self, key: &[u8]) {
let keep_len = common_prefix_len(&self.previous_key, key);
let add_len = key.len() - keep_len;
let increasing_keys = add_len > 0 && (self.previous_key.len() == keep_len)
|| self.previous_key.is_empty()
|| self.previous_key[keep_len] < key[keep_len];
assert!(
increasing_keys,
"Keys should be increasing. ({:?} > {:?})",
self.previous_key, key
);
self.previous_key.resize(key.len(), 0u8);
self.previous_key[keep_len..].copy_from_slice(&key[keep_len..]);
self.delta_writer.write_suffix(keep_len, &key[keep_len..]);
}
pub(crate) fn into_delta_writer(self) -> DeltaWriter<W, TValueWriter> {
self.delta_writer
}
pub fn write(&mut self, key: &[u8], value: &TValueWriter::Value) -> io::Result<()> {
self.write_key(key);
self.write_value(value)?;
Ok(())
}
pub fn write_value(&mut self, value: &TValueWriter::Value) -> io::Result<()> {
self.delta_writer.write_value(value);
self.num_terms += 1u64;
self.flush_block_if_required()
}
pub fn flush_block_if_required(&mut self) -> io::Result<()> {
if let Some((start_offset, end_offset)) = self.delta_writer.flush_block_if_required()? {
self.index_builder.add_block(
&self.previous_key[..],
start_offset,
end_offset,
self.first_ordinal_of_the_block,
);
self.first_ordinal_of_the_block = self.num_terms;
self.previous_key.clear();
}
Ok(())
}
pub fn finalize(mut self) -> io::Result<W> {
if let Some((start_offset, end_offset)) = self.delta_writer.flush_block()? {
self.index_builder.add_block(
&self.previous_key[..],
start_offset,
end_offset,
self.first_ordinal_of_the_block,
);
self.first_ordinal_of_the_block = self.num_terms;
}
let mut wrt = self.delta_writer.finalize();
wrt.write_all(&0u32.to_le_bytes())?;
let offset = wrt.written_bytes();
self.index_builder.serialize(&mut wrt)?;
wrt.write_all(&offset.to_le_bytes())?;
wrt.write_all(&self.num_terms.to_le_bytes())?;
let wrt = wrt.finish();
Ok(wrt.into_inner()?)
}
}
#[cfg(test)]
mod test {
use std::io;
use super::SSTable;
use super::VoidMerge;
use super::VoidSSTable;
use super::{common_prefix_len, SSTableMonotonicU64};
fn aux_test_common_prefix_len(left: &str, right: &str, expect_len: usize) {
assert_eq!(
common_prefix_len(left.as_bytes(), right.as_bytes()),
expect_len
);
assert_eq!(
common_prefix_len(right.as_bytes(), left.as_bytes()),
expect_len
);
}
#[test]
fn test_common_prefix_len() {
aux_test_common_prefix_len("a", "ab", 1);
aux_test_common_prefix_len("", "ab", 0);
aux_test_common_prefix_len("ab", "abc", 2);
aux_test_common_prefix_len("abde", "abce", 2);
}
#[test]
fn test_long_key_diff() {
let long_key = (0..1_024).map(|x| (x % 255) as u8).collect::<Vec<_>>();
let long_key2 = (1..300).map(|x| (x % 255) as u8).collect::<Vec<_>>();
let mut buffer = vec![];
{
let mut sstable_writer = VoidSSTable::writer(&mut buffer);
assert!(sstable_writer.write(&long_key[..], &()).is_ok());
assert!(sstable_writer.write(&[0, 3, 4], &()).is_ok());
assert!(sstable_writer.write(&long_key2[..], &()).is_ok());
assert!(sstable_writer.finalize().is_ok());
}
let mut sstable_reader = VoidSSTable::reader(&buffer[..]);
assert!(sstable_reader.advance().unwrap());
assert_eq!(sstable_reader.key(), &long_key[..]);
assert!(sstable_reader.advance().unwrap());
assert_eq!(sstable_reader.key(), &[0, 3, 4]);
assert!(sstable_reader.advance().unwrap());
assert_eq!(sstable_reader.key(), &long_key2[..]);
assert!(!sstable_reader.advance().unwrap());
}
#[test]
fn test_simple_sstable() {
let mut buffer = vec![];
{
let mut sstable_writer = VoidSSTable::writer(&mut buffer);
assert!(sstable_writer.write(&[17u8], &()).is_ok());
assert!(sstable_writer.write(&[17u8, 18u8, 19u8], &()).is_ok());
assert!(sstable_writer.write(&[17u8, 20u8], &()).is_ok());
assert!(sstable_writer.finalize().is_ok());
}
assert_eq!(
&buffer,
&[
// block len
7u8, 0u8, 0u8, 0u8, // keep 0 push 1 | ""
16u8, 17u8, // keep 1 push 2 | 18 19
33u8, 18u8, 19u8, // keep 1 push 1 | 20
17u8, 20u8, 0u8, 0u8, 0u8, 0u8, // no more blocks
// index
161, 102, 98, 108, 111, 99, 107, 115, 129, 162, 104, 108, 97, 115, 116, 95, 107,
101, 121, 130, 17, 20, 106, 98, 108, 111, 99, 107, 95, 97, 100, 100, 114, 163, 108,
115, 116, 97, 114, 116, 95, 111, 102, 102, 115, 101, 116, 0, 106, 101, 110, 100,
95, 111, 102, 102, 115, 101, 116, 11, 109, 102, 105, 114, 115, 116, 95, 111, 114,
100, 105, 110, 97, 108, 0, 15, 0, 0, 0, 0, 0, 0, 0, // offset for the index
3u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8 // num terms
]
);
let mut sstable_reader = VoidSSTable::reader(&buffer[..]);
assert!(sstable_reader.advance().unwrap());
assert_eq!(sstable_reader.key(), &[17u8]);
assert!(sstable_reader.advance().unwrap());
assert_eq!(sstable_reader.key(), &[17u8, 18u8, 19u8]);
assert!(sstable_reader.advance().unwrap());
assert_eq!(sstable_reader.key(), &[17u8, 20u8]);
assert!(!sstable_reader.advance().unwrap());
}
#[test]
#[should_panic]
fn test_simple_sstable_non_increasing_key() {
let mut buffer = vec![];
let mut sstable_writer = VoidSSTable::writer(&mut buffer);
assert!(sstable_writer.write(&[17u8], &()).is_ok());
assert!(sstable_writer.write(&[16u8], &()).is_ok());
}
#[test]
fn test_merge_abcd_abe() {
let mut buffer = Vec::new();
{
let mut writer = VoidSSTable::writer(&mut buffer);
writer.write(b"abcd", &()).unwrap();
writer.write(b"abe", &()).unwrap();
writer.finalize().unwrap();
}
let mut output = Vec::new();
assert!(VoidSSTable::merge(vec![&buffer[..], &buffer[..]], &mut output, VoidMerge).is_ok());
assert_eq!(&output[..], &buffer[..]);
}
#[test]
fn test_sstable() {
let mut buffer = Vec::new();
{
let mut writer = VoidSSTable::writer(&mut buffer);
writer.write(b"abcd", &()).unwrap();
writer.write(b"abe", &()).unwrap();
writer.finalize().unwrap();
}
let mut output = Vec::new();
assert!(VoidSSTable::merge(vec![&buffer[..], &buffer[..]], &mut output, VoidMerge).is_ok());
assert_eq!(&output[..], &buffer[..]);
}
#[test]
fn test_sstable_u64() -> io::Result<()> {
let mut buffer = Vec::new();
let mut writer = SSTableMonotonicU64::writer(&mut buffer);
writer.write(b"abcd", &1u64)?;
writer.write(b"abe", &4u64)?;
writer.write(b"gogo", &4324234234234234u64)?;
writer.finalize()?;
let mut reader = SSTableMonotonicU64::reader(&buffer[..]);
assert!(reader.advance()?);
assert_eq!(reader.key(), b"abcd");
assert_eq!(reader.value(), &1u64);
assert!(reader.advance()?);
assert_eq!(reader.key(), b"abe");
assert_eq!(reader.value(), &4u64);
assert!(reader.advance()?);
assert_eq!(reader.key(), b"gogo");
assert_eq!(reader.value(), &4324234234234234u64);
assert!(!reader.advance()?);
Ok(())
}
}

View File

@@ -1,90 +0,0 @@
use std::io;
use serde;
use serde::{Deserialize, Serialize};
#[derive(Default, Debug, Serialize, Deserialize)]
pub struct SSTableIndex {
blocks: Vec<BlockMeta>,
}
impl SSTableIndex {
pub fn load(data: &[u8]) -> SSTableIndex {
// TODO
serde_cbor::de::from_slice(data).unwrap()
}
pub fn search(&self, key: &[u8]) -> Option<BlockAddr> {
self.blocks
.iter()
.find(|block| &block.last_key[..] >= &key)
.map(|block| block.block_addr)
}
}
#[derive(Clone, Eq, PartialEq, Debug, Copy, Serialize, Deserialize)]
pub struct BlockAddr {
pub start_offset: u64,
pub end_offset: u64,
pub first_ordinal: u64,
}
#[derive(Debug, Serialize, Deserialize)]
struct BlockMeta {
pub last_key: Vec<u8>,
pub block_addr: BlockAddr,
}
#[derive(Default)]
pub struct SSTableIndexBuilder {
index: SSTableIndex,
}
impl SSTableIndexBuilder {
pub fn add_block(
&mut self,
last_key: &[u8],
start_offset: u64,
stop_offset: u64,
first_ordinal: u64,
) {
self.index.blocks.push(BlockMeta {
last_key: last_key.to_vec(),
block_addr: BlockAddr {
start_offset,
end_offset: stop_offset,
first_ordinal,
},
})
}
pub fn serialize(&self, wrt: &mut dyn io::Write) -> io::Result<()> {
serde_cbor::ser::to_writer(wrt, &self.index).unwrap();
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::{BlockAddr, SSTableIndex, SSTableIndexBuilder};
#[test]
fn test_sstable_index() {
let mut sstable_builder = SSTableIndexBuilder::default();
sstable_builder.add_block(b"aaa", 10u64, 20u64, 0u64);
sstable_builder.add_block(b"bbbbbbb", 20u64, 30u64, 564);
sstable_builder.add_block(b"ccc", 30u64, 40u64, 10u64);
sstable_builder.add_block(b"dddd", 40u64, 50u64, 15u64);
let mut buffer: Vec<u8> = Vec::new();
sstable_builder.serialize(&mut buffer).unwrap();
let sstable = SSTableIndex::load(&buffer[..]);
assert_eq!(
sstable.search(b"bbbde"),
Some(BlockAddr {
first_ordinal: 10u64,
start_offset: 30u64,
end_offset: 40u64
})
);
}
}

View File

@@ -1,94 +0,0 @@
use super::{vint, BlockReader};
use std::io;
pub trait ValueReader: Default {
type Value;
fn value(&self, idx: usize) -> &Self::Value;
fn read(&mut self, reader: &mut BlockReader) -> io::Result<()>;
}
pub trait ValueWriter: Default {
type Value;
fn write(&mut self, val: &Self::Value);
fn write_block(&mut self, writer: &mut Vec<u8>);
}
#[derive(Default)]
pub struct VoidReader;
impl ValueReader for VoidReader {
type Value = ();
fn value(&self, _idx: usize) -> &() {
&()
}
fn read(&mut self, _reader: &mut BlockReader) -> io::Result<()> {
Ok(())
}
}
#[derive(Default)]
pub struct VoidWriter;
impl ValueWriter for VoidWriter {
type Value = ();
fn write(&mut self, _val: &()) {}
fn write_block(&mut self, _writer: &mut Vec<u8>) {}
}
#[derive(Default)]
pub struct U64MonotonicWriter {
vals: Vec<u64>,
}
impl ValueWriter for U64MonotonicWriter {
type Value = u64;
fn write(&mut self, val: &Self::Value) {
self.vals.push(*val);
}
fn write_block(&mut self, writer: &mut Vec<u8>) {
let mut prev_val = 0u64;
vint::serialize_into_vec(self.vals.len() as u64, writer);
for &val in &self.vals {
let delta = val - prev_val;
vint::serialize_into_vec(delta, writer);
prev_val = val;
}
self.vals.clear();
}
}
#[derive(Default)]
pub struct U64MonotonicReader {
vals: Vec<u64>,
}
impl ValueReader for U64MonotonicReader {
type Value = u64;
fn value(&self, idx: usize) -> &Self::Value {
&self.vals[idx]
}
fn read(&mut self, reader: &mut BlockReader) -> io::Result<()> {
let len = reader.deserialize_u64() as usize;
self.vals.clear();
let mut prev_val = 0u64;
for _ in 0..len {
let delta = reader.deserialize_u64() as u64;
let val = prev_val + delta;
self.vals.push(val);
prev_val = val;
}
Ok(())
}
}

View File

@@ -1,74 +0,0 @@
use super::BlockReader;
const CONTINUE_BIT: u8 = 128u8;
pub fn serialize(mut val: u64, buffer: &mut [u8]) -> usize {
for (i, b) in buffer.iter_mut().enumerate() {
let next_byte: u8 = (val & 127u64) as u8;
val = val >> 7;
if val == 0u64 {
*b = next_byte;
return i + 1;
} else {
*b = next_byte | CONTINUE_BIT;
}
}
10 //< actually unreachable
}
pub fn serialize_into_vec(val: u64, buffer: &mut Vec<u8>) {
let mut buf = [0u8; 10];
let num_bytes = serialize(val, &mut buf[..]);
buffer.extend_from_slice(&buf[..num_bytes]);
}
// super slow but we don't care
pub fn deserialize_read(buf: &[u8]) -> (usize, u64) {
let mut result = 0u64;
let mut shift = 0u64;
let mut consumed = 0;
for &b in buf {
consumed += 1;
result |= u64::from(b % 128u8) << shift;
if b < CONTINUE_BIT {
break;
}
shift += 7;
}
(consumed, result)
}
pub fn deserialize_from_block(block: &mut BlockReader) -> u64 {
let (num_bytes, val) = deserialize_read(block.buffer());
block.advance(num_bytes);
val
}
#[cfg(test)]
mod tests {
use super::{deserialize_read, serialize};
use std::u64;
fn aux_test_int(val: u64, expect_len: usize) {
let mut buffer = [0u8; 14];
assert_eq!(serialize(val, &mut buffer[..]), expect_len);
assert_eq!(deserialize_read(&buffer), (expect_len, val));
}
#[test]
fn test_vint() {
aux_test_int(0u64, 1);
aux_test_int(17u64, 1);
aux_test_int(127u64, 1);
aux_test_int(128u64, 2);
aux_test_int(123423418u64, 4);
for i in 1..63 {
let power_of_two = 1u64 << i;
aux_test_int(power_of_two + 1, (i / 7) + 1);
aux_test_int(power_of_two, (i / 7) + 1);
aux_test_int(power_of_two - 1, ((i - 1) / 7) + 1);
}
aux_test_int(u64::MAX, 10);
}
}

View File

@@ -1,227 +0,0 @@
use super::TermDictionary;
use crate::postings::TermInfo;
use crate::termdict::sstable_termdict::TermInfoReader;
use crate::termdict::TermOrdinal;
use std::io;
use std::ops::Bound;
use tantivy_fst::automaton::AlwaysMatch;
use tantivy_fst::Automaton;
/// `TermStreamerBuilder` is a helper object used to define
/// a range of terms that should be streamed.
pub struct TermStreamerBuilder<'a, A = AlwaysMatch>
where
A: Automaton,
A::State: Clone,
{
term_dict: &'a TermDictionary,
automaton: A,
lower: Bound<Vec<u8>>,
upper: Bound<Vec<u8>>,
}
impl<'a, A> TermStreamerBuilder<'a, A>
where
A: Automaton,
A::State: Clone,
{
pub(crate) fn new(term_dict: &'a TermDictionary, automaton: A) -> Self {
TermStreamerBuilder {
term_dict,
automaton,
lower: Bound::Unbounded,
upper: Bound::Unbounded,
}
}
/// Limit the range to terms greater or equal to the bound
pub fn ge<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
self.lower = Bound::Included(bound.as_ref().to_owned());
self
}
/// Limit the range to terms strictly greater than the bound
pub fn gt<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
self.lower = Bound::Excluded(bound.as_ref().to_owned());
self
}
/// Limit the range to terms lesser or equal to the bound
pub fn le<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
self.upper = Bound::Included(bound.as_ref().to_owned());
self
}
/// Limit the range to terms lesser or equal to the bound
pub fn lt<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
self.lower = Bound::Excluded(bound.as_ref().to_owned());
self
}
pub fn backward(mut self) -> Self {
unimplemented!()
}
/// Creates the stream corresponding to the range
/// of terms defined using the `TermStreamerBuilder`.
pub fn into_stream(self) -> io::Result<TermStreamer<'a, A>> {
let start_state = self.automaton.start();
let delta_reader = self.term_dict.sstable_delta_reader()?;
Ok(TermStreamer {
automaton: self.automaton,
states: vec![start_state],
delta_reader,
key: Vec::new(),
term_ord: 0u64,
})
}
}
/// `TermStreamer` acts as a cursor over a range of terms of a segment.
/// Terms are guaranteed to be sorted.
pub struct TermStreamer<'a, A = AlwaysMatch>
where
A: Automaton,
A::State: Clone,
{
automaton: A,
states: Vec<A::State>,
delta_reader: super::sstable::DeltaReader<'a, TermInfoReader>,
key: Vec<u8>,
term_ord: TermOrdinal,
}
impl<'a, A> TermStreamer<'a, A>
where
A: Automaton,
A::State: Clone,
{
/// Advance position the stream on the next item.
/// Before the first call to `.advance()`, the stream
/// is an unitialized state.
pub fn advance(&mut self) -> bool {
while self.delta_reader.advance().unwrap() {
self.term_ord += 1u64;
let common_prefix_len = self.delta_reader.common_prefix_len();
self.states.truncate(common_prefix_len + 1);
self.key.truncate(common_prefix_len);
let mut state: A::State = self.states.last().unwrap().clone();
for &b in self.delta_reader.suffix() {
state = self.automaton.accept(&state, b);
self.states.push(state.clone());
}
self.key.extend_from_slice(self.delta_reader.suffix());
if self.automaton.is_match(&state) {
return true;
}
}
false
}
/// Returns the `TermOrdinal` of the given term.
///
/// May panic if the called as `.advance()` as never
/// been called before.
pub fn term_ord(&self) -> TermOrdinal {
self.term_ord
}
/// Accesses the current key.
///
/// `.key()` should return the key that was returned
/// by the `.next()` method.
///
/// If the end of the stream as been reached, and `.next()`
/// has been called and returned `None`, `.key()` remains
/// the value of the last key encountered.
///
/// Before any call to `.next()`, `.key()` returns an empty array.
pub fn key(&self) -> &[u8] {
&self.key
}
/// Accesses the current value.
///
/// Calling `.value()` after the end of the stream will return the
/// last `.value()` encountered.
///
/// # Panics
///
/// Calling `.value()` before the first call to `.advance()` returns
/// `V::default()`.
pub fn value(&self) -> &TermInfo {
self.delta_reader.value()
}
/// Return the next `(key, value)` pair.
#[cfg_attr(feature = "cargo-clippy", allow(clippy::should_implement_trait))]
pub fn next(&mut self) -> Option<(&[u8], &TermInfo)> {
if self.advance() {
Some((self.key(), self.value()))
} else {
None
}
}
}
#[cfg(test)]
mod tests {
use super::super::TermDictionary;
use crate::directory::OwnedBytes;
use crate::postings::TermInfo;
fn make_term_info(i: u64) -> TermInfo {
TermInfo {
doc_freq: 1000u32 + i as u32,
positions_idx: i * 500,
postings_start_offset: (i + 10) * (i * 10),
postings_stop_offset: ((i + 1) + 10) * ((i + 1) * 10),
}
}
fn create_test_term_dictionary() -> crate::Result<TermDictionary> {
let mut term_dict_builder = super::super::TermDictionaryBuilder::create(Vec::new())?;
term_dict_builder.insert(b"abaisance", &make_term_info(0u64))?;
term_dict_builder.insert(b"abalation", &make_term_info(1u64))?;
term_dict_builder.insert(b"abalienate", &make_term_info(2u64))?;
term_dict_builder.insert(b"abandon", &make_term_info(3u64))?;
let buffer = term_dict_builder.finish()?;
let owned_bytes = OwnedBytes::new(buffer);
TermDictionary::from_bytes(owned_bytes)
}
#[test]
fn test_sstable_stream() -> crate::Result<()> {
let term_dict = create_test_term_dictionary()?;
let mut term_streamer = term_dict.stream()?;
assert!(term_streamer.advance());
assert_eq!(term_streamer.key(), b"abaisance");
assert_eq!(term_streamer.value().doc_freq, 1000u32);
assert!(term_streamer.advance());
assert_eq!(term_streamer.key(), b"abalation");
assert_eq!(term_streamer.value().doc_freq, 1001u32);
assert!(term_streamer.advance());
assert_eq!(term_streamer.key(), b"abalienate");
assert_eq!(term_streamer.value().doc_freq, 1002u32);
assert!(term_streamer.advance());
assert_eq!(term_streamer.key(), b"abandon");
assert_eq!(term_streamer.value().doc_freq, 1003u32);
assert!(!term_streamer.advance());
Ok(())
}
#[test]
fn test_sstable_search() -> crate::Result<()> {
let term_dict = create_test_term_dictionary()?;
let ptn = tantivy_fst::Regex::new("ab.*t.*").unwrap();
let mut term_streamer = term_dict.search(ptn).into_stream()?;
assert!(term_streamer.advance());
assert_eq!(term_streamer.key(), b"abalation");
assert_eq!(term_streamer.value().doc_freq, 1001u32);
assert!(term_streamer.advance());
assert_eq!(term_streamer.key(), b"abalienate");
assert_eq!(term_streamer.value().doc_freq, 1002u32);
assert!(!term_streamer.advance());
Ok(())
}
}

View File

@@ -1,228 +0,0 @@
use std::io;
use crate::common::BinarySerializable;
use crate::directory::{FileSlice, OwnedBytes};
use crate::postings::TermInfo;
use crate::termdict::sstable_termdict::sstable::sstable_index::BlockAddr;
use crate::termdict::sstable_termdict::sstable::Writer;
use crate::termdict::sstable_termdict::sstable::{DeltaReader, SSTable};
use crate::termdict::sstable_termdict::sstable::{Reader, SSTableIndex};
use crate::termdict::sstable_termdict::{
TermInfoReader, TermInfoWriter, TermSSTable, TermStreamer, TermStreamerBuilder,
};
use crate::termdict::TermOrdinal;
use crate::HasLen;
use once_cell::sync::Lazy;
use tantivy_fst::automaton::AlwaysMatch;
use tantivy_fst::Automaton;
pub struct TermInfoSSTable;
impl SSTable for TermInfoSSTable {
type Value = TermInfo;
type Reader = TermInfoReader;
type Writer = TermInfoWriter;
}
pub struct TermDictionaryBuilder<W: io::Write> {
sstable_writer: Writer<W, TermInfoWriter>,
}
impl<W: io::Write> TermDictionaryBuilder<W> {
/// Creates a new `TermDictionaryBuilder`
pub fn create(w: W) -> io::Result<Self> {
let sstable_writer = TermSSTable::writer(w);
Ok(TermDictionaryBuilder { sstable_writer })
}
/// Inserts a `(key, value)` pair in the term dictionary.
///
/// *Keys have to be inserted in order.*
pub fn insert<K: AsRef<[u8]>>(&mut self, key_ref: K, value: &TermInfo) -> io::Result<()> {
let key = key_ref.as_ref();
self.insert_key(key)?;
self.insert_value(value)?;
Ok(())
}
/// # Warning
/// Horribly dangerous internal API
///
/// If used, it must be used by systematically alternating calls
/// to insert_key and insert_value.
///
/// Prefer using `.insert(key, value)`
pub(crate) fn insert_key(&mut self, key: &[u8]) -> io::Result<()> {
self.sstable_writer.write_key(key);
Ok(())
}
/// # Warning
///
/// Horribly dangerous internal API. See `.insert_key(...)`.
pub(crate) fn insert_value(&mut self, term_info: &TermInfo) -> io::Result<()> {
self.sstable_writer.write_value(term_info);
Ok(())
}
/// Finalize writing the builder, and returns the underlying
/// `Write` object.
pub fn finish(self) -> io::Result<W> {
self.sstable_writer.finalize()
}
}
static EMPTY_TERM_DICT_FILE: Lazy<FileSlice> = Lazy::new(|| {
let term_dictionary_data: Vec<u8> = TermDictionaryBuilder::create(Vec::<u8>::new())
.expect("Creating a TermDictionaryBuilder in a Vec<u8> should never fail")
.finish()
.expect("Writing in a Vec<u8> should never fail");
FileSlice::from(term_dictionary_data)
});
/// The term dictionary contains all of the terms in
/// `tantivy index` in a sorted manner.
///
/// The `Fst` crate is used to associate terms to their
/// respective `TermOrdinal`. The `TermInfoStore` then makes it
/// possible to fetch the associated `TermInfo`.
pub struct TermDictionary {
sstable_slice: FileSlice,
sstable_index: SSTableIndex,
num_terms: u64,
}
impl TermDictionary {
pub(crate) fn sstable_reader(&self) -> io::Result<Reader<'static, TermInfoReader>> {
let data = self.sstable_slice.read_bytes()?;
Ok(TermInfoSSTable::reader(data))
}
pub(crate) fn sstable_reader_block(
&self,
block_addr: BlockAddr,
) -> io::Result<Reader<'static, TermInfoReader>> {
let data = self.sstable_slice.read_bytes_slice(
block_addr.start_offset as usize,
block_addr.end_offset as usize,
)?;
Ok(TermInfoSSTable::reader(data))
}
pub(crate) fn sstable_delta_reader(&self) -> io::Result<DeltaReader<'static, TermInfoReader>> {
let data = self.sstable_slice.read_bytes()?;
Ok(TermInfoSSTable::delta_reader(data))
}
/// Opens a `TermDictionary`.
pub fn open(term_dictionary_file: FileSlice) -> crate::Result<Self> {
let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(16);
let mut footer_len_bytes: OwnedBytes = footer_len_slice.read_bytes()?;
let index_offset = u64::deserialize(&mut footer_len_bytes)?;
let num_terms = u64::deserialize(&mut footer_len_bytes)?;
let (sstable_slice, index_slice) = main_slice.split(index_offset as usize);
// dbg!(index_slice.len());
let sstable_index_bytes = index_slice.read_bytes()?;
let sstable_index = SSTableIndex::load(sstable_index_bytes.as_slice());
// dbg!(&sstable_index);
Ok(TermDictionary {
sstable_slice,
sstable_index,
num_terms,
})
}
pub fn from_bytes(owned_bytes: OwnedBytes) -> crate::Result<TermDictionary> {
TermDictionary::open(FileSlice::new(Box::new(owned_bytes)))
}
/// Creates an empty term dictionary which contains no terms.
pub fn empty() -> Self {
TermDictionary::open(EMPTY_TERM_DICT_FILE.clone()).unwrap()
}
/// Returns the number of terms in the dictionary.
/// Term ordinals range from 0 to `num_terms() - 1`.
pub fn num_terms(&self) -> usize {
self.num_terms as usize
}
/// Returns the ordinal associated to a given term.
pub fn term_ord<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TermOrdinal>> {
let mut term_ord = 0u64;
let key_bytes = key.as_ref();
let mut sstable_reader = self.sstable_reader()?;
while sstable_reader.advance().unwrap_or(false) {
if sstable_reader.key() == key_bytes {
return Ok(Some(term_ord));
}
term_ord += 1;
}
Ok(None)
}
/// Returns the term associated to a given term ordinal.
///
/// Term ordinals are defined as the position of the term in
/// the sorted list of terms.
///
/// Returns true iff the term has been found.
///
/// Regardless of whether the term is found or not,
/// the buffer may be modified.
pub fn ord_to_term(&self, ord: TermOrdinal, bytes: &mut Vec<u8>) -> io::Result<bool> {
let mut sstable_reader = self.sstable_reader()?;
bytes.clear();
for _ in 0..(ord + 1) {
if !sstable_reader.advance().unwrap_or(false) {
return Ok(false);
}
}
bytes.extend_from_slice(sstable_reader.key());
Ok(true)
}
/// Returns the number of terms in the dictionary.
pub fn term_info_from_ord(&self, term_ord: TermOrdinal) -> io::Result<TermInfo> {
let mut sstable_reader = self.sstable_reader()?;
for _ in 0..(term_ord + 1) {
if !sstable_reader.advance().unwrap_or(false) {
return Ok(TermInfo::default());
}
}
Ok(sstable_reader.value().clone())
}
/// Lookups the value corresponding to the key.
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TermInfo>> {
if let Some(block_addr) = self.sstable_index.search(key.as_ref()) {
let mut sstable_reader = self.sstable_reader_block(block_addr)?;
let key_bytes = key.as_ref();
while sstable_reader.advance().unwrap_or(false) {
if sstable_reader.key() == key_bytes {
let term_info = sstable_reader.value().clone();
return Ok(Some(term_info));
}
}
}
Ok(None)
}
// Returns a range builder, to stream all of the terms
// within an interval.
pub fn range(&self) -> TermStreamerBuilder<'_> {
TermStreamerBuilder::new(self, AlwaysMatch)
}
// A stream of all the sorted terms. [See also `.stream_field()`](#method.stream_field)
pub fn stream(&self) -> io::Result<TermStreamer<'_>> {
self.range().into_stream()
}
// Returns a search builder, to stream all of the terms
// within the Automaton
pub fn search<'a, A: Automaton + 'a>(&'a self, automaton: A) -> TermStreamerBuilder<'a, A>
where
A::State: Clone,
{
TermStreamerBuilder::<A>::new(self, automaton)
}
}

View File

@@ -1,5 +1,3 @@
use std::io;
use super::TermDictionary;
use crate::postings::TermInfo;
use crate::termdict::TermOrdinal;
@@ -61,14 +59,14 @@ where
/// Creates the stream corresponding to the range
/// of terms defined using the `TermStreamerBuilder`.
pub fn into_stream(self) -> io::Result<TermStreamer<'a, A>> {
Ok(TermStreamer {
pub fn into_stream(self) -> TermStreamer<'a, A> {
TermStreamer {
fst_map: self.fst_map,
stream: self.stream_builder.into_stream(),
term_ord: 0u64,
current_key: Vec::with_capacity(100),
current_value: TermInfo::default(),
})
}
}
}

View File

@@ -55,32 +55,22 @@ impl TermInfoBlockMeta {
self.doc_freq_nbits + self.postings_offset_nbits + self.positions_idx_nbits
}
// Here inner_offset is the offset within the block, WITHOUT the first term_info.
// In other word, term_info #1,#2,#3 gets inner_offset 0,1,2... While term_info #0
// is encoded without bitpacking.
fn deserialize_term_info(&self, data: &[u8], inner_offset: usize) -> TermInfo {
assert!(inner_offset < BLOCK_LEN - 1);
let num_bits = self.num_bits() as usize;
let mut cursor = num_bits * inner_offset;
let posting_start_addr = num_bits * inner_offset;
// the stop offset is the start offset of the next term info.
let posting_stop_addr = posting_start_addr + num_bits;
let doc_freq_addr = posting_start_addr + self.postings_offset_nbits as usize;
let positions_idx_addr = doc_freq_addr + self.doc_freq_nbits as usize;
let doc_freq = extract_bits(data, cursor, self.doc_freq_nbits) as u32;
cursor += self.doc_freq_nbits as usize;
let postings_start_offset = self.ref_term_info.postings_start_offset
+ extract_bits(data, posting_start_addr, self.postings_offset_nbits);
let postings_stop_offset = self.ref_term_info.postings_start_offset
+ extract_bits(data, posting_stop_addr, self.postings_offset_nbits);
let doc_freq = extract_bits(data, doc_freq_addr, self.doc_freq_nbits) as u32;
let positions_idx = self.ref_term_info.positions_idx
+ extract_bits(data, positions_idx_addr, self.positions_idx_nbits);
let postings_offset = extract_bits(data, cursor, self.postings_offset_nbits);
cursor += self.postings_offset_nbits as usize;
let positions_idx = extract_bits(data, cursor, self.positions_idx_nbits);
TermInfo {
doc_freq,
postings_start_offset,
postings_stop_offset,
positions_idx,
postings_offset: postings_offset + self.ref_term_info.postings_offset,
positions_idx: positions_idx + self.ref_term_info.positions_idx,
}
}
}
@@ -162,17 +152,16 @@ fn bitpack_serialize<W: Write>(
term_info_block_meta: &TermInfoBlockMeta,
term_info: &TermInfo,
) -> io::Result<()> {
bit_packer.write(
term_info.postings_start_offset,
term_info_block_meta.postings_offset_nbits,
write,
)?;
bit_packer.write(
u64::from(term_info.doc_freq),
term_info_block_meta.doc_freq_nbits,
write,
)?;
bit_packer.write(
term_info.postings_offset,
term_info_block_meta.postings_offset_nbits,
write,
)?;
bit_packer.write(
term_info.positions_idx,
term_info_block_meta.positions_idx_nbits,
@@ -192,27 +181,23 @@ impl TermInfoStoreWriter {
}
fn flush_block(&mut self) -> io::Result<()> {
if self.term_infos.is_empty() {
return Ok(());
}
let mut bit_packer = BitPacker::new();
let ref_term_info = self.term_infos[0].clone();
let last_term_info = if let Some(last_term_info) = self.term_infos.last().cloned() {
last_term_info
} else {
return Ok(());
};
let postings_stop_offset =
last_term_info.postings_stop_offset - ref_term_info.postings_start_offset;
for term_info in &mut self.term_infos[1..] {
term_info.postings_start_offset -= ref_term_info.postings_start_offset;
term_info.postings_offset -= ref_term_info.postings_offset;
term_info.positions_idx -= ref_term_info.positions_idx;
}
let mut max_doc_freq: u32 = 0u32;
let max_postings_offset: u64 = postings_stop_offset;
let max_positions_idx: u64 = last_term_info.positions_idx;
let mut max_postings_offset: u64 = 0u64;
let mut max_positions_idx: u64 = 0u64;
for term_info in &self.term_infos[1..] {
max_doc_freq = cmp::max(max_doc_freq, term_info.doc_freq);
max_postings_offset = cmp::max(max_postings_offset, term_info.postings_offset);
max_positions_idx = cmp::max(max_positions_idx, term_info.positions_idx);
}
let max_doc_freq_nbits: u8 = compute_num_bits(u64::from(max_doc_freq));
@@ -237,12 +222,6 @@ impl TermInfoStoreWriter {
)?;
}
bit_packer.write(
postings_stop_offset,
term_info_block_meta.postings_offset_nbits,
&mut self.buffer_term_infos,
)?;
// Block need end up at the end of a byte.
bit_packer.flush(&mut self.buffer_term_infos)?;
self.term_infos.clear();
@@ -251,7 +230,6 @@ impl TermInfoStoreWriter {
}
pub fn write_term_info(&mut self, term_info: &TermInfo) -> io::Result<()> {
assert!(term_info.postings_stop_offset >= term_info.postings_start_offset);
self.num_terms += 1u64;
self.term_infos.push(term_info.clone());
if self.term_infos.len() >= BLOCK_LEN {
@@ -311,11 +289,10 @@ mod tests {
#[test]
fn test_term_info_block_meta_serialization() {
let term_info_block_meta = TermInfoBlockMeta {
offset: 2009u64,
offset: 2009,
ref_term_info: TermInfo {
doc_freq: 512,
postings_start_offset: 51,
postings_stop_offset: 57u64,
postings_offset: 51,
positions_idx: 3584,
},
doc_freq_nbits: 10,
@@ -333,12 +310,10 @@ mod tests {
fn test_pack() -> crate::Result<()> {
let mut store_writer = TermInfoStoreWriter::new();
let mut term_infos = vec![];
let offset = |i| (i * 13 + i * i) as u64;
for i in 0..1000 {
let term_info = TermInfo {
doc_freq: i as u32,
postings_start_offset: offset(i),
postings_stop_offset: offset(i + 1),
postings_offset: (i / 10) as u64,
positions_idx: (i * 7) as u64,
};
store_writer.write_term_info(&term_info)?;
@@ -348,12 +323,7 @@ mod tests {
store_writer.serialize(&mut buffer)?;
let term_info_store = TermInfoStore::open(FileSlice::from(buffer))?;
for i in 0..1000 {
assert_eq!(
term_info_store.get(i as u64),
term_infos[i],
"term info {}",
i
);
assert_eq!(term_info_store.get(i as u64), term_infos[i]);
}
Ok(())
}

View File

@@ -80,6 +80,7 @@ where
.serialize(&mut counting_writer)?;
let footer_size = counting_writer.written_bytes();
(footer_size as u64).serialize(&mut counting_writer)?;
counting_writer.flush()?;
}
Ok(file)
}
@@ -138,8 +139,8 @@ impl TermDictionary {
}
/// Returns the ordinal associated to a given term.
pub fn term_ord<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TermOrdinal>> {
Ok(self.fst_index.get(key))
pub fn term_ord<K: AsRef<[u8]>>(&self, key: K) -> Option<TermOrdinal> {
self.fst_index.get(key)
}
/// Returns the term associated to a given term ordinal.
@@ -151,7 +152,7 @@ impl TermDictionary {
///
/// Regardless of whether the term is found or not,
/// the buffer may be modified.
pub fn ord_to_term(&self, mut ord: TermOrdinal, bytes: &mut Vec<u8>) -> io::Result<bool> {
pub fn ord_to_term(&self, mut ord: TermOrdinal, bytes: &mut Vec<u8>) -> bool {
bytes.clear();
let fst = self.fst_index.as_fst();
let mut node = fst.root();
@@ -166,10 +167,10 @@ impl TermDictionary {
let new_node_addr = transition.addr;
node = fst.node(new_node_addr);
} else {
return Ok(false);
return false;
}
}
Ok(true)
true
}
/// Returns the number of terms in the dictionary.
@@ -178,10 +179,9 @@ impl TermDictionary {
}
/// Lookups the value corresponding to the key.
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TermInfo>> {
Ok(self
.term_ord(key)?
.map(|term_ord| self.term_info_from_ord(term_ord)))
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Option<TermInfo> {
self.term_ord(key)
.map(|term_ord| self.term_info_from_ord(term_ord))
}
/// Returns a range builder, to stream all of the terms
@@ -191,7 +191,7 @@ impl TermDictionary {
}
/// A stream of all the sorted terms. [See also `.stream_field()`](#method.stream_field)
pub fn stream(&self) -> io::Result<TermStreamer<'_>> {
pub fn stream(&self) -> TermStreamer<'_> {
self.range().into_stream()
}

View File

@@ -1,393 +0,0 @@
use super::{TermDictionary, TermDictionaryBuilder, TermStreamer};
use crate::directory::{Directory, FileSlice, RAMDirectory, TerminatingWrite};
use crate::postings::TermInfo;
use std::path::PathBuf;
use std::str;
const BLOCK_SIZE: usize = 1_500;
fn make_term_info(term_ord: u64) -> TermInfo {
let offset = |term_ord: u64| term_ord * 100 + term_ord * term_ord;
TermInfo {
doc_freq: term_ord as u32,
postings_start_offset: offset(term_ord),
postings_stop_offset: offset(term_ord + 1),
positions_idx: offset(term_ord) * 2u64,
}
}
#[test]
fn test_empty_term_dictionary() {
let empty = TermDictionary::empty();
assert!(empty.stream().unwrap().next().is_none());
}
#[test]
fn test_term_ordinals() -> crate::Result<()> {
const COUNTRIES: [&'static str; 7] = [
"San Marino",
"Serbia",
"Slovakia",
"Slovenia",
"Spain",
"Sweden",
"Switzerland",
];
let directory = RAMDirectory::create();
let path = PathBuf::from("TermDictionary");
{
let write = directory.open_write(&path)?;
let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
for term in COUNTRIES.iter() {
term_dictionary_builder.insert(term.as_bytes(), &make_term_info(0u64))?;
}
term_dictionary_builder.finish()?.terminate()?;
}
let term_file = directory.open_read(&path)?;
let term_dict: TermDictionary = TermDictionary::open(term_file)?;
for (term_ord, term) in COUNTRIES.iter().enumerate() {
assert_eq!(term_dict.term_ord(term)?, Some(term_ord as u64));
let mut bytes = vec![];
assert!(term_dict.ord_to_term(term_ord as u64, &mut bytes)?);
assert_eq!(bytes, term.as_bytes());
}
Ok(())
}
#[test]
fn test_term_dictionary_simple() -> crate::Result<()> {
let directory = RAMDirectory::create();
let path = PathBuf::from("TermDictionary");
{
let write = directory.open_write(&path)?;
let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
term_dictionary_builder.insert("abc".as_bytes(), &make_term_info(34u64))?;
term_dictionary_builder.insert("abcd".as_bytes(), &make_term_info(346u64))?;
term_dictionary_builder.finish()?.terminate()?;
}
let file = directory.open_read(&path)?;
let term_dict: TermDictionary = TermDictionary::open(file)?;
assert_eq!(term_dict.get("abc")?.unwrap().doc_freq, 34u32);
assert_eq!(term_dict.get("abcd")?.unwrap().doc_freq, 346u32);
let mut stream = term_dict.stream()?;
{
{
let (k, v) = stream.next().unwrap();
assert_eq!(k.as_ref(), "abc".as_bytes());
assert_eq!(v.doc_freq, 34u32);
}
assert_eq!(stream.key(), "abc".as_bytes());
assert_eq!(stream.value().doc_freq, 34u32);
}
{
{
let (k, v) = stream.next().unwrap();
assert_eq!(k, "abcd".as_bytes());
assert_eq!(v.doc_freq, 346u32);
}
assert_eq!(stream.key(), "abcd".as_bytes());
assert_eq!(stream.value().doc_freq, 346u32);
}
assert!(!stream.advance());
Ok(())
}
#[test]
fn test_term_dictionary_stream() -> crate::Result<()> {
let ids: Vec<_> = (0u32..10_000u32)
.map(|i| (format!("doc{:0>6}", i), i))
.collect();
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
for &(ref id, ref i) in &ids {
term_dictionary_builder
.insert(id.as_bytes(), &make_term_info(*i as u64))
.unwrap();
}
term_dictionary_builder.finish()?
};
let term_file = FileSlice::from(buffer);
let term_dictionary: TermDictionary = TermDictionary::open(term_file)?;
{
let mut streamer = term_dictionary.stream()?;
let mut i = 0;
while let Some((streamer_k, streamer_v)) = streamer.next() {
let &(ref key, ref v) = &ids[i];
assert_eq!(streamer_k.as_ref(), key.as_bytes());
assert_eq!(streamer_v, &make_term_info(*v as u64));
i += 1;
}
}
let &(ref key, ref val) = &ids[2047];
assert_eq!(
term_dictionary.get(key.as_bytes())?,
Some(make_term_info(*val as u64))
);
Ok(())
}
#[test]
fn test_stream_high_range_prefix_suffix() -> crate::Result<()> {
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
// term requires more than 16bits
term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxy", &make_term_info(1))?;
term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxyz", &make_term_info(2))?;
term_dictionary_builder.insert("abr", &make_term_info(3))?;
term_dictionary_builder.finish()?
};
let term_dict_file = FileSlice::from(buffer);
let term_dictionary: TermDictionary = TermDictionary::open(term_dict_file)?;
let mut kv_stream = term_dictionary.stream()?;
assert!(kv_stream.advance());
assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxy".as_bytes());
assert_eq!(kv_stream.value(), &make_term_info(1));
assert!(kv_stream.advance());
assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxyz".as_bytes());
assert_eq!(kv_stream.value(), &make_term_info(2));
assert!(kv_stream.advance());
assert_eq!(kv_stream.key(), "abr".as_bytes());
assert_eq!(kv_stream.value(), &make_term_info(3));
assert!(!kv_stream.advance());
Ok(())
}
#[test]
fn test_stream_range() -> crate::Result<()> {
let ids: Vec<_> = (0u32..10_000u32)
.map(|i| (format!("doc{:0>6}", i), i))
.collect();
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
for &(ref id, ref i) in &ids {
term_dictionary_builder
.insert(id.as_bytes(), &make_term_info(*i as u64))
.unwrap();
}
term_dictionary_builder.finish()?
};
let file = FileSlice::from(buffer);
let term_dictionary: TermDictionary = TermDictionary::open(file)?;
{
for i in (0..20).chain(6000..8_000) {
let &(ref target_key, _) = &ids[i];
let mut streamer = term_dictionary
.range()
.ge(target_key.as_bytes())
.into_stream()?;
for j in 0..3 {
let (streamer_k, streamer_v) = streamer.next().unwrap();
let &(ref key, ref v) = &ids[i + j];
assert_eq!(str::from_utf8(streamer_k.as_ref()).unwrap(), key);
assert_eq!(streamer_v.doc_freq, *v);
assert_eq!(streamer_v, &make_term_info(*v as u64));
}
}
}
{
for i in (0..20).chain(BLOCK_SIZE - 10..BLOCK_SIZE + 10) {
let &(ref target_key, _) = &ids[i];
let mut streamer = term_dictionary
.range()
.gt(target_key.as_bytes())
.into_stream()?;
for j in 0..3 {
let (streamer_k, streamer_v) = streamer.next().unwrap();
let &(ref key, ref v) = &ids[i + j + 1];
assert_eq!(streamer_k.as_ref(), key.as_bytes());
assert_eq!(streamer_v.doc_freq, *v);
}
}
}
{
for i in (0..20).chain(BLOCK_SIZE - 10..BLOCK_SIZE + 10) {
for j in 0..3 {
let &(ref fst_key, _) = &ids[i];
let &(ref last_key, _) = &ids[i + j];
let mut streamer = term_dictionary
.range()
.ge(fst_key.as_bytes())
.lt(last_key.as_bytes())
.into_stream()?;
for _ in 0..j {
assert!(streamer.next().is_some());
}
assert!(streamer.next().is_none());
}
}
}
Ok(())
}
#[test]
fn test_empty_string() -> crate::Result<()> {
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
term_dictionary_builder
.insert(&[], &make_term_info(1 as u64))
.unwrap();
term_dictionary_builder
.insert(&[1u8], &make_term_info(2 as u64))
.unwrap();
term_dictionary_builder.finish()?
};
let file = FileSlice::from(buffer);
let term_dictionary: TermDictionary = TermDictionary::open(file)?;
let mut stream = term_dictionary.stream()?;
assert!(stream.advance());
assert!(stream.key().is_empty());
assert!(stream.advance());
assert_eq!(stream.key(), &[1u8]);
assert!(!stream.advance());
Ok(())
}
#[test]
fn test_stream_range_boundaries() -> crate::Result<()> {
let buffer: Vec<u8> = {
let mut term_dictionary_builder = TermDictionaryBuilder::create(Vec::new())?;
for i in 0u8..10u8 {
let number_arr = [i; 1];
term_dictionary_builder.insert(&number_arr, &make_term_info(i as u64))?;
}
term_dictionary_builder.finish()?
};
let file = FileSlice::from(buffer);
let term_dictionary: TermDictionary = TermDictionary::open(file)?;
let value_list = |mut streamer: TermStreamer<'_>, backwards: bool| {
let mut res: Vec<u32> = vec![];
while let Some((_, ref v)) = streamer.next() {
res.push(v.doc_freq);
}
if backwards {
res.reverse();
}
res
};
{
let range = term_dictionary.range().backward().into_stream()?;
assert_eq!(
value_list(range, true),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().ge([2u8]).into_stream()?;
assert_eq!(
value_list(range, false),
vec![2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().ge([2u8]).backward().into_stream()?;
assert_eq!(
value_list(range, true),
vec![2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().gt([2u8]).into_stream()?;
assert_eq!(
value_list(range, false),
vec![3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().gt([2u8]).backward().into_stream()?;
assert_eq!(
value_list(range, true),
vec![3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
);
}
{
let range = term_dictionary.range().lt([6u8]).into_stream()?;
assert_eq!(
value_list(range, false),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32]
);
}
{
let range = term_dictionary.range().lt([6u8]).backward().into_stream()?;
assert_eq!(
value_list(range, true),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32]
);
}
{
let range = term_dictionary.range().le([6u8]).into_stream()?;
assert_eq!(
value_list(range, false),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32]
);
}
{
let range = term_dictionary.range().le([6u8]).backward().into_stream()?;
assert_eq!(
value_list(range, true),
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32]
);
}
{
let range = term_dictionary.range().ge([0u8]).lt([5u8]).into_stream()?;
assert_eq!(value_list(range, false), vec![0u32, 1u32, 2u32, 3u32, 4u32]);
}
{
let range = term_dictionary
.range()
.ge([0u8])
.lt([5u8])
.backward()
.into_stream()?;
assert_eq!(value_list(range, true), vec![0u32, 1u32, 2u32, 3u32, 4u32]);
}
Ok(())
}
#[test]
fn test_automaton_search() -> crate::Result<()> {
use crate::query::DFAWrapper;
use levenshtein_automata::LevenshteinAutomatonBuilder;
const COUNTRIES: [&'static str; 7] = [
"San Marino",
"Serbia",
"Slovakia",
"Slovenia",
"Spain",
"Sweden",
"Switzerland",
];
let directory = RAMDirectory::create();
let path = PathBuf::from("TermDictionary");
{
let write = directory.open_write(&path)?;
let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
for term in COUNTRIES.iter() {
term_dictionary_builder.insert(term.as_bytes(), &make_term_info(0u64))?;
}
term_dictionary_builder.finish()?.terminate()?;
}
let file = directory.open_read(&path)?;
let term_dict: TermDictionary = TermDictionary::open(file)?;
// We can now build an entire dfa.
let lev_automaton_builder = LevenshteinAutomatonBuilder::new(2, true);
let automaton = DFAWrapper(lev_automaton_builder.build_dfa("Spaen"));
let mut range = term_dict.search(automaton).into_stream()?;
// get the first finding
assert!(range.advance());
assert_eq!("Spain".as_bytes(), range.key());
assert!(!range.advance());
Ok(())
}

View File

@@ -18,7 +18,7 @@ fn test_failpoints_managed_directory_gc_if_delete_fails() {
.unwrap()
.terminate()
.unwrap();
assert!(managed_directory.exists(test_path).unwrap());
assert!(managed_directory.exists(test_path));
// triggering gc and setting the delete operation to fail.
//
// We are checking that the gc operation is not removing the
@@ -29,12 +29,12 @@ fn test_failpoints_managed_directory_gc_if_delete_fails() {
// lock file.
fail::cfg("RAMDirectory::delete", "1*off->1*return").unwrap();
assert!(managed_directory.garbage_collect(Default::default).is_ok());
assert!(managed_directory.exists(test_path).unwrap());
assert!(managed_directory.exists(test_path));
// running the gc a second time should remove the file.
assert!(managed_directory.garbage_collect(Default::default).is_ok());
assert!(
!managed_directory.exists(test_path).unwrap(),
!managed_directory.exists(test_path),
"The file should have been deleted"
);
}