Mirror of https://github.com/quickwit-oss/tantivy.git (synced 2025-12-27 20:42:54 +00:00)
Compare commits: agg_format...barrotstei (20 commits)
| Author | SHA1 | Date |
|---|---|---|
|  | c5d50e2138 |  |
|  | af2f067b69 |  |
|  | c056140d81 |  |
|  | c6204f49d3 |  |
|  | 738a1a0188 |  |
|  | 294e8b4659 |  |
|  | 2f342257d3 |  |
|  | e9e2984ac2 |  |
|  | e1f9271be3 |  |
|  | 883eb92df9 |  |
|  | 590654ceb8 |  |
|  | 7367eb5455 |  |
|  | e22330c7e0 |  |
|  | ad4c2be21b |  |
|  | 412c83c336 |  |
|  | 3006db17c1 |  |
|  | b4fc185dc5 |  |
|  | 20a314093f |  |
|  | 9b3eb59e9b |  |
|  | 6d33ae307a |  |
@@ -61,7 +61,7 @@ fn main() -> tantivy::Result<()> {
     let query_ords: HashSet<u64> = facets
         .iter()
-        .filter_map(|key| facet_dict.term_ord(key.encoded_str()))
+        .filter_map(|key| facet_dict.term_ord(key.encoded_str()).unwrap())
         .collect();

     let mut facet_ords_buffer: Vec<u64> = Vec::with_capacity(20);

@@ -274,7 +274,7 @@ impl Collector for FacetCollector {
         let mut collapse_facet_it = self.facets.iter().peekable();
         collapse_facet_ords.push(0);
         {
-            let mut facet_streamer = facet_reader.facet_dict().range().into_stream();
+            let mut facet_streamer = facet_reader.facet_dict().range().into_stream()?;
             if facet_streamer.advance() {
                 'outer: loop {
                     // at the begining of this loop, facet_streamer
@@ -368,9 +368,12 @@ impl SegmentCollector for FacetSegmentCollector {
             }
             let mut facet = vec![];
             let facet_ord = self.collapse_facet_ords[collapsed_facet_ord];
-            facet_dict.ord_to_term(facet_ord as u64, &mut facet);
-            // TODO
-            facet_counts.insert(Facet::from_encoded(facet).unwrap(), count);
+            // TODO handle errors.
+            if facet_dict.ord_to_term(facet_ord as u64, &mut facet).is_ok() {
+                if let Ok(facet) = Facet::from_encoded(facet) {
+                    facet_counts.insert(facet, count);
+                }
+            }
         }
         FacetCounts { facet_counts }
     }

@@ -144,22 +144,19 @@ where
             ))
         };
         let fast_fields = segment_reader.fast_fields();
-        let fast_filed_reader: crate::Result<FastFieldReader<TPredicateValue>> = match schema_type {
-            crate::schema::Type::U64 => {fast_fields.u64(self.field).ok_or_else(err_closure)}
-            crate::schema::Type::I64 => {fast_fields.i64(self.field).ok_or_else(err_closure)}
-            crate::schema::Type::F64 => {fast_fields.f64(self.field).ok_or_else(err_closure)}
-            crate::schema::Type::Date => {fast_fields.date(self.field).ok_or_else(err_closure)}
-            crate::schema::Type::Bytes => {fast_fields.bytes(self.field).ok_or_else(err_closure)}
-            crate::schema::Type::Str | crate::schema::Type::HierarchicalFacet => {Err(TantivyError::SchemaError(format!("Field {:?} uses an unsupported type", segment_reader.schema().get_field_name(self.field))))}
-        };
+        let fast_value_type = TPredicateValue::to_type();
+        // TODO do a runtime check of `fast_value_type` against the schema.
+
+        let fast_field_reader_opt = fast_fields.typed_fast_field_reader(self.field);
+        let fast_field_reader = fast_field_reader_opt
+            .ok_or_else(|| TantivyError::SchemaError(format!("{:?} is not declared as a fast field in the schema.", self.field)))?;
         let segment_collector = self
             .collector
             .for_segment(segment_local_id, segment_reader)?;

-        let a = fast_filed_reader?;
         Ok(FilterSegmentCollector {
-            fast_field_reader: a,
+            fast_field_reader,
             segment_collector: segment_collector,
             predicate: self.predicate,
             t_predicate_value: PhantomData,
@@ -728,7 +728,7 @@ mod tests {
     }

     #[test]
-    fn test_top_collector_not_at_capacity() {
+    fn test_top_collector_not_at_capacity_without_offset() {
         let index = make_index();
         let field = index.schema().get_field("text").unwrap();
         let query_parser = QueryParser::for_index(&index, vec![field]);
@@ -20,9 +20,10 @@ impl<W: Write> CountingWriter<W> {
         self.written_bytes
     }

-    pub fn finish(mut self) -> io::Result<(W, u64)> {
-        self.flush()?;
-        Ok((self.underlying, self.written_bytes))
+    /// Returns the underlying write object.
+    /// Note that this method does not trigger any flushing.
+    pub fn finish(self) -> W {
+        self.underlying
     }
 }

@@ -46,7 +47,6 @@ impl<W: Write> Write for CountingWriter<W> {

 impl<W: TerminatingWrite> TerminatingWrite for CountingWriter<W> {
     fn terminate_ref(&mut self, token: AntiCallToken) -> io::Result<()> {
-        self.flush()?;
         self.underlying.terminate_ref(token)
     }
 }
@@ -63,8 +63,9 @@ mod test {
         let mut counting_writer = CountingWriter::wrap(buffer);
         let bytes = (0u8..10u8).collect::<Vec<u8>>();
         counting_writer.write_all(&bytes).unwrap();
-        let (w, len): (Vec<u8>, u64) = counting_writer.finish().unwrap();
+        let len = counting_writer.written_bytes();
+        let buffer_restituted: Vec<u8> = counting_writer.finish();
         assert_eq!(len, 10u64);
-        assert_eq!(w.len(), 10);
+        assert_eq!(buffer_restituted.len(), 10);
     }
 }

@@ -511,28 +511,28 @@ mod tests {
     }

     #[test]
-    fn test_index_manual_policy_mmap() {
+    fn test_index_manual_policy_mmap() -> crate::Result<()> {
         let schema = throw_away_schema();
         let field = schema.get_field("num_likes").unwrap();
-        let mut index = Index::create_from_tempdir(schema).unwrap();
-        let mut writer = index.writer_for_tests().unwrap();
-        writer.commit().unwrap();
+        let mut index = Index::create_from_tempdir(schema)?;
+        let mut writer = index.writer_for_tests()?;
+        writer.commit()?;
         let reader = index
             .reader_builder()
             .reload_policy(ReloadPolicy::Manual)
-            .try_into()
-            .unwrap();
+            .try_into()?;
         assert_eq!(reader.searcher().num_docs(), 0);
         writer.add_document(doc!(field=>1u64));
         let (sender, receiver) = crossbeam::channel::unbounded();
         let _handle = index.directory_mut().watch(WatchCallback::new(move || {
             let _ = sender.send(());
         }));
-        writer.commit().unwrap();
+        writer.commit()?;
         assert!(receiver.recv().is_ok());
         assert_eq!(reader.searcher().num_docs(), 0);
-        reader.reload().unwrap();
+        reader.reload()?;
         assert_eq!(reader.searcher().num_docs(), 1);
+        Ok(())
     }

     #[test]
@@ -66,7 +66,7 @@ impl InvertedIndexReader {
     }

     /// Returns the term info associated with the term.
-    pub fn get_term_info(&self, term: &Term) -> Option<TermInfo> {
+    pub fn get_term_info(&self, term: &Term) -> io::Result<Option<TermInfo>> {
         self.termdict.get(term.value_bytes())
     }

@@ -106,10 +106,9 @@ impl InvertedIndexReader {
         term: &Term,
         option: IndexRecordOption,
     ) -> io::Result<Option<BlockSegmentPostings>> {
-        Ok(self
-            .get_term_info(term)
+        self.get_term_info(term)?
             .map(move |term_info| self.read_block_postings_from_terminfo(&term_info, option))
-            .transpose()?)
+            .transpose()
     }

     /// Returns a block postings given a `term_info`.
@@ -181,7 +180,7 @@ impl InvertedIndexReader {
         term: &Term,
         option: IndexRecordOption,
     ) -> io::Result<Option<SegmentPostings>> {
-        self.get_term_info(term)
+        self.get_term_info(term)?
             .map(move |term_info| self.read_postings_from_terminfo(&term_info, option))
             .transpose()
     }
@@ -191,7 +190,7 @@ impl InvertedIndexReader {
         term: &Term,
         option: IndexRecordOption,
     ) -> io::Result<Option<SegmentPostings>> {
-        self.get_term_info(term)
+        self.get_term_info(term)?
             .map(|term_info| self.read_postings_from_terminfo(&term_info, option))
             .transpose()
     }
@@ -199,7 +198,7 @@ impl InvertedIndexReader {
     /// Returns the number of documents containing the term.
     pub fn doc_freq(&self, term: &Term) -> io::Result<u32> {
         Ok(self
-            .get_term_info(term)
+            .get_term_info(term)?
             .map(|term_info| term_info.doc_freq)
             .unwrap_or(0u32))
    }
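Note: with `get_term_info` now returning `io::Result<Option<TermInfo>>`, call sites apply `?` before the `Option` handling. A minimal sketch of the updated calling convention — it simply mirrors the `doc_freq` body in the hunk above:

```rust
use tantivy::{InvertedIndexReader, Term};

// Sketch only: `get_term_info` is fallible after this change, so the `?`
// comes first, then the usual Option combinators.
fn term_doc_freq(inv_index: &InvertedIndexReader, term: &Term) -> std::io::Result<u32> {
    Ok(inv_index
        .get_term_info(term)?                // io::Result<Option<TermInfo>>
        .map(|term_info| term_info.doc_freq) // doc frequency if the term exists
        .unwrap_or(0u32))                    // a missing term matches zero documents
}
```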
@@ -12,7 +12,7 @@ pub use self::executor::Executor;
 pub use self::index::Index;
 pub use self::index_meta::{IndexMeta, SegmentMeta, SegmentMetaInventory};
 pub use self::inverted_index_reader::InvertedIndexReader;
-pub use self::searcher::{FieldSearcher, Searcher};
+pub use self::searcher::Searcher;
 pub use self::segment::Segment;
 pub use self::segment::SerializableSegment;
 pub use self::segment_component::SegmentComponent;
@@ -1,17 +1,16 @@
 use crate::collector::Collector;
 use crate::core::Executor;
 use crate::core::InvertedIndexReader;
 use crate::core::SegmentReader;
 use crate::query::Query;
 use crate::schema::Document;
 use crate::schema::Schema;
-use crate::schema::{Field, Term};
+use crate::schema::Term;
 use crate::space_usage::SearcherSpaceUsage;
 use crate::store::StoreReader;
 use crate::termdict::TermMerger;
 use crate::DocAddress;
 use crate::Index;
 use std::sync::Arc;

 use std::{fmt, io};

 /// Holds a list of `SegmentReader`s ready for search.
@@ -148,16 +147,6 @@ impl Searcher {
         collector.merge_fruits(fruits)
     }

-    /// Return the field searcher associated to a `Field`.
-    pub fn field(&self, field: Field) -> crate::Result<FieldSearcher> {
-        let inv_index_readers: Vec<Arc<InvertedIndexReader>> = self
-            .segment_readers
-            .iter()
-            .map(|segment_reader| segment_reader.inverted_index(field))
-            .collect::<crate::Result<Vec<_>>>()?;
-        Ok(FieldSearcher::new(inv_index_readers))
-    }
-
     /// Summarize total space usage of this searcher.
     pub fn space_usage(&self) -> io::Result<SearcherSpaceUsage> {
         let mut space_usage = SearcherSpaceUsage::new();
@@ -168,32 +157,6 @@ impl Searcher {
     }
 }

-/// **Experimental API** `FieldSearcher` only gives access to a stream over the terms of a field.
-pub struct FieldSearcher {
-    inv_index_readers: Vec<Arc<InvertedIndexReader>>,
-}
-
-impl FieldSearcher {
-    fn new(inv_index_readers: Vec<Arc<InvertedIndexReader>>) -> FieldSearcher {
-        FieldSearcher { inv_index_readers }
-    }
-
-    /// Returns a Stream over all of the sorted unique terms of
-    /// for the given field.
-    ///
-    /// This method does not take into account which documents are deleted, so
-    /// in presence of deletes some terms may not actually exist in any document
-    /// anymore.
-    pub fn terms(&self) -> TermMerger {
-        let term_streamers: Vec<_> = self
-            .inv_index_readers
-            .iter()
-            .map(|inverted_index| inverted_index.terms().stream())
-            .collect();
-        TermMerger::new(term_streamers)
-    }
-}
-
 impl fmt::Debug for Searcher {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         let segment_ids = self
@@ -1,8 +1,8 @@
 use crate::directory::directory_lock::Lock;
 use crate::directory::error::LockError;
 use crate::directory::error::{DeleteError, OpenReadError, OpenWriteError};
-use crate::directory::WatchCallback;
 use crate::directory::WatchHandle;
+use crate::directory::{FileHandle, WatchCallback};
 use crate::directory::{FileSlice, WritePtr};
 use std::fmt;
 use std::io;
@@ -108,10 +108,13 @@ fn retry_policy(is_blocking: bool) -> RetryPolicy {
 /// should be your default choice.
 /// - The [`RAMDirectory`](struct.RAMDirectory.html), which
 /// should be used mostly for tests.
 ///
 pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
-    /// Opens a virtual file for read.
+    /// Opens a file and returns a boxed `FileHandle`.
+    ///
+    /// Users of `Directory` should typically call `Directory::open_read(...)`,
+    /// while `Directory` implementor should implement `get_file_handle()`.
+    fn get_file_handle(&self, path: &Path) -> Result<Box<dyn FileHandle>, OpenReadError>;
+
     /// Once a virtual file is open, its data may not
     /// change.
     ///
@@ -119,7 +122,10 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
     /// have no effect on the returned `FileSlice` object.
     ///
     /// You should only use this to read files create with [Directory::open_write].
-    fn open_read(&self, path: &Path) -> Result<FileSlice, OpenReadError>;
+    fn open_read(&self, path: &Path) -> Result<FileSlice, OpenReadError> {
+        let file_handle = self.get_file_handle(path)?;
+        Ok(FileSlice::new(file_handle))
+    }

     /// Removes a file
     ///
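Note: after this change, `open_read` is a provided method built on `get_file_handle`, so a `Directory` implementation only supplies the handle-opening primitive. A minimal end-to-end sketch against this revision's API (file name and contents are arbitrary):

```rust
use std::io::Write;
use std::path::Path;
use tantivy::directory::{Directory, RAMDirectory};

// Sketch: write a file through the Directory API, then read it back through
// the new default `open_read`, which internally calls `get_file_handle`
// and wraps the boxed handle in a FileSlice.
fn main() -> tantivy::Result<()> {
    let directory = RAMDirectory::create();
    let path = Path::new("hello.txt");
    {
        let mut wrt = directory.open_write(path)?;
        wrt.write_all(b"hello")?;
        wrt.flush()?; // commit the buffered bytes so readers can see them
    }
    let slice = directory.open_read(path)?; // default impl: get_file_handle + FileSlice::new
    assert_eq!(slice.read_bytes()?.as_ref(), b"hello");
    Ok(())
}
```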
@@ -58,7 +58,8 @@ pub enum OpenWriteError {
 }

 impl OpenWriteError {
-    pub(crate) fn wrap_io_error(io_error: io::Error, filepath: PathBuf) -> Self {
+    /// Wraps an io error.
+    pub fn wrap_io_error(io_error: io::Error, filepath: PathBuf) -> Self {
         Self::IOError { io_error, filepath }
     }
 }
@@ -143,7 +144,8 @@ pub enum OpenReadError {
 }

 impl OpenReadError {
-    pub(crate) fn wrap_io_error(io_error: io::Error, filepath: PathBuf) -> Self {
+    /// Wraps an io error.
+    pub fn wrap_io_error(io_error: io::Error, filepath: PathBuf) -> Self {
         Self::IOError { io_error, filepath }
     }
 }
@@ -2,10 +2,11 @@ use stable_deref_trait::StableDeref;

 use crate::common::HasLen;
 use crate::directory::OwnedBytes;
-use std::sync::Arc;
+use std::sync::{Arc, Weak};
 use std::{io, ops::Deref};

-pub type BoxedData = Box<dyn Deref<Target = [u8]> + Send + Sync + 'static>;
+pub type ArcBytes = Arc<dyn Deref<Target = [u8]> + Send + Sync + 'static>;
+pub type WeakArcBytes = Weak<dyn Deref<Target = [u8]> + Send + Sync + 'static>;

 /// Objects that represents files sections in tantivy.
 ///
@@ -40,7 +41,7 @@ where
     B: StableDeref + Deref<Target = [u8]> + 'static + Send + Sync,
 {
     fn from(bytes: B) -> FileSlice {
-        FileSlice::new(OwnedBytes::new(bytes))
+        FileSlice::new(Box::new(OwnedBytes::new(bytes)))
     }
 }

@@ -50,22 +51,25 @@ where
 ///
 #[derive(Clone)]
 pub struct FileSlice {
-    data: Arc<Box<dyn FileHandle>>,
+    data: Arc<dyn FileHandle>,
     start: usize,
     stop: usize,
 }

 impl FileSlice {
     /// Wraps a FileHandle.
-    pub fn new<D>(data: D) -> Self
-    where
-        D: FileHandle,
-    {
-        let len = data.len();
+    pub fn new(file_handle: Box<dyn FileHandle>) -> Self {
+        let num_bytes = file_handle.len();
+        FileSlice::new_with_num_bytes(file_handle, num_bytes)
+    }
+
+    /// Wraps a FileHandle.
+    #[doc(hidden)]
+    pub fn new_with_num_bytes(file_handle: Box<dyn FileHandle>, num_bytes: usize) -> Self {
         FileSlice {
-            data: Arc::new(Box::new(data)),
+            data: Arc::from(file_handle),
             start: 0,
-            stop: len,
+            stop: num_bytes,
         }
     }

@@ -146,6 +150,12 @@ impl FileSlice {
     }
 }

+impl FileHandle for FileSlice {
+    fn read_bytes(&self, from: usize, to: usize) -> io::Result<OwnedBytes> {
+        self.read_bytes_slice(from, to)
+    }
+}
+
 impl HasLen for FileSlice {
     fn len(&self) -> usize {
         self.stop - self.start
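Note: `FileSlice::new` now takes a `Box<dyn FileHandle>` and stores it as `Arc<dyn FileHandle>` (one allocation instead of the double indirection of `Arc<Box<...>>`). A hedged sketch of a custom handle, assuming `FileHandle` at this revision requires `HasLen` plus `Send + Sync + 'static` as the hunks above suggest:

```rust
use std::io;
use tantivy::directory::{FileHandle, FileSlice, OwnedBytes};
use tantivy::HasLen;

// Hypothetical in-memory handle, to illustrate the boxed FileHandle contract.
struct VecHandle(Vec<u8>);

impl HasLen for VecHandle {
    fn len(&self) -> usize {
        self.0.len()
    }
}

impl FileHandle for VecHandle {
    fn read_bytes(&self, from: usize, to: usize) -> io::Result<OwnedBytes> {
        // Serve the requested byte range as an owned buffer.
        Ok(OwnedBytes::new(self.0[from..to].to_vec()))
    }
}

fn main() -> io::Result<()> {
    // The new constructor takes a boxed handle, matching the tests below.
    let slice = FileSlice::new(Box::new(VecHandle(b"abcdef".to_vec())));
    assert_eq!(slice.slice(1, 4).read_bytes()?.as_ref(), b"bcd");
    Ok(())
}
```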
@@ -160,7 +170,7 @@ mod tests {

     #[test]
     fn test_file_slice() -> io::Result<()> {
-        let file_slice = FileSlice::new(b"abcdef".as_ref());
+        let file_slice = FileSlice::new(Box::new(b"abcdef".as_ref()));
         assert_eq!(file_slice.len(), 6);
         assert_eq!(file_slice.slice_from(2).read_bytes()?.as_slice(), b"cdef");
         assert_eq!(file_slice.slice_to(2).read_bytes()?.as_slice(), b"ab");
@@ -204,7 +214,7 @@ mod tests {

     #[test]
     fn test_slice_simple_read() -> io::Result<()> {
-        let slice = FileSlice::new(&b"abcdef"[..]);
+        let slice = FileSlice::new(Box::new(&b"abcdef"[..]));
         assert_eq!(slice.len(), 6);
         assert_eq!(slice.read_bytes()?.as_ref(), b"abcdef");
         assert_eq!(slice.slice(1, 4).read_bytes()?.as_ref(), b"bcd");
@@ -213,7 +223,7 @@ mod tests {

     #[test]
     fn test_slice_read_slice() -> io::Result<()> {
-        let slice_deref = FileSlice::new(&b"abcdef"[..]);
+        let slice_deref = FileSlice::new(Box::new(&b"abcdef"[..]));
         assert_eq!(slice_deref.read_bytes_slice(1, 4)?.as_ref(), b"bcd");
         Ok(())
     }
@@ -221,14 +231,14 @@ mod tests {
     #[test]
     #[should_panic(expected = "assertion failed: from <= to")]
     fn test_slice_read_slice_invalid_range() {
-        let slice_deref = FileSlice::new(&b"abcdef"[..]);
+        let slice_deref = FileSlice::new(Box::new(&b"abcdef"[..]));
         assert_eq!(slice_deref.read_bytes_slice(1, 0).unwrap().as_ref(), b"bcd");
     }

     #[test]
     #[should_panic(expected = "`to` exceeds the fileslice length")]
     fn test_slice_read_slice_invalid_range_exceeds() {
-        let slice_deref = FileSlice::new(&b"abcdef"[..]);
+        let slice_deref = FileSlice::new(Box::new(&b"abcdef"[..]));
         assert_eq!(
             slice_deref.read_bytes_slice(0, 10).unwrap().as_ref(),
             b"bcd"
@@ -3,7 +3,7 @@ use crc32fast::Hasher;
 use std::fs;
 use std::io;
 use std::io::BufRead;
-use std::path::PathBuf;
+use std::path::Path;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::thread;
@@ -13,15 +13,15 @@ pub const POLLING_INTERVAL: Duration = Duration::from_millis(if cfg!(test) { 1 }

 // Watches a file and executes registered callbacks when the file is modified.
 pub struct FileWatcher {
-    path: Arc<PathBuf>,
+    path: Arc<Path>,
     callbacks: Arc<WatchCallbackList>,
     state: Arc<AtomicUsize>, // 0: new, 1: runnable, 2: terminated
 }

 impl FileWatcher {
-    pub fn new(path: &PathBuf) -> FileWatcher {
+    pub fn new(path: &Path) -> FileWatcher {
         FileWatcher {
-            path: Arc::new(path.clone()),
+            path: Arc::from(path),
             callbacks: Default::default(),
             state: Default::default(),
         }
@@ -63,7 +63,7 @@ impl FileWatcher {
         handle
     }

-    fn compute_checksum(path: &PathBuf) -> Result<u32, io::Error> {
+    fn compute_checksum(path: &Path) -> Result<u32, io::Error> {
         let reader = match fs::File::open(path) {
             Ok(f) => io::BufReader::new(f),
             Err(e) => {
@@ -1,10 +1,10 @@
 use crate::core::{MANAGED_FILEPATH, META_FILEPATH};
 use crate::directory::error::{DeleteError, LockError, OpenReadError, OpenWriteError};
 use crate::directory::footer::{Footer, FooterProxy};
-use crate::directory::DirectoryLock;
 use crate::directory::GarbageCollectionResult;
 use crate::directory::Lock;
 use crate::directory::META_LOCK;
+use crate::directory::{DirectoryLock, FileHandle};
 use crate::directory::{FileSlice, WritePtr};
 use crate::directory::{WatchCallback, WatchHandle};
 use crate::error::DataCorruption;
@@ -274,6 +274,11 @@ impl ManagedDirectory {
 }

 impl Directory for ManagedDirectory {
+    fn get_file_handle(&self, path: &Path) -> Result<Box<dyn FileHandle>, OpenReadError> {
+        let file_slice = self.open_read(path)?;
+        Ok(Box::new(file_slice))
+    }
+
     fn open_read(&self, path: &Path) -> result::Result<FileSlice, OpenReadError> {
         let file_slice = self.directory.open_read(path)?;
         let (footer, reader) = Footer::extract_footer(file_slice)
@@ -2,14 +2,13 @@ use crate::core::META_FILEPATH;
 use crate::directory::error::LockError;
 use crate::directory::error::{DeleteError, OpenDirectoryError, OpenReadError, OpenWriteError};
 use crate::directory::file_watcher::FileWatcher;
-use crate::directory::AntiCallToken;
-use crate::directory::BoxedData;
 use crate::directory::Directory;
 use crate::directory::DirectoryLock;
 use crate::directory::FileSlice;
 use crate::directory::Lock;
 use crate::directory::WatchCallback;
 use crate::directory::WatchHandle;
+use crate::directory::{AntiCallToken, FileHandle, OwnedBytes};
+use crate::directory::{ArcBytes, WeakArcBytes};
 use crate::directory::{TerminatingWrite, WritePtr};
 use fs2::FileExt;
 use memmap::Mmap;
@@ -25,7 +24,6 @@ use std::path::{Path, PathBuf};
 use std::result;
 use std::sync::Arc;
 use std::sync::RwLock;
-use std::sync::Weak;
 use std::{collections::HashMap, ops::Deref};
 use tempfile::TempDir;

@@ -78,7 +76,7 @@ pub struct CacheInfo {

 struct MmapCache {
     counters: CacheCounters,
-    cache: HashMap<PathBuf, Weak<BoxedData>>,
+    cache: HashMap<PathBuf, WeakArcBytes>,
 }

 impl Default for MmapCache {
@@ -112,7 +110,7 @@ impl MmapCache {
     }

     // Returns None if the file exists but as a len of 0 (and hence is not mmappable).
-    fn get_mmap(&mut self, full_path: &Path) -> Result<Option<Arc<BoxedData>>, OpenReadError> {
+    fn get_mmap(&mut self, full_path: &Path) -> Result<Option<ArcBytes>, OpenReadError> {
         if let Some(mmap_weak) = self.cache.get(full_path) {
             if let Some(mmap_arc) = mmap_weak.upgrade() {
                 self.counters.hit += 1;
@@ -123,7 +121,7 @@ impl MmapCache {
         self.counters.miss += 1;
         let mmap_opt = open_mmap(full_path)?;
         Ok(mmap_opt.map(|mmap| {
-            let mmap_arc: Arc<BoxedData> = Arc::new(Box::new(mmap));
+            let mmap_arc: ArcBytes = Arc::new(mmap);
             let mmap_weak = Arc::downgrade(&mmap_arc);
             self.cache.insert(full_path.to_owned(), mmap_weak);
             mmap_arc
@@ -161,7 +159,7 @@ impl MmapDirectoryInner {
             mmap_cache: Default::default(),
             _temp_directory: temp_directory,
             watcher: FileWatcher::new(&root_path.join(*META_FILEPATH)),
-            root_path: root_path,
+            root_path,
         }
     }

@@ -316,7 +314,7 @@ impl TerminatingWrite for SafeFileWriter {
 }

 #[derive(Clone)]
-struct MmapArc(Arc<Box<dyn Deref<Target = [u8]> + Send + Sync>>);
+struct MmapArc(Arc<dyn Deref<Target = [u8]> + Send + Sync>);

 impl Deref for MmapArc {
     type Target = [u8];
@@ -346,7 +344,7 @@ pub(crate) fn atomic_write(path: &Path, content: &[u8]) -> io::Result<()> {
 }

 impl Directory for MmapDirectory {
-    fn open_read(&self, path: &Path) -> result::Result<FileSlice, OpenReadError> {
+    fn get_file_handle(&self, path: &Path) -> result::Result<Box<dyn FileHandle>, OpenReadError> {
         debug!("Open Read {:?}", path);
         let full_path = self.resolve_path(path);

@@ -359,11 +357,16 @@ impl Directory for MmapDirectory {
             let io_err = make_io_err(msg);
             OpenReadError::wrap_io_error(io_err, path.to_path_buf())
         })?;
-        if let Some(mmap_arc) = mmap_cache.get_mmap(&full_path)? {
-            Ok(FileSlice::from(MmapArc(mmap_arc)))
-        } else {
-            Ok(FileSlice::empty())
-        }
+
+        let owned_bytes = mmap_cache
+            .get_mmap(&full_path)?
+            .map(|mmap_arc| {
+                let mmap_arc_obj = MmapArc(mmap_arc);
+                OwnedBytes::new(mmap_arc_obj)
+            })
+            .unwrap_or_else(OwnedBytes::empty);
+
+        Ok(Box::new(owned_bytes))
     }

     /// Any entry associated to the path in the mmap will be
@@ -446,7 +449,8 @@ impl Directory for MmapDirectory {
     fn atomic_write(&self, path: &Path, content: &[u8]) -> io::Result<()> {
         debug!("Atomic Write {:?}", path);
         let full_path = self.resolve_path(path);
-        atomic_write(&full_path, content)
+        atomic_write(&full_path, content)?;
+        self.sync_directory()
     }

     fn acquire_lock(&self, lock: &Lock) -> Result<DirectoryLock, LockError> {
@@ -23,7 +23,7 @@ pub mod error;
 pub use self::directory::DirectoryLock;
 pub use self::directory::{Directory, DirectoryClone};
 pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
-pub(crate) use self::file_slice::BoxedData;
+pub(crate) use self::file_slice::{ArcBytes, WeakArcBytes};
 pub use self::file_slice::{FileHandle, FileSlice};
 pub use self::owned_bytes::OwnedBytes;
 pub use self::ram_directory::RAMDirectory;
@@ -1,5 +1,6 @@
 use crate::directory::FileHandle;
 use stable_deref_trait::StableDeref;
+use std::convert::TryInto;
 use std::mem;
 use std::ops::Deref;
 use std::sync::Arc;
@@ -95,6 +96,24 @@ impl OwnedBytes {
     pub fn advance(&mut self, advance_len: usize) {
         self.data = &self.data[advance_len..]
     }
+
+    /// Reads an `u8` from the `OwnedBytes` and advance by one byte.
+    pub fn read_u8(&mut self) -> u8 {
+        assert!(!self.is_empty());
+
+        let byte = self.as_slice()[0];
+        self.advance(1);
+        byte
+    }
+
+    /// Reads an `u64` encoded as little-endian from the `OwnedBytes` and advance by 8 bytes.
+    pub fn read_u64(&mut self) -> u64 {
+        assert!(self.len() > 7);
+
+        let octlet: [u8; 8] = self.as_slice()[..8].try_into().unwrap();
+        self.advance(8);
+        u64::from_le_bytes(octlet)
+    }
 }

 impl fmt::Debug for OwnedBytes {
@@ -230,6 +249,22 @@ mod tests {
         Ok(())
     }

+    #[test]
+    fn test_owned_bytes_read_u8() -> io::Result<()> {
+        let mut bytes = OwnedBytes::new(b"\xFF".as_ref());
+        assert_eq!(bytes.read_u8(), 255);
+        assert_eq!(bytes.len(), 0);
+        Ok(())
+    }
+
+    #[test]
+    fn test_owned_bytes_read_u64() -> io::Result<()> {
+        let mut bytes = OwnedBytes::new(b"\0\xFF\xFF\xFF\xFF\xFF\xFF\xFF".as_ref());
+        assert_eq!(bytes.read_u64(), u64::MAX - 255);
+        assert_eq!(bytes.len(), 0);
+        Ok(())
+    }
+
     #[test]
     fn test_owned_bytes_split() {
         let bytes = OwnedBytes::new(b"abcdefghi".as_ref());
@@ -12,6 +12,8 @@ use std::path::{Path, PathBuf};
 use std::result;
 use std::sync::{Arc, RwLock};

+use super::FileHandle;
+
 /// Writer associated with the `RAMDirectory`
 ///
 /// The Writer just writes a buffer.
@@ -163,6 +165,11 @@ impl RAMDirectory {
 }

 impl Directory for RAMDirectory {
+    fn get_file_handle(&self, path: &Path) -> Result<Box<dyn FileHandle>, OpenReadError> {
+        let file_slice = self.open_read(path)?;
+        Ok(Box::new(file_slice))
+    }
+
     fn open_read(&self, path: &Path) -> result::Result<FileSlice, OpenReadError> {
         self.fs.read().unwrap().open_read(path)
     }
@@ -6,12 +6,12 @@ use std::sync::Weak;

 /// Cloneable wrapper for callbacks registered when watching files of a `Directory`.
 #[derive(Clone)]
-pub struct WatchCallback(Arc<Box<dyn Fn() + Sync + Send>>);
+pub struct WatchCallback(Arc<dyn Fn() + Sync + Send>);

 impl WatchCallback {
     /// Wraps a `Fn()` to create a WatchCallback.
     pub fn new<F: Fn() + Sync + Send + 'static>(op: F) -> Self {
-        WatchCallback(Arc::new(Box::new(op)))
+        WatchCallback(Arc::new(op))
     }

     fn call(&self) {
@@ -10,7 +10,7 @@ use std::borrow::BorrowMut;
 pub const TERMINATED: DocId = std::i32::MAX as u32;

 /// Represents an iterable set of sorted doc ids.
-pub trait DocSet {
+pub trait DocSet: Send {
     /// Goes to the next element.
     ///
     /// The DocId of the next element is returned.
@@ -129,6 +129,14 @@ impl<'a> DocSet for &'a mut dyn DocSet {
     fn size_hint(&self) -> u32 {
         (**self).size_hint()
     }
+
+    fn count(&mut self, delete_bitset: &DeleteBitSet) -> u32 {
+        (**self).count(delete_bitset)
+    }
+
+    fn count_including_deleted(&mut self) -> u32 {
+        (**self).count_including_deleted()
+    }
 }

 impl<TDocSet: DocSet + ?Sized> DocSet for Box<TDocSet> {
@@ -1,4 +1,5 @@
 use super::MultiValueIntFastFieldReader;
+use crate::error::DataCorruption;
 use crate::schema::Facet;
 use crate::termdict::TermDictionary;
 use crate::termdict::TermOrdinal;
@@ -62,12 +63,13 @@ impl FacetReader {
         &mut self,
         facet_ord: TermOrdinal,
         output: &mut Facet,
-    ) -> Result<(), str::Utf8Error> {
+    ) -> crate::Result<()> {
         let found_term = self
             .term_dict
-            .ord_to_term(facet_ord as u64, &mut self.buffer);
+            .ord_to_term(facet_ord as u64, &mut self.buffer)?;
         assert!(found_term, "Term ordinal {} no found.", facet_ord);
-        let facet_str = str::from_utf8(&self.buffer[..])?;
+        let facet_str = str::from_utf8(&self.buffer[..])
+            .map_err(|utf8_err| DataCorruption::comment_only(utf8_err.to_string()))?;
         output.set_facet_str(facet_str);
         Ok(())
     }
@@ -51,6 +51,15 @@ impl<Item: FastValue> FastFieldReader<Item> {
         }
     }

+    pub(crate) fn cast<TFastValue: FastValue>(self) -> FastFieldReader<TFastValue> {
+        FastFieldReader {
+            bit_unpacker: self.bit_unpacker,
+            min_value_u64: self.min_value_u64,
+            max_value_u64: self.max_value_u64,
+            _phantom: PhantomData,
+        }
+    }
+
     /// Return the value associated to the given document.
     ///
     /// This accessor should return as fast as possible.
@@ -1,5 +1,5 @@
 use crate::common::CompositeFile;
-use crate::fastfield::BytesFastFieldReader;
+use crate::fastfield::{BytesFastFieldReader, FastValue};
 use crate::fastfield::MultiValueIntFastFieldReader;
 use crate::fastfield::{FastFieldNotAvailableError, FastFieldReader};
 use crate::schema::{Cardinality, Field, FieldType, Schema};
@@ -201,6 +201,10 @@ impl FastFieldReaders {
         None
     }

+    pub(crate) fn typed_fast_field_reader<TFastValue: FastValue>(&self, field: Field) -> Option<FastFieldReader<TFastValue>> {
+        self.u64_lenient(field).map(|fast_field_reader| fast_field_reader.cast())
+    }
+
     /// Returns the `i64` fast field reader reader associated to `field`.
     ///
     /// If `field` is not a i64 fast field, this method returns `None`.
@@ -61,16 +61,38 @@ impl FieldNormReaders {
 /// precompute computationally expensive functions of the fieldnorm
 /// in a very short array.
 #[derive(Clone)]
-pub struct FieldNormReader {
-    data: OwnedBytes,
+pub struct FieldNormReader(ReaderImplEnum);
+
+impl From<ReaderImplEnum> for FieldNormReader {
+    fn from(reader_enum: ReaderImplEnum) -> FieldNormReader {
+        FieldNormReader(reader_enum)
+    }
+}
+
+#[derive(Clone)]
+enum ReaderImplEnum {
+    FromData(OwnedBytes),
+    Const {
+        num_docs: u32,
+        fieldnorm_id: u8,
+        fieldnorm: u32,
+    },
 }

 impl FieldNormReader {
     /// Creates a `FieldNormReader` with a constant fieldnorm.
     ///
     /// The fieldnorm will be subjected to compression as if it was coming
     /// from an array-backed fieldnorm reader.
     pub fn constant(num_docs: u32, fieldnorm: u32) -> FieldNormReader {
         let fieldnorm_id = fieldnorm_to_id(fieldnorm);
-        let field_norms_data = OwnedBytes::new(vec![fieldnorm_id; num_docs as usize]);
-        FieldNormReader::new(field_norms_data)
+        let fieldnorm = id_to_fieldnorm(fieldnorm_id);
+        ReaderImplEnum::Const {
+            num_docs,
+            fieldnorm_id,
+            fieldnorm,
+        }
+        .into()
     }

     /// Opens a field norm reader given its file.
@@ -80,12 +102,15 @@ impl FieldNormReader {
     }

     fn new(data: OwnedBytes) -> Self {
-        FieldNormReader { data }
+        ReaderImplEnum::FromData(data).into()
     }

     /// Returns the number of documents in this segment.
     pub fn num_docs(&self) -> u32 {
-        self.data.len() as u32
+        match &self.0 {
+            ReaderImplEnum::FromData(data) => data.len() as u32,
+            ReaderImplEnum::Const { num_docs, .. } => *num_docs,
+        }
     }

     /// Returns the `fieldnorm` associated to a doc id.
@@ -98,14 +123,25 @@ impl FieldNormReader {
     /// The fieldnorm is effectively decoded from the
     /// `fieldnorm_id` by doing a simple table lookup.
     pub fn fieldnorm(&self, doc_id: DocId) -> u32 {
-        let fieldnorm_id = self.fieldnorm_id(doc_id);
-        id_to_fieldnorm(fieldnorm_id)
+        match &self.0 {
+            ReaderImplEnum::FromData(data) => {
+                let fieldnorm_id = data.as_slice()[doc_id as usize];
+                id_to_fieldnorm(fieldnorm_id)
+            }
+            ReaderImplEnum::Const { fieldnorm, .. } => *fieldnorm,
+        }
     }

     /// Returns the `fieldnorm_id` associated to a document.
     #[inline(always)]
     pub fn fieldnorm_id(&self, doc_id: DocId) -> u8 {
-        self.data.as_slice()[doc_id as usize]
+        match &self.0 {
+            ReaderImplEnum::FromData(data) => {
+                let fieldnorm_id = data.as_slice()[doc_id as usize];
+                fieldnorm_id
+            }
+            ReaderImplEnum::Const { fieldnorm_id, .. } => *fieldnorm_id,
+        }
     }

     /// Converts a `fieldnorm_id` into a fieldnorm.
@@ -129,9 +165,7 @@ impl FieldNormReader {
             .map(FieldNormReader::fieldnorm_to_id)
             .collect::<Vec<u8>>();
         let field_norms_data = OwnedBytes::new(field_norms_id);
-        FieldNormReader {
-            data: field_norms_data,
-        }
+        FieldNormReader::new(field_norms_data)
     }
 }

@@ -150,4 +184,20 @@ mod tests {
         assert_eq!(fieldnorm_reader.fieldnorm(3), 4);
         assert_eq!(fieldnorm_reader.fieldnorm(4), 983_064);
     }
+
+    #[test]
+    fn test_const_fieldnorm_reader_small_fieldnorm_id() {
+        let fieldnorm_reader = FieldNormReader::constant(1_000_000u32, 10u32);
+        assert_eq!(fieldnorm_reader.num_docs(), 1_000_000u32);
+        assert_eq!(fieldnorm_reader.fieldnorm(0u32), 10u32);
+        assert_eq!(fieldnorm_reader.fieldnorm_id(0u32), 10u8);
+    }
+
+    #[test]
+    fn test_const_fieldnorm_reader_large_fieldnorm_id() {
+        let fieldnorm_reader = FieldNormReader::constant(1_000_000u32, 300u32);
+        assert_eq!(fieldnorm_reader.num_docs(), 1_000_000u32);
+        assert_eq!(fieldnorm_reader.fieldnorm(0u32), 280u32);
+        assert_eq!(fieldnorm_reader.fieldnorm_id(0u32), 72u8);
+    }
 }
@@ -53,7 +53,7 @@ impl DeleteQueue {
             return block;
         }
         let block = Arc::new(Block {
-            operations: Arc::default(),
+            operations: Arc::new([]),
             next: NextBlock::from(self.clone()),
         });
         wlock.last_block = Arc::downgrade(&block);
@@ -108,7 +108,7 @@ impl DeleteQueue {
         let delete_operations = mem::replace(&mut self_wlock.writer, vec![]);

         let new_block = Arc::new(Block {
-            operations: Arc::new(delete_operations.into_boxed_slice()),
+            operations: Arc::from(delete_operations.into_boxed_slice()),
             next: NextBlock::from(self.clone()),
         });

@@ -167,7 +167,7 @@ impl NextBlock {
 }

 struct Block {
-    operations: Arc<Box<[DeleteOperation]>>,
+    operations: Arc<[DeleteOperation]>,
     next: NextBlock,
 }
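Note: the `Arc<Box<[T]>>` to `Arc<[T]>` flattening here (and the analogous `Arc<Box<dyn MergePolicy>>` to `Arc<dyn MergePolicy>` change further down) drops one heap indirection and relies on two standard-library idioms. A small self-contained sketch:

```rust
use std::sync::Arc;

fn main() {
    // `Arc::from` moves a boxed slice straight into the Arc allocation,
    // removing the extra indirection that `Arc<Box<[T]>>` carried.
    let ops: Arc<[u64]> = Arc::from(vec![1u64, 2, 3].into_boxed_slice());
    assert_eq!(ops.len(), 3);

    // An empty `Arc<[T]>` via unsized coercion from a zero-length array;
    // this is what replaces `Arc::default()` in the hunk above.
    let empty: Arc<[u64]> = Arc::new([]);
    assert!(empty.is_empty());
}
```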
@@ -449,7 +449,7 @@ impl IndexWriter {
     }

     /// Accessor to the merge policy.
-    pub fn get_merge_policy(&self) -> Arc<Box<dyn MergePolicy>> {
+    pub fn get_merge_policy(&self) -> Arc<dyn MergePolicy> {
         self.segment_updater.get_merge_policy()
     }

@@ -503,7 +503,6 @@ impl IndexMerger {
         let mut positions_buffer: Vec<u32> = Vec::with_capacity(1_000);
         let mut delta_computer = DeltaComputer::new();

-        let mut field_term_streams = Vec::new();
         let mut max_term_ords: Vec<TermOrdinal> = Vec::new();

         let field_readers: Vec<Arc<InvertedIndexReader>> = self
@@ -512,9 +511,10 @@ impl IndexMerger {
             .map(|reader| reader.inverted_index(indexed_field))
             .collect::<crate::Result<Vec<_>>>()?;

+        let mut field_term_streams = Vec::new();
         for field_reader in &field_readers {
             let terms = field_reader.terms();
-            field_term_streams.push(terms.stream());
+            field_term_streams.push(terms.stream()?);
             max_term_ords.push(terms.num_terms() as u64);
         }

@@ -9,6 +9,15 @@ pub struct DeleteOperation {
     pub term: Term,
 }

+impl Default for DeleteOperation {
+    fn default() -> Self {
+        DeleteOperation {
+            opstamp: 0u64,
+            term: Term::new(),
+        }
+    }
+}
+
 /// Timestamped Add operation.
 #[derive(Eq, PartialEq, Debug)]
 pub struct AddOperation {
@@ -154,7 +154,7 @@ pub(crate) struct InnerSegmentUpdater {
     index: Index,
     segment_manager: SegmentManager,
-    merge_policy: RwLock<Arc<Box<dyn MergePolicy>>>,
+    merge_policy: RwLock<Arc<dyn MergePolicy>>,
     killed: AtomicBool,
     stamper: Stamper,
     merge_operations: MergeOperationInventory,
@@ -193,19 +193,19 @@ impl SegmentUpdater {
             merge_thread_pool,
             index,
             segment_manager,
-            merge_policy: RwLock::new(Arc::new(Box::new(DefaultMergePolicy::default()))),
+            merge_policy: RwLock::new(Arc::new(DefaultMergePolicy::default())),
             killed: AtomicBool::new(false),
             stamper,
             merge_operations: Default::default(),
         })))
     }

-    pub fn get_merge_policy(&self) -> Arc<Box<dyn MergePolicy>> {
+    pub fn get_merge_policy(&self) -> Arc<dyn MergePolicy> {
         self.merge_policy.read().unwrap().clone()
     }

     pub fn set_merge_policy(&self, merge_policy: Box<dyn MergePolicy>) {
-        let arc_merge_policy = Arc::new(merge_policy);
+        let arc_merge_policy = Arc::from(merge_policy);
         *self.merge_policy.write().unwrap() = arc_merge_policy;
     }

@@ -160,7 +160,7 @@ pub use self::docset::{DocSet, TERMINATED};
 pub use crate::common::HasLen;
 pub use crate::common::{f64_to_u64, i64_to_u64, u64_to_f64, u64_to_i64};
 pub use crate::core::{Executor, SegmentComponent};
-pub use crate::core::{FieldSearcher, Index, IndexMeta, Searcher, Segment, SegmentId, SegmentMeta};
+pub use crate::core::{Index, IndexMeta, Searcher, Segment, SegmentId, SegmentMeta};
 pub use crate::core::{InvertedIndexReader, SegmentReader};
 pub use crate::directory::Directory;
 pub use crate::indexer::operation::UserOperation;
@@ -8,7 +8,7 @@ use std::io::{self, Write};
 pub struct PositionSerializer<W: io::Write> {
     bit_packer: BitPacker4x,
     write_stream: CountingWriter<W>,
-    write_skiplist: W,
+    write_skip_index: W,
     block: Vec<u32>,
     buffer: Vec<u8>,
     num_ints: u64,
@@ -16,11 +16,11 @@ pub struct PositionSerializer<W: io::Write> {
 }

 impl<W: io::Write> PositionSerializer<W> {
-    pub fn new(write_stream: W, write_skiplist: W) -> PositionSerializer<W> {
+    pub fn new(write_stream: W, write_skip_index: W) -> PositionSerializer<W> {
         PositionSerializer {
             bit_packer: BitPacker4x::new(),
             write_stream: CountingWriter::wrap(write_stream),
-            write_skiplist,
+            write_skip_index,
             block: Vec::with_capacity(128),
             buffer: vec![0u8; 128 * 4],
             num_ints: 0u64,
@@ -52,7 +52,7 @@ impl<W: io::Write> PositionSerializer<W> {

     fn flush_block(&mut self) -> io::Result<()> {
         let num_bits = self.bit_packer.num_bits(&self.block[..]);
-        self.write_skiplist.write_all(&[num_bits])?;
+        self.write_skip_index.write_all(&[num_bits])?;
         let written_len = self
             .bit_packer
             .compress(&self.block[..], &mut self.buffer, num_bits);
@@ -70,10 +70,10 @@ impl<W: io::Write> PositionSerializer<W> {
             self.flush_block()?;
         }
         for &long_skip in &self.long_skips {
-            long_skip.serialize(&mut self.write_skiplist)?;
+            long_skip.serialize(&mut self.write_skip_index)?;
         }
-        (self.long_skips.len() as u32).serialize(&mut self.write_skiplist)?;
-        self.write_skiplist.flush()?;
+        (self.long_skips.len() as u32).serialize(&mut self.write_skip_index)?;
+        self.write_skip_index.flush()?;
         self.write_stream.flush()?;
         Ok(())
     }

@@ -469,7 +469,7 @@ mod tests {
         let segment_reader = searcher.segment_reader(0);
         let inverted_index = segment_reader.inverted_index(int_field).unwrap();
         let term = Term::from_field_u64(int_field, 0u64);
-        let term_info = inverted_index.get_term_info(&term).unwrap();
+        let term_info = inverted_index.get_term_info(&term).unwrap().unwrap();
         inverted_index
             .read_block_postings_from_terminfo(&term_info, IndexRecordOption::Basic)
             .unwrap()
@@ -513,7 +513,7 @@ mod tests {
         {
             let term = Term::from_field_u64(int_field, 0u64);
             let inverted_index = segment_reader.inverted_index(int_field)?;
-            let term_info = inverted_index.get_term_info(&term).unwrap();
+            let term_info = inverted_index.get_term_info(&term)?.unwrap();
             block_segments = inverted_index
                 .read_block_postings_from_terminfo(&term_info, IndexRecordOption::Basic)?;
         }
@@ -521,7 +521,7 @@ mod tests {
         {
             let term = Term::from_field_u64(int_field, 1u64);
             let inverted_index = segment_reader.inverted_index(int_field)?;
-            let term_info = inverted_index.get_term_info(&term).unwrap();
+            let term_info = inverted_index.get_term_info(&term)?.unwrap();
             inverted_index.reset_block_postings_from_terminfo(&term_info, &mut block_segments)?;
         }
         assert_eq!(block_segments.docs(), &[1, 3, 5]);
@@ -54,7 +54,7 @@ pub mod tests {
     use crate::DocId;
     use crate::HasLen;
     use crate::Score;
-    use std::iter;
+    use std::{iter, mem};

     #[test]
     pub fn test_position_write() -> crate::Result<()> {
@@ -71,6 +71,7 @@ pub mod tests {
             field_serializer.write_doc(doc_id, 4, &delta_positions)?;
         }
         field_serializer.close_term()?;
+        mem::drop(field_serializer);
         posting_serializer.close()?;
         let read = segment.open_read(SegmentComponent::POSITIONS)?;
         assert!(read.len() <= 140);
@@ -179,7 +180,7 @@ pub mod tests {
         let inverted_index = segment_reader.inverted_index(text_field)?;
         assert_eq!(inverted_index.terms().num_terms(), 1);
         let mut bytes = vec![];
-        assert!(inverted_index.terms().ord_to_term(0, &mut bytes));
+        assert!(inverted_index.terms().ord_to_term(0, &mut bytes)?);
         assert_eq!(&bytes, b"hello");
     }
     {
@@ -191,7 +192,7 @@ pub mod tests {
         let inverted_index = segment_reader.inverted_index(text_field)?;
         assert_eq!(inverted_index.terms().num_terms(), 1);
         let mut bytes = vec![];
-        assert!(inverted_index.terms().ord_to_term(0, &mut bytes));
+        assert!(inverted_index.terms().ord_to_term(0, &mut bytes)?);
         assert_eq!(&bytes[..], ok_token_text.as_bytes());
     }
     Ok(())
@@ -7,6 +7,7 @@ use crate::schema::{Field, IndexRecordOption};
 use crate::termdict::{TermDictionary, TermStreamer};
 use crate::TantivyError;
 use crate::{DocId, Score};
+use std::io;
 use std::sync::Arc;
 use tantivy_fst::Automaton;

@@ -19,6 +20,7 @@ pub struct AutomatonWeight<A> {
 impl<A> AutomatonWeight<A>
 where
     A: Automaton + Send + Sync + 'static,
+    A::State: Clone,
 {
     /// Create a new AutomationWeight
     pub fn new<IntoArcA: Into<Arc<A>>>(field: Field, automaton: IntoArcA) -> AutomatonWeight<A> {
@@ -28,7 +30,10 @@ where
         }
     }

-    fn automaton_stream<'a>(&'a self, term_dict: &'a TermDictionary) -> TermStreamer<'a, &'a A> {
+    fn automaton_stream<'a>(
+        &'a self,
+        term_dict: &'a TermDictionary,
+    ) -> io::Result<TermStreamer<'a, &'a A>> {
         let automaton: &A = &*self.automaton;
         let term_stream_builder = term_dict.search(automaton);
         term_stream_builder.into_stream()
@@ -38,13 +43,14 @@ where
 impl<A> Weight for AutomatonWeight<A>
 where
     A: Automaton + Send + Sync + 'static,
+    A::State: Clone,
 {
     fn scorer(&self, reader: &SegmentReader, boost: Score) -> crate::Result<Box<dyn Scorer>> {
         let max_doc = reader.max_doc();
         let mut doc_bitset = BitSet::with_max_value(max_doc);
         let inverted_index = reader.inverted_index(self.field)?;
         let term_dict = inverted_index.terms();
-        let mut term_stream = self.automaton_stream(term_dict);
+        let mut term_stream = self.automaton_stream(term_dict)?;
         while term_stream.advance() {
             let term_info = term_stream.value();
             let mut block_segment_postings = inverted_index
@@ -98,6 +104,7 @@ mod tests {
         index
     }

+    #[derive(Clone, Copy)]
     enum State {
         Start,
         NotMatching,
@@ -106,7 +106,7 @@ impl BM25Weight {
         BM25Weight::new(idf_explain, avg_fieldnorm)
     }

-    fn new(idf_explain: Explanation, average_fieldnorm: Score) -> BM25Weight {
+    pub(crate) fn new(idf_explain: Explanation, average_fieldnorm: Score) -> BM25Weight {
         let weight = idf_explain.value() * (1.0 + K1);
         BM25Weight {
             idf_explain,
@@ -11,6 +11,7 @@ use crate::schema::{Field, IndexRecordOption, Term};
 use crate::termdict::{TermDictionary, TermStreamer};
 use crate::{DocId, Score};
 use std::collections::Bound;
+use std::io;
 use std::ops::Range;

 fn map_bound<TFrom, TTo, Transform: Fn(&TFrom) -> TTo>(
@@ -274,7 +275,7 @@ pub struct RangeWeight {
 }

 impl RangeWeight {
-    fn term_range<'a>(&self, term_dict: &'a TermDictionary) -> TermStreamer<'a> {
+    fn term_range<'a>(&self, term_dict: &'a TermDictionary) -> io::Result<TermStreamer<'a>> {
         use std::collections::Bound::*;
         let mut term_stream_builder = term_dict.range();
         term_stream_builder = match self.left_bound {
@@ -298,7 +299,7 @@ impl Weight for RangeWeight {

         let inverted_index = reader.inverted_index(self.field)?;
         let term_dict = inverted_index.terms();
-        let mut term_range = self.term_range(term_dict);
+        let mut term_range = self.term_range(term_dict)?;
         while term_range.advance() {
             let term_info = term_range.value();
             let mut block_segment_postings = inverted_index
@@ -12,7 +12,7 @@ use std::marker::PhantomData;
 /// This is useful for queries like `+somethingrequired somethingoptional`.
 ///
 /// Note that `somethingoptional` has no impact on the `DocSet`.
-pub struct RequiredOptionalScorer<TReqScorer, TOptScorer, TScoreCombiner> {
+pub struct RequiredOptionalScorer<TReqScorer, TOptScorer, TScoreCombiner: ScoreCombiner> {
     req_scorer: TReqScorer,
     opt_scorer: TOptScorer,
     score_cache: Option<Score>,
@@ -23,6 +23,7 @@ impl<TReqScorer, TOptScorer, TScoreCombiner>
     RequiredOptionalScorer<TReqScorer, TOptScorer, TScoreCombiner>
 where
     TOptScorer: DocSet,
+    TScoreCombiner: ScoreCombiner,
 {
     /// Creates a new `RequiredOptionalScorer`.
     pub fn new(
@@ -43,6 +44,7 @@ impl<TReqScorer, TOptScorer, TScoreCombiner> DocSet
 where
     TReqScorer: DocSet,
     TOptScorer: DocSet,
+    TScoreCombiner: ScoreCombiner,
 {
     fn advance(&mut self) -> DocId {
         self.score_cache = None;
@@ -3,7 +3,7 @@ use crate::Score;

 /// The `ScoreCombiner` trait defines how to compute
 /// an overall score given a list of scores.
-pub trait ScoreCombiner: Default + Clone + Copy + 'static {
+pub trait ScoreCombiner: Default + Clone + Send + Copy + 'static {
     /// Aggregates the score combiner with the given scorer.
     ///
     /// The `ScoreCombiner` may decide to call `.scorer.score()`
@@ -1,7 +1,7 @@
 use super::term_weight::TermWeight;
 use crate::query::bm25::BM25Weight;
-use crate::query::Query;
 use crate::query::Weight;
+use crate::query::{Explanation, Query};
 use crate::schema::IndexRecordOption;
 use crate::Searcher;
 use crate::Term;
@@ -100,7 +100,13 @@ impl TermQuery {
                 field_entry.name()
             )));
         }
-        let bm25_weight = BM25Weight::for_terms(searcher, &[term])?;
+        let bm25_weight;
+        if scoring_enabled {
+            bm25_weight = BM25Weight::for_terms(searcher, &[term])?;
+        } else {
+            bm25_weight =
+                BM25Weight::new(Explanation::new("<no score>".to_string(), 1.0f32), 1.0f32);
+        }
         let index_record_option = if scoring_enabled {
             self.index_record_option
         } else {
@@ -45,7 +45,7 @@ impl Weight for TermWeight {
         } else {
             let field = self.term.field();
             let inv_index = reader.inverted_index(field)?;
-            let term_info = inv_index.get_term_info(&self.term);
+            let term_info = inv_index.get_term_info(&self.term)?;
             Ok(term_info.map(|term_info| term_info.doc_freq).unwrap_or(0))
         }
     }

@@ -233,6 +233,7 @@ mod tests {
         assert_eq!(Facet::root(), Facet::from("/"));
         assert_eq!(format!("{}", Facet::root()), "/");
         assert!(Facet::root().is_root());
+        assert_eq!(Facet::root().encoded_str(), "");
     }

     #[test]
@@ -3,7 +3,7 @@ use std::io::{self, Read, Write};
 /// Name of the compression scheme used in the doc store.
 ///
 /// This name is appended to the version string of tantivy.
-pub const COMPRESSION: &'static str = "lz4";
+pub const COMPRESSION: &str = "lz4";

 pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> {
     compressed.clear();
@@ -19,7 +19,7 @@ impl<'a> Iterator for LayerCursor<'a> {
             return None;
         }
         let (block_mut, remaining_mut) = (&mut self.block, &mut self.remaining);
-        if let Err(_) = block_mut.deserialize(remaining_mut) {
+        if block_mut.deserialize(remaining_mut).is_err() {
             return None;
         }
         self.cursor = 0;
@@ -50,8 +50,7 @@ impl Layer {

     fn seek_start_at_offset(&self, target: DocId, offset: u64) -> Option<Checkpoint> {
         self.cursor_at_offset(offset)
-            .filter(|checkpoint| checkpoint.end_doc > target)
-            .next()
+            .find(|checkpoint| checkpoint.end_doc > target)
     }
 }

src/termdict/fst_termdict/mod.rs (new file, +27)
@@ -0,0 +1,27 @@
+/*!
+The term dictionary main role is to associate the sorted [`Term`s](../struct.Term.html) to
+a [`TermInfo`](../postings/struct.TermInfo.html) struct that contains some meta-information
+about the term.
+
+Internally, the term dictionary relies on the `fst` crate to store
+a sorted mapping that associate each term to its rank in the lexicographical order.
+For instance, in a dictionary containing the sorted terms "abba", "bjork", "blur" and "donovan",
+the `TermOrdinal` are respectively `0`, `1`, `2`, and `3`.
+
+For `u64`-terms, tantivy explicitely uses a `BigEndian` representation to ensure that the
+lexicographical order matches the natural order of integers.
+
+`i64`-terms are transformed to `u64` using a continuous mapping `val ⟶ val - i64::min_value()`
+and then treated as a `u64`.
+
+`f64`-terms are transformed to `u64` using a mapping that preserve order, and are then treated
+as `u64`.
+
+A second datastructure makes it possible to access a [`TermInfo`](../postings/struct.TermInfo.html).
+*/
+mod streamer;
+mod term_info_store;
+mod termdict;
+
+pub use self::streamer::{TermStreamer, TermStreamerBuilder};
+pub use self::termdict::{TermDictionary, TermDictionaryBuilder};
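Note: the order-preserving mappings this doc comment describes can be spelled out concretely. tantivy ships them as `i64_to_u64` / `f64_to_u64` (re-exported at the crate root, see the lib.rs hunk earlier); the bodies below are an illustrative sketch, not the library's exact code:

```rust
// Sketch of the order-preserving value-to-u64 mappings described above.
fn i64_to_u64_sketch(val: i64) -> u64 {
    // `val - i64::min_value()`: shifts the i64 range onto [0, u64::MAX],
    // which is the same as flipping the sign bit. Big-endian bytes of the
    // result then sort in numeric order.
    (val as u64) ^ (1u64 << 63)
}

fn f64_to_u64_sketch(val: f64) -> u64 {
    let bits = val.to_bits();
    if bits & (1u64 << 63) == 0 {
        bits | (1u64 << 63) // positive floats: set the sign bit, sorting them above negatives
    } else {
        !bits // negative floats: flip all bits so larger magnitudes sort lower
    }
}

fn main() {
    assert!(i64_to_u64_sketch(-1) < i64_to_u64_sketch(0));
    assert!(f64_to_u64_sketch(-2.5) < f64_to_u64_sketch(-1.0));
    assert!(f64_to_u64_sketch(-1.0) < f64_to_u64_sketch(1.0));
}
```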
@@ -1,3 +1,5 @@
+use std::io;
+
 use super::TermDictionary;
 use crate::postings::TermInfo;
 use crate::termdict::TermOrdinal;
@@ -59,14 +61,14 @@ where

     /// Creates the stream corresponding to the range
     /// of terms defined using the `TermStreamerBuilder`.
-    pub fn into_stream(self) -> TermStreamer<'a, A> {
-        TermStreamer {
+    pub fn into_stream(self) -> io::Result<TermStreamer<'a, A>> {
+        Ok(TermStreamer {
             fst_map: self.fst_map,
             stream: self.stream_builder.into_stream(),
             term_ord: 0u64,
             current_key: Vec::with_capacity(100),
             current_value: TermInfo::default(),
-        }
+        })
     }
 }

@@ -80,7 +80,6 @@ where
             .serialize(&mut counting_writer)?;
         let footer_size = counting_writer.written_bytes();
         (footer_size as u64).serialize(&mut counting_writer)?;
-        counting_writer.flush()?;
     }
     Ok(file)
 }
@@ -139,8 +138,8 @@ impl TermDictionary {
     }

     /// Returns the ordinal associated to a given term.
-    pub fn term_ord<K: AsRef<[u8]>>(&self, key: K) -> Option<TermOrdinal> {
-        self.fst_index.get(key)
+    pub fn term_ord<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TermOrdinal>> {
+        Ok(self.fst_index.get(key))
     }

     /// Returns the term associated to a given term ordinal.
@@ -152,7 +151,7 @@ impl TermDictionary {
     ///
     /// Regardless of whether the term is found or not,
     /// the buffer may be modified.
-    pub fn ord_to_term(&self, mut ord: TermOrdinal, bytes: &mut Vec<u8>) -> bool {
+    pub fn ord_to_term(&self, mut ord: TermOrdinal, bytes: &mut Vec<u8>) -> io::Result<bool> {
         bytes.clear();
         let fst = self.fst_index.as_fst();
         let mut node = fst.root();
@@ -167,10 +166,10 @@ impl TermDictionary {
                 let new_node_addr = transition.addr;
                 node = fst.node(new_node_addr);
             } else {
-                return false;
+                return Ok(false);
             }
         }
-        true
+        Ok(true)
     }

     /// Returns the number of terms in the dictionary.
@@ -179,9 +178,10 @@ impl TermDictionary {
     }

     /// Lookups the value corresponding to the key.
-    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Option<TermInfo> {
-        self.term_ord(key)
-            .map(|term_ord| self.term_info_from_ord(term_ord))
+    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TermInfo>> {
+        Ok(self
+            .term_ord(key)?
+            .map(|term_ord| self.term_info_from_ord(term_ord)))
     }

     /// Returns a range builder, to stream all of the terms
@@ -191,7 +191,7 @@ impl TermDictionary {
     }

     /// A stream of all the sorted terms. [See also `.stream_field()`](#method.stream_field)
-    pub fn stream(&self) -> TermStreamer<'_> {
+    pub fn stream(&self) -> io::Result<TermStreamer<'_>> {
         self.range().into_stream()
     }

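Note: dictionary iteration now starts with a fallible `stream()`. A hedged sketch of the updated loop, assuming a `TermDictionary` built as in the tests below:

```rust
use std::io;
use tantivy::termdict::TermDictionary;

// Sketch of the post-change pattern: errors from `stream()` surface via `?`
// instead of being impossible-by-signature as before.
fn dump_terms(term_dict: &TermDictionary) -> io::Result<()> {
    let mut stream = term_dict.stream()?; // now io::Result<TermStreamer>
    while stream.advance() {
        // key() is the raw term bytes; value() its TermInfo.
        println!("{:?} -> doc_freq={}", stream.key(), stream.value().doc_freq);
    }
    Ok(())
}
```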
@@ -20,438 +20,37 @@ as `u64`.
A second datastructure makes it possible to access a [`TermInfo`](../postings/struct.TermInfo.html).
*/

use tantivy_fst::automaton::AlwaysMatch;

mod fst_termdict;
use fst_termdict as termdict;

mod merger;

#[cfg(test)]
mod tests;

/// Position of the term in the sorted list of terms.
pub type TermOrdinal = u64;

mod merger;
mod streamer;
mod term_info_store;
mod termdict;
/// The term dictionary contains all of the terms in
/// `tantivy index` in a sorted manner.
pub type TermDictionary = self::termdict::TermDictionary;

pub use self::merger::TermMerger;
pub use self::streamer::{TermStreamer, TermStreamerBuilder};
pub use self::termdict::{TermDictionary, TermDictionaryBuilder};
/// Builder for the new term dictionary.
///
/// Inserting must be done in the order of the `keys`.
pub type TermDictionaryBuilder<W> = self::termdict::TermDictionaryBuilder<W>;

#[cfg(test)]
mod tests {
    use super::{TermDictionary, TermDictionaryBuilder, TermStreamer};
    use crate::core::Index;
    use crate::directory::{Directory, FileSlice, RAMDirectory};
    use crate::postings::TermInfo;
    use crate::schema::{Schema, TEXT};
    use std::path::PathBuf;
    use std::str;
/// Given a list of sorted term streams,
/// returns an iterator over sorted unique terms.
///
/// The item yielded is actually a pair with
/// - the term
/// - a slice with the ordinal of the segments containing
/// the terms.
pub type TermMerger<'a> = self::merger::TermMerger<'a>;

    const BLOCK_SIZE: usize = 1_500;

    fn make_term_info(term_ord: u64) -> TermInfo {
        let offset = |term_ord: u64| term_ord * 100 + term_ord * term_ord;
        TermInfo {
            doc_freq: term_ord as u32,
            postings_start_offset: offset(term_ord),
            postings_stop_offset: offset(term_ord + 1),
            positions_idx: offset(term_ord) * 2u64,
        }
    }

    #[test]
    fn test_empty_term_dictionary() {
        let empty = TermDictionary::empty();
        assert!(empty.stream().next().is_none());
    }

    #[test]
    fn test_term_ordinals() -> crate::Result<()> {
        const COUNTRIES: [&'static str; 7] = [
            "San Marino",
            "Serbia",
            "Slovakia",
            "Slovenia",
            "Spain",
            "Sweden",
            "Switzerland",
        ];
        let directory = RAMDirectory::create();
        let path = PathBuf::from("TermDictionary");
        {
            let write = directory.open_write(&path)?;
            let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
            for term in COUNTRIES.iter() {
                term_dictionary_builder.insert(term.as_bytes(), &make_term_info(0u64))?;
            }
            term_dictionary_builder.finish()?;
        }
        let term_file = directory.open_read(&path)?;
        let term_dict: TermDictionary = TermDictionary::open(term_file)?;
        for (term_ord, term) in COUNTRIES.iter().enumerate() {
            assert_eq!(term_dict.term_ord(term).unwrap(), term_ord as u64);
            let mut bytes = vec![];
            assert!(term_dict.ord_to_term(term_ord as u64, &mut bytes));
            assert_eq!(bytes, term.as_bytes());
        }
        Ok(())
    }

    #[test]
    fn test_term_dictionary_simple() -> crate::Result<()> {
        let directory = RAMDirectory::create();
        let path = PathBuf::from("TermDictionary");
        {
            let write = directory.open_write(&path)?;
            let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
            term_dictionary_builder.insert("abc".as_bytes(), &make_term_info(34u64))?;
            term_dictionary_builder.insert("abcd".as_bytes(), &make_term_info(346u64))?;
            term_dictionary_builder.finish()?;
        }
        let file = directory.open_read(&path)?;
        let term_dict: TermDictionary = TermDictionary::open(file)?;
        assert_eq!(term_dict.get("abc").unwrap().doc_freq, 34u32);
        assert_eq!(term_dict.get("abcd").unwrap().doc_freq, 346u32);
        let mut stream = term_dict.stream();
        {
            {
                let (k, v) = stream.next().unwrap();
                assert_eq!(k.as_ref(), "abc".as_bytes());
                assert_eq!(v.doc_freq, 34u32);
            }
            assert_eq!(stream.key(), "abc".as_bytes());
            assert_eq!(stream.value().doc_freq, 34u32);
        }
        {
            {
                let (k, v) = stream.next().unwrap();
                assert_eq!(k, "abcd".as_bytes());
                assert_eq!(v.doc_freq, 346u32);
            }
            assert_eq!(stream.key(), "abcd".as_bytes());
            assert_eq!(stream.value().doc_freq, 346u32);
        }
        assert!(!stream.advance());
        Ok(())
    }

    #[test]
    fn test_term_iterator() -> crate::Result<()> {
        let mut schema_builder = Schema::builder();
        let text_field = schema_builder.add_text_field("text", TEXT);
        let index = Index::create_in_ram(schema_builder.build());
        {
            let mut index_writer = index.writer_for_tests()?;
            index_writer.add_document(doc!(text_field=>"a b d f"));
            index_writer.commit()?;
            index_writer.add_document(doc!(text_field=>"a b c d f"));
            index_writer.commit()?;
            index_writer.add_document(doc!(text_field => "e f"));
            index_writer.commit()?;
        }
        let searcher = index.reader()?.searcher();

        let field_searcher = searcher.field(text_field)?;
        let mut term_it = field_searcher.terms();
        let mut term_string = String::new();
        while term_it.advance() {
            //let term = Term::from_bytes(term_it.key());
            term_string.push_str(str::from_utf8(term_it.key()).expect("test"));
        }
        assert_eq!(&*term_string, "abcdef");
        Ok(())
    }

    #[test]
    fn test_term_dictionary_stream() -> crate::Result<()> {
        let ids: Vec<_> = (0u32..10_000u32)
            .map(|i| (format!("doc{:0>6}", i), i))
            .collect();
        let buffer: Vec<u8> = {
            let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
            for &(ref id, ref i) in &ids {
                term_dictionary_builder
                    .insert(id.as_bytes(), &make_term_info(*i as u64))
                    .unwrap();
            }
            term_dictionary_builder.finish().unwrap()
        };
        let term_file = FileSlice::from(buffer);
        let term_dictionary: TermDictionary = TermDictionary::open(term_file)?;
        {
            let mut streamer = term_dictionary.stream();
            let mut i = 0;
            while let Some((streamer_k, streamer_v)) = streamer.next() {
                let &(ref key, ref v) = &ids[i];
                assert_eq!(streamer_k.as_ref(), key.as_bytes());
                assert_eq!(streamer_v, &make_term_info(*v as u64));
                i += 1;
            }
        }

        let &(ref key, ref val) = &ids[2047];
        assert_eq!(
            term_dictionary.get(key.as_bytes()),
            Some(make_term_info(*val as u64))
        );
        Ok(())
    }

    #[test]
    fn test_stream_high_range_prefix_suffix() -> crate::Result<()> {
        let buffer: Vec<u8> = {
            let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
            // term requires more than 16bits
            term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxy", &make_term_info(1))?;
            term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxyz", &make_term_info(2))?;
            term_dictionary_builder.insert("abr", &make_term_info(3))?;
            term_dictionary_builder.finish()?
        };
        let term_dict_file = FileSlice::from(buffer);
        let term_dictionary: TermDictionary = TermDictionary::open(term_dict_file)?;
        let mut kv_stream = term_dictionary.stream();
        assert!(kv_stream.advance());
        assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxy".as_bytes());
        assert_eq!(kv_stream.value(), &make_term_info(1));
        assert!(kv_stream.advance());
        assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxyz".as_bytes());
        assert_eq!(kv_stream.value(), &make_term_info(2));
        assert!(kv_stream.advance());
        assert_eq!(kv_stream.key(), "abr".as_bytes());
        assert_eq!(kv_stream.value(), &make_term_info(3));
        assert!(!kv_stream.advance());
        Ok(())
    }

    #[test]
    fn test_stream_range() -> crate::Result<()> {
        let ids: Vec<_> = (0u32..10_000u32)
            .map(|i| (format!("doc{:0>6}", i), i))
            .collect();
        let buffer: Vec<u8> = {
            let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
            for &(ref id, ref i) in &ids {
                term_dictionary_builder
                    .insert(id.as_bytes(), &make_term_info(*i as u64))
                    .unwrap();
            }
            term_dictionary_builder.finish().unwrap()
        };

        let file = FileSlice::from(buffer);

        let term_dictionary: TermDictionary = TermDictionary::open(file)?;
        {
            for i in (0..20).chain(6000..8_000) {
                let &(ref target_key, _) = &ids[i];
                let mut streamer = term_dictionary
                    .range()
                    .ge(target_key.as_bytes())
                    .into_stream();
                for j in 0..3 {
                    let (streamer_k, streamer_v) = streamer.next().unwrap();
                    let &(ref key, ref v) = &ids[i + j];
                    assert_eq!(str::from_utf8(streamer_k.as_ref()).unwrap(), key);
                    assert_eq!(streamer_v.doc_freq, *v);
                    assert_eq!(streamer_v, &make_term_info(*v as u64));
                }
            }
        }

        {
            for i in (0..20).chain(BLOCK_SIZE - 10..BLOCK_SIZE + 10) {
                let &(ref target_key, _) = &ids[i];
                let mut streamer = term_dictionary
                    .range()
                    .gt(target_key.as_bytes())
                    .into_stream();
                for j in 0..3 {
                    let (streamer_k, streamer_v) = streamer.next().unwrap();
                    let &(ref key, ref v) = &ids[i + j + 1];
                    assert_eq!(streamer_k.as_ref(), key.as_bytes());
                    assert_eq!(streamer_v.doc_freq, *v);
                }
            }
        }

        {
            for i in (0..20).chain(BLOCK_SIZE - 10..BLOCK_SIZE + 10) {
                for j in 0..3 {
                    let &(ref fst_key, _) = &ids[i];
                    let &(ref last_key, _) = &ids[i + j];
                    let mut streamer = term_dictionary
                        .range()
                        .ge(fst_key.as_bytes())
                        .lt(last_key.as_bytes())
                        .into_stream();
                    for _ in 0..j {
                        assert!(streamer.next().is_some());
                    }
                    assert!(streamer.next().is_none());
                }
            }
        }
        Ok(())
    }

    #[test]
    fn test_empty_string() -> crate::Result<()> {
        let buffer: Vec<u8> = {
            let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
            term_dictionary_builder
                .insert(&[], &make_term_info(1 as u64))
                .unwrap();
            term_dictionary_builder
                .insert(&[1u8], &make_term_info(2 as u64))
                .unwrap();
            term_dictionary_builder.finish().unwrap()
        };
        let file = FileSlice::from(buffer);
        let term_dictionary: TermDictionary = TermDictionary::open(file)?;
        let mut stream = term_dictionary.stream();
        assert!(stream.advance());
        assert!(stream.key().is_empty());
        assert!(stream.advance());
        assert_eq!(stream.key(), &[1u8]);
        assert!(!stream.advance());
        Ok(())
    }

    #[test]
    fn test_stream_range_boundaries() -> crate::Result<()> {
        let buffer: Vec<u8> = {
            let mut term_dictionary_builder = TermDictionaryBuilder::create(Vec::new())?;
            for i in 0u8..10u8 {
                let number_arr = [i; 1];
                term_dictionary_builder.insert(&number_arr, &make_term_info(i as u64))?;
            }
            term_dictionary_builder.finish()?
        };
        let file = FileSlice::from(buffer);
        let term_dictionary: TermDictionary = TermDictionary::open(file)?;

        let value_list = |mut streamer: TermStreamer<'_>, backwards: bool| {
            let mut res: Vec<u32> = vec![];
            while let Some((_, ref v)) = streamer.next() {
                res.push(v.doc_freq);
            }
            if backwards {
                res.reverse();
            }
            res
        };
        {
            let range = term_dictionary.range().backward().into_stream();
            assert_eq!(
                value_list(range, true),
                vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
            );
        }
        {
            let range = term_dictionary.range().ge([2u8]).into_stream();
            assert_eq!(
                value_list(range, false),
                vec![2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
            );
        }
        {
            let range = term_dictionary.range().ge([2u8]).backward().into_stream();
            assert_eq!(
                value_list(range, true),
                vec![2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
            );
        }
        {
            let range = term_dictionary.range().gt([2u8]).into_stream();
            assert_eq!(
                value_list(range, false),
                vec![3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
            );
        }
        {
            let range = term_dictionary.range().gt([2u8]).backward().into_stream();
            assert_eq!(
                value_list(range, true),
                vec![3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
            );
        }
        {
            let range = term_dictionary.range().lt([6u8]).into_stream();
            assert_eq!(
                value_list(range, false),
                vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32]
            );
        }
        {
            let range = term_dictionary.range().lt([6u8]).backward().into_stream();
            assert_eq!(
                value_list(range, true),
                vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32]
            );
        }
        {
            let range = term_dictionary.range().le([6u8]).into_stream();
            assert_eq!(
                value_list(range, false),
                vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32]
            );
        }
        {
            let range = term_dictionary.range().le([6u8]).backward().into_stream();
            assert_eq!(
                value_list(range, true),
                vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32]
            );
        }
        {
            let range = term_dictionary.range().ge([0u8]).lt([5u8]).into_stream();
            assert_eq!(value_list(range, false), vec![0u32, 1u32, 2u32, 3u32, 4u32]);
        }
        {
            let range = term_dictionary
                .range()
                .ge([0u8])
                .lt([5u8])
                .backward()
                .into_stream();
            assert_eq!(value_list(range, true), vec![0u32, 1u32, 2u32, 3u32, 4u32]);
        }
        Ok(())
    }

    #[test]
    fn test_automaton_search() -> crate::Result<()> {
        use crate::query::DFAWrapper;
        use levenshtein_automata::LevenshteinAutomatonBuilder;

        const COUNTRIES: [&'static str; 7] = [
            "San Marino",
            "Serbia",
            "Slovakia",
            "Slovenia",
            "Spain",
            "Sweden",
            "Switzerland",
        ];

        let directory = RAMDirectory::create();
        let path = PathBuf::from("TermDictionary");
        {
            let write = directory.open_write(&path)?;
            let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
            for term in COUNTRIES.iter() {
                term_dictionary_builder.insert(term.as_bytes(), &make_term_info(0u64))?;
            }
            term_dictionary_builder.finish()?;
        }
        let file = directory.open_read(&path)?;
        let term_dict: TermDictionary = TermDictionary::open(file)?;

        // We can now build an entire dfa.
        let lev_automaton_builder = LevenshteinAutomatonBuilder::new(2, true);
        let automaton = DFAWrapper(lev_automaton_builder.build_dfa("Spaen"));

        let mut range = term_dict.search(automaton).into_stream();

        // get the first finding
        assert!(range.advance());
        assert_eq!("Spain".as_bytes(), range.key());
        assert!(!range.advance());
        Ok(())
    }
}
/// `TermStreamer` acts as a cursor over a range of terms of a segment.
/// Terms are guaranteed to be sorted.
pub type TermStreamer<'a, A = AlwaysMatch> = self::termdict::TermStreamer<'a, A>;

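The `TermMerger` kept by the new `mod merger` merges several per-segment term streams into a single sorted pass over unique terms, as its doc comment above describes. A hedged sketch of that usage (assuming `TermMerger::new` accepts the per-segment streamers and that its `advance()`/`key()` mirror the streamer API shown in this diff):

fn unique_terms(streamers: Vec<TermStreamer<'_>>) -> Vec<Vec<u8>> {
    let mut merger = TermMerger::new(streamers);
    let mut terms = Vec::new();
    // Each `advance()` moves to the next unique term across all segments.
    while merger.advance() {
        terms.push(merger.key().to_vec());
    }
    terms
}
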
431 src/termdict/tests.rs Normal file
@@ -0,0 +1,431 @@
use super::{TermDictionary, TermDictionaryBuilder, TermStreamer};

use crate::directory::{Directory, FileSlice, RAMDirectory, TerminatingWrite};
use crate::postings::TermInfo;

use std::path::PathBuf;
use std::str;

const BLOCK_SIZE: usize = 1_500;

fn make_term_info(term_ord: u64) -> TermInfo {
    let offset = |term_ord: u64| term_ord * 100 + term_ord * term_ord;
    TermInfo {
        doc_freq: term_ord as u32,
        postings_start_offset: offset(term_ord),
        postings_stop_offset: offset(term_ord + 1),
        positions_idx: offset(term_ord) * 2u64,
    }
}

#[test]
fn test_empty_term_dictionary() {
    let empty = TermDictionary::empty();
    assert!(empty.stream().unwrap().next().is_none());
}

#[test]
fn test_term_ordinals() -> crate::Result<()> {
    const COUNTRIES: [&'static str; 7] = [
        "San Marino",
        "Serbia",
        "Slovakia",
        "Slovenia",
        "Spain",
        "Sweden",
        "Switzerland",
    ];
    let directory = RAMDirectory::create();
    let path = PathBuf::from("TermDictionary");
    {
        let write = directory.open_write(&path)?;
        let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
        for term in COUNTRIES.iter() {
            term_dictionary_builder.insert(term.as_bytes(), &make_term_info(0u64))?;
        }
        term_dictionary_builder.finish()?.terminate()?;
    }
    let term_file = directory.open_read(&path)?;
    let term_dict: TermDictionary = TermDictionary::open(term_file)?;
    for (term_ord, term) in COUNTRIES.iter().enumerate() {
        assert_eq!(term_dict.term_ord(term)?, Some(term_ord as u64));
        let mut bytes = vec![];
        assert!(term_dict.ord_to_term(term_ord as u64, &mut bytes)?);
        assert_eq!(bytes, term.as_bytes());
    }
    Ok(())
}

#[test]
fn test_term_dictionary_simple() -> crate::Result<()> {
    let directory = RAMDirectory::create();
    let path = PathBuf::from("TermDictionary");
    {
        let write = directory.open_write(&path)?;
        let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
        term_dictionary_builder.insert("abc".as_bytes(), &make_term_info(34u64))?;
        term_dictionary_builder.insert("abcd".as_bytes(), &make_term_info(346u64))?;
        term_dictionary_builder.finish()?.terminate()?;
    }
    let file = directory.open_read(&path)?;
    let term_dict: TermDictionary = TermDictionary::open(file)?;
    assert_eq!(term_dict.get("abc")?.unwrap().doc_freq, 34u32);
    assert_eq!(term_dict.get("abcd")?.unwrap().doc_freq, 346u32);
    let mut stream = term_dict.stream()?;
    {
        {
            let (k, v) = stream.next().unwrap();
            assert_eq!(k.as_ref(), "abc".as_bytes());
            assert_eq!(v.doc_freq, 34u32);
        }
        assert_eq!(stream.key(), "abc".as_bytes());
        assert_eq!(stream.value().doc_freq, 34u32);
    }
    {
        {
            let (k, v) = stream.next().unwrap();
            assert_eq!(k, "abcd".as_bytes());
            assert_eq!(v.doc_freq, 346u32);
        }
        assert_eq!(stream.key(), "abcd".as_bytes());
        assert_eq!(stream.value().doc_freq, 346u32);
    }
    assert!(!stream.advance());
    Ok(())
}

#[test]
fn test_term_dictionary_stream() -> crate::Result<()> {
    let ids: Vec<_> = (0u32..10_000u32)
        .map(|i| (format!("doc{:0>6}", i), i))
        .collect();
    let buffer: Vec<u8> = {
        let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
        for &(ref id, ref i) in &ids {
            term_dictionary_builder
                .insert(id.as_bytes(), &make_term_info(*i as u64))
                .unwrap();
        }
        term_dictionary_builder.finish()?
    };
    let term_file = FileSlice::from(buffer);
    let term_dictionary: TermDictionary = TermDictionary::open(term_file)?;
    {
        let mut streamer = term_dictionary.stream()?;
        let mut i = 0;
        while let Some((streamer_k, streamer_v)) = streamer.next() {
            let &(ref key, ref v) = &ids[i];
            assert_eq!(streamer_k.as_ref(), key.as_bytes());
            assert_eq!(streamer_v, &make_term_info(*v as u64));
            i += 1;
        }
    }

    let &(ref key, ref val) = &ids[2047];
    assert_eq!(
        term_dictionary.get(key.as_bytes())?,
        Some(make_term_info(*val as u64))
    );
    Ok(())
}

#[test]
fn test_stream_high_range_prefix_suffix() -> crate::Result<()> {
    let buffer: Vec<u8> = {
        let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
        // term requires more than 16bits
        term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxy", &make_term_info(1))?;
        term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxyz", &make_term_info(2))?;
        term_dictionary_builder.insert("abr", &make_term_info(3))?;
        term_dictionary_builder.finish()?
    };
    let term_dict_file = FileSlice::from(buffer);
    let term_dictionary: TermDictionary = TermDictionary::open(term_dict_file)?;
    let mut kv_stream = term_dictionary.stream()?;
    assert!(kv_stream.advance());
    assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxy".as_bytes());
    assert_eq!(kv_stream.value(), &make_term_info(1));
    assert!(kv_stream.advance());
    assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxyz".as_bytes());
    assert_eq!(kv_stream.value(), &make_term_info(2));
    assert!(kv_stream.advance());
    assert_eq!(kv_stream.key(), "abr".as_bytes());
    assert_eq!(kv_stream.value(), &make_term_info(3));
    assert!(!kv_stream.advance());
    Ok(())
}

#[test]
fn test_stream_range() -> crate::Result<()> {
    let ids: Vec<_> = (0u32..10_000u32)
        .map(|i| (format!("doc{:0>6}", i), i))
        .collect();
    let buffer: Vec<u8> = {
        let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
        for &(ref id, ref i) in &ids {
            term_dictionary_builder
                .insert(id.as_bytes(), &make_term_info(*i as u64))
                .unwrap();
        }
        term_dictionary_builder.finish()?
    };

    let file = FileSlice::from(buffer);

    let term_dictionary: TermDictionary = TermDictionary::open(file)?;
    {
        for i in (0..20).chain(6000..8_000) {
            let &(ref target_key, _) = &ids[i];
            let mut streamer = term_dictionary
                .range()
                .ge(target_key.as_bytes())
                .into_stream()?;
            for j in 0..3 {
                let (streamer_k, streamer_v) = streamer.next().unwrap();
                let &(ref key, ref v) = &ids[i + j];
                assert_eq!(str::from_utf8(streamer_k.as_ref()).unwrap(), key);
                assert_eq!(streamer_v.doc_freq, *v);
                assert_eq!(streamer_v, &make_term_info(*v as u64));
            }
        }
    }

    {
        for i in (0..20).chain(BLOCK_SIZE - 10..BLOCK_SIZE + 10) {
            let &(ref target_key, _) = &ids[i];
            let mut streamer = term_dictionary
                .range()
                .gt(target_key.as_bytes())
                .into_stream()?;
            for j in 0..3 {
                let (streamer_k, streamer_v) = streamer.next().unwrap();
                let &(ref key, ref v) = &ids[i + j + 1];
                assert_eq!(streamer_k.as_ref(), key.as_bytes());
                assert_eq!(streamer_v.doc_freq, *v);
            }
        }
    }

    {
        for i in (0..20).chain(BLOCK_SIZE - 10..BLOCK_SIZE + 10) {
            for j in 0..3 {
                let &(ref fst_key, _) = &ids[i];
                let &(ref last_key, _) = &ids[i + j];
                let mut streamer = term_dictionary
                    .range()
                    .ge(fst_key.as_bytes())
                    .lt(last_key.as_bytes())
                    .into_stream()?;
                for _ in 0..j {
                    assert!(streamer.next().is_some());
                }
                assert!(streamer.next().is_none());
            }
        }
    }
    Ok(())
}

#[test]
fn test_empty_string() -> crate::Result<()> {
    let buffer: Vec<u8> = {
        let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
        term_dictionary_builder
            .insert(&[], &make_term_info(1 as u64))
            .unwrap();
        term_dictionary_builder
            .insert(&[1u8], &make_term_info(2 as u64))
            .unwrap();
        term_dictionary_builder.finish()?
    };
    let file = FileSlice::from(buffer);
    let term_dictionary: TermDictionary = TermDictionary::open(file)?;
    let mut stream = term_dictionary.stream()?;
    assert!(stream.advance());
    assert!(stream.key().is_empty());
    assert!(stream.advance());
    assert_eq!(stream.key(), &[1u8]);
    assert!(!stream.advance());
    Ok(())
}

fn stream_range_test_dict() -> crate::Result<TermDictionary> {
    let buffer: Vec<u8> = {
        let mut term_dictionary_builder = TermDictionaryBuilder::create(Vec::new())?;
        for i in 0u8..10u8 {
            let number_arr = [i; 1];
            term_dictionary_builder.insert(&number_arr, &make_term_info(i as u64))?;
        }
        term_dictionary_builder.finish()?
    };
    let file = FileSlice::from(buffer);
    TermDictionary::open(file)
}

#[test]
fn test_stream_range_boundaries_forward() -> crate::Result<()> {
    let term_dictionary = stream_range_test_dict()?;
    let value_list = |mut streamer: TermStreamer<'_>| {
        let mut res: Vec<u32> = vec![];
        while let Some((_, ref v)) = streamer.next() {
            res.push(v.doc_freq);
        }
        res
    };
    {
        let range = term_dictionary.range().ge([2u8]).into_stream()?;
        assert_eq!(
            value_list(range),
            vec![2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
        );
    }
    {
        let range = term_dictionary.range().gt([2u8]).into_stream()?;
        assert_eq!(
            value_list(range),
            vec![3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
        );
    }
    {
        let range = term_dictionary.range().lt([6u8]).into_stream()?;
        assert_eq!(value_list(range), vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32]);
    }
    {
        let range = term_dictionary.range().le([6u8]).into_stream()?;
        assert_eq!(
            value_list(range),
            vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32]
        );
    }
    {
        let range = term_dictionary.range().ge([0u8]).lt([5u8]).into_stream()?;
        assert_eq!(value_list(range), vec![0u32, 1u32, 2u32, 3u32, 4u32]);
    }
    Ok(())
}

#[test]
fn test_stream_range_boundaries_backward() -> crate::Result<()> {
    let term_dictionary = stream_range_test_dict()?;
    let value_list_backward = |mut streamer: TermStreamer<'_>| {
        let mut res: Vec<u32> = vec![];
        while let Some((_, ref v)) = streamer.next() {
            res.push(v.doc_freq);
        }
        res.reverse();
        res
    };
    {
        let range = term_dictionary.range().backward().into_stream()?;
        assert_eq!(
            value_list_backward(range),
            vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
        );
    }
    {
        let range = term_dictionary.range().ge([2u8]).backward().into_stream()?;
        assert_eq!(
            value_list_backward(range),
            vec![2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
        );
    }
    {
        let range = term_dictionary.range().gt([2u8]).backward().into_stream()?;
        assert_eq!(
            value_list_backward(range),
            vec![3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
        );
    }
    {
        let range = term_dictionary.range().lt([6u8]).backward().into_stream()?;
        assert_eq!(
            value_list_backward(range),
            vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32]
        );
    }
    {
        let range = term_dictionary.range().le([6u8]).backward().into_stream()?;
        assert_eq!(
            value_list_backward(range),
            vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32]
        );
    }
    {
        let range = term_dictionary
            .range()
            .ge([0u8])
            .lt([5u8])
            .backward()
            .into_stream()?;
        assert_eq!(
            value_list_backward(range),
            vec![0u32, 1u32, 2u32, 3u32, 4u32]
        );
    }
    Ok(())
}

#[test]
fn test_ord_to_term() -> crate::Result<()> {
    let termdict = stream_range_test_dict()?;
    let mut bytes = vec![];
    for b in 0u8..10u8 {
        termdict.ord_to_term(b as u64, &mut bytes)?;
        assert_eq!(&bytes, &[b]);
    }
    Ok(())
}

#[test]
fn test_stream_term_ord() -> crate::Result<()> {
    let termdict = stream_range_test_dict()?;
    let mut stream = termdict.stream()?;
    for b in 0u8..10u8 {
        assert!(stream.advance());
        assert_eq!(stream.term_ord(), b as u64);
        assert_eq!(stream.key(), &[b]);
    }
    assert!(!stream.advance());
    Ok(())
}

#[test]
fn test_automaton_search() -> crate::Result<()> {
    use crate::query::DFAWrapper;
    use levenshtein_automata::LevenshteinAutomatonBuilder;

    const COUNTRIES: [&'static str; 7] = [
        "San Marino",
        "Serbia",
        "Slovakia",
        "Slovenia",
        "Spain",
        "Sweden",
        "Switzerland",
    ];

    let directory = RAMDirectory::create();
    let path = PathBuf::from("TermDictionary");
    {
        let write = directory.open_write(&path)?;
        let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
        for term in COUNTRIES.iter() {
            term_dictionary_builder.insert(term.as_bytes(), &make_term_info(0u64))?;
        }
        term_dictionary_builder.finish()?.terminate()?;
    }
    let file = directory.open_read(&path)?;
    let term_dict: TermDictionary = TermDictionary::open(file)?;

    // We can now build an entire dfa.
    let lev_automaton_builder = LevenshteinAutomatonBuilder::new(2, true);
    let automaton = DFAWrapper(lev_automaton_builder.build_dfa("Spaen"));

    let mut range = term_dict.search(automaton).into_stream()?;

    // get the first finding
    assert!(range.advance());
    assert_eq!("Spain".as_bytes(), range.key());
    assert!(!range.advance());
    Ok(())
}