Mirror of https://github.com/quickwit-oss/tantivy.git (synced 2026-01-07 09:32:54 +00:00)

Compare commits: 21 commits, issue/938...barrotstei
| Author | SHA1 | Date |
|---|---|---|
| | 4e93c681c0 | |
| | 5a8546bb5a | |
| | f0c1867637 | |
| | 35fbf738c9 | |
| | 6f919c61c7 | |
| | 86efdb778c | |
| | 960c2ee39d | |
| | 6653ed8eb6 | |
| | e20eedd98b | |
| | 90c1fdefdc | |
| | 3e27d4c211 | |
| | 4dc268482f | |
| | 4f20dd410e | |
| | b1125638f4 | |
| | a4c95852e5 | |
| | b28be75728 | |
| | af7cb3ff0f | |
| | 30d86c653a | |
| | 6f4c051700 | |
| | 212e091553 | |
| | 146058bdbf | |
@@ -8,7 +8,6 @@ Tantivy 0.14.0
- Added helper for building intersections and unions in BooleanQuery (@guilload)
- Bugfix in `Query::explain`
- Removed dependency on `notify` #924. Replaced with `FileWatcher` struct that polls meta file every 500ms in background thread. (@halvorboe @guilload)
- Added `FilterCollector`, which wraps another collector and filters docs using a predicate over a fast field (@barrotsteindev)

Tantivy 0.13.2
===================
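As a side note on the `FileWatcher` changelog entry above: the polling approach it describes can be sketched in a few lines of std-only Rust. This is a simplified illustration, not tantivy's actual implementation (which also inspects the meta file's contents); the helper name and the mtime-based change detection are assumptions for the sketch.

    use std::path::PathBuf;
    use std::sync::mpsc::Sender;
    use std::thread;
    use std::time::{Duration, SystemTime};

    // Hypothetical sketch: poll the meta file's modification time every
    // 500ms from a background thread and notify a channel on change.
    fn spawn_polling_watcher(path: PathBuf, notify: Sender<()>) {
        thread::spawn(move || {
            let mut last_seen: Option<SystemTime> = None;
            loop {
                let modified = std::fs::metadata(&path).and_then(|m| m.modified()).ok();
                if modified != last_seen {
                    last_seen = modified;
                    // Ignore send errors: the receiver may be gone.
                    let _ = notify.send(());
                }
                thread::sleep(Duration::from_millis(500));
            }
        });
    }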
@@ -26,7 +26,6 @@ snap = "1"
tempfile = {version="3", optional=true}
log = "0.4"
serde = {version="1", features=["derive"]}
serde_cbor = "0.11"
serde_json = "1"
num_cpus = "1"
fs2={version="0.4", optional=true}
@@ -61,7 +61,7 @@ fn main() -> tantivy::Result<()> {

    let query_ords: HashSet<u64> = facets
        .iter()
        .filter_map(|key| facet_dict.term_ord(key.encoded_str()).unwrap())
        .filter_map(|key| facet_dict.term_ord(key.encoded_str()))
        .collect();

    let mut facet_ords_buffer: Vec<u64> = Vec::with_capacity(20);
@@ -274,7 +274,7 @@ impl Collector for FacetCollector {
        let mut collapse_facet_it = self.facets.iter().peekable();
        collapse_facet_ords.push(0);
        {
            let mut facet_streamer = facet_reader.facet_dict().range().into_stream()?;
            let mut facet_streamer = facet_reader.facet_dict().range().into_stream();
            if facet_streamer.advance() {
                'outer: loop {
                    // at the beginning of this loop, facet_streamer
@@ -368,12 +368,9 @@ impl SegmentCollector for FacetSegmentCollector {
            }
            let mut facet = vec![];
            let facet_ord = self.collapse_facet_ords[collapsed_facet_ord];
            // TODO handle errors.
            if facet_dict.ord_to_term(facet_ord as u64, &mut facet).is_ok() {
                if let Ok(facet) = Facet::from_encoded(facet) {
                    facet_counts.insert(facet, count);
                }
            }
            facet_dict.ord_to_term(facet_ord as u64, &mut facet);
            // TODO
            facet_counts.insert(Facet::from_encoded(facet).unwrap(), count);
        }
        FacetCounts { facet_counts }
    }
@@ -41,37 +41,33 @@ use crate::{Score, SegmentReader, TantivyError};
///
/// let query_parser = QueryParser::for_index(&index, vec![title]);
/// let query = query_parser.parse_query("diary").unwrap();
/// let no_filter_collector = FilterCollector::new(price, &|value| value > 20_120u64, TopDocs::with_limit(2));
/// let no_filter_collector = FilterCollector::new(price, &|value| true, TopDocs::with_limit(2));
/// let top_docs = searcher.search(&query, &no_filter_collector).unwrap();
///
/// assert_eq!(top_docs.len(), 1);
/// assert_eq!(top_docs.len(), 2);
/// assert_eq!(top_docs[0].1, DocAddress(0, 1));
/// assert_eq!(top_docs[1].1, DocAddress(0, 3));
///
/// let filter_all_collector = FilterCollector::new(price, &|value| value < 5u64, TopDocs::with_limit(2));
/// let filter_all_collector = FilterCollector::new(price, &|value| false, TopDocs::with_limit(2));
/// let filtered_top_docs = searcher.search(&query, &filter_all_collector).unwrap();
///
/// assert_eq!(filtered_top_docs.len(), 0);
/// ```
pub struct FilterCollector<TCollector, TPredicate>
where
    TPredicate: 'static,
{
pub struct FilterCollector<TCollector> {
    field: Field,
    collector: TCollector,
    predicate: &'static TPredicate,
    predicate: &'static (dyn Fn(u64) -> bool + Send + Sync),
}

impl<TCollector, TPredicate> FilterCollector<TCollector, TPredicate>
impl<TCollector> FilterCollector<TCollector>
where
    TCollector: Collector + Send + Sync,
    TPredicate: Fn(u64) -> bool + Send + Sync,
{
    /// Create a new FilterCollector.
    pub fn new(
        field: Field,
        predicate: &'static TPredicate,
        predicate: &'static (dyn Fn(u64) -> bool + Send + Sync),
        collector: TCollector,
    ) -> FilterCollector<TCollector, TPredicate> {
    ) -> FilterCollector<TCollector> {
        FilterCollector {
            field,
            predicate,
@@ -80,22 +76,21 @@ where
        }
    }
}

impl<TCollector, TPredicate> Collector for FilterCollector<TCollector, TPredicate>
impl<TCollector> Collector for FilterCollector<TCollector>
where
    TCollector: Collector + Send + Sync,
    TPredicate: 'static + Fn(u64) -> bool + Send + Sync,
{
    // That's the type of our result.
    // Our standard deviation will be a float.
    type Fruit = TCollector::Fruit;

    type Child = FilterSegmentCollector<TCollector::Child, TPredicate>;
    type Child = FilterSegmentCollector<TCollector::Child>;

    fn for_segment(
        &self,
        segment_local_id: u32,
        segment_reader: &SegmentReader,
    ) -> crate::Result<FilterSegmentCollector<TCollector::Child, TPredicate>> {
    ) -> crate::Result<FilterSegmentCollector<TCollector::Child>> {
        let fast_field_reader = segment_reader
            .fast_fields()
            .u64(self.field)
@@ -111,7 +106,7 @@ where
            .for_segment(segment_local_id, segment_reader)?;
        Ok(FilterSegmentCollector {
            fast_field_reader,
            segment_collector,
            segment_collector: segment_collector,
            predicate: self.predicate,
        })
    }
@@ -128,20 +123,15 @@ where
    }
}

pub struct FilterSegmentCollector<TSegmentCollector, TPredicate>
where
    TPredicate: 'static,
{
pub struct FilterSegmentCollector<TSegmentCollector> {
    fast_field_reader: FastFieldReader<u64>,
    segment_collector: TSegmentCollector,
    predicate: &'static TPredicate,
    predicate: &'static (dyn Fn(u64) -> bool + Send + Sync),
}

impl<TSegmentCollector, TPredicate> SegmentCollector
    for FilterSegmentCollector<TSegmentCollector, TPredicate>
impl<TSegmentCollector> SegmentCollector for FilterSegmentCollector<TSegmentCollector>
where
    TSegmentCollector: SegmentCollector,
    TPredicate: 'static + Fn(u64) -> bool + Send + Sync,
{
    type Fruit = TSegmentCollector::Fruit;
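For context, a hedged sketch of how the generic-predicate `FilterCollector` shown above would be used (the function name is hypothetical and `price` follows the doc example; this assumes the 0.14-era API):

    use tantivy::collector::{FilterCollector, TopDocs};
    use tantivy::query::Query;
    use tantivy::schema::Field;
    use tantivy::{DocAddress, Score, Searcher};

    // Keep only documents whose `price` fast field exceeds 20_120,
    // then return the top two remaining hits by score.
    fn expensive_hits(
        searcher: &Searcher,
        query: &dyn Query,
        price: Field,
    ) -> tantivy::Result<Vec<(Score, DocAddress)>> {
        let collector =
            FilterCollector::new(price, &|value: u64| value > 20_120, TopDocs::with_limit(2));
        searcher.search(query, &collector)
    }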
@@ -728,7 +728,7 @@ mod tests {
    }

    #[test]
    fn test_top_collector_not_at_capacity_without_offset() {
    fn test_top_collector_not_at_capacity() {
        let index = make_index();
        let field = index.schema().get_field("text").unwrap();
        let query_parser = QueryParser::for_index(&index, vec![field]);
@@ -20,10 +20,9 @@ impl<W: Write> CountingWriter<W> {
        self.written_bytes
    }

    /// Returns the underlying write object.
    /// Note that this method does not trigger any flushing.
    pub fn finish(self) -> W {
        self.underlying
    pub fn finish(mut self) -> io::Result<(W, u64)> {
        self.flush()?;
        Ok((self.underlying, self.written_bytes))
    }
}

@@ -47,6 +46,7 @@ impl<W: Write> Write for CountingWriter<W> {

impl<W: TerminatingWrite> TerminatingWrite for CountingWriter<W> {
    fn terminate_ref(&mut self, token: AntiCallToken) -> io::Result<()> {
        self.flush()?;
        self.underlying.terminate_ref(token)
    }
}
@@ -63,9 +63,8 @@ mod test {
        let mut counting_writer = CountingWriter::wrap(buffer);
        let bytes = (0u8..10u8).collect::<Vec<u8>>();
        counting_writer.write_all(&bytes).unwrap();
        let len = counting_writer.written_bytes();
        let buffer_restituted: Vec<u8> = counting_writer.finish();
        let (w, len): (Vec<u8>, u64) = counting_writer.finish().unwrap();
        assert_eq!(len, 10u64);
        assert_eq!(buffer_restituted.len(), 10);
        assert_eq!(w.len(), 10);
    }
}
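A quick sketch of the `finish()` contract in the hunk above (assuming access to the crate-internal `CountingWriter`): the writer is flushed first, and both the underlying writer and the byte count come back.

    use std::io::Write;

    fn write_counted(payload: &[u8]) -> std::io::Result<(Vec<u8>, u64)> {
        let mut writer = CountingWriter::wrap(Vec::new());
        writer.write_all(payload)?;
        // finish() flushes, then hands back (underlying, written_bytes).
        writer.finish()
    }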
@@ -511,28 +511,28 @@ mod tests {
    }

    #[test]
    fn test_index_manual_policy_mmap() -> crate::Result<()> {
    fn test_index_manual_policy_mmap() {
        let schema = throw_away_schema();
        let field = schema.get_field("num_likes").unwrap();
        let mut index = Index::create_from_tempdir(schema)?;
        let mut writer = index.writer_for_tests()?;
        writer.commit()?;
        let mut index = Index::create_from_tempdir(schema).unwrap();
        let mut writer = index.writer_for_tests().unwrap();
        writer.commit().unwrap();
        let reader = index
            .reader_builder()
            .reload_policy(ReloadPolicy::Manual)
            .try_into()?;
            .try_into()
            .unwrap();
        assert_eq!(reader.searcher().num_docs(), 0);
        writer.add_document(doc!(field=>1u64));
        let (sender, receiver) = crossbeam::channel::unbounded();
        let _handle = index.directory_mut().watch(WatchCallback::new(move || {
            let _ = sender.send(());
        }));
        writer.commit()?;
        writer.commit().unwrap();
        assert!(receiver.recv().is_ok());
        assert_eq!(reader.searcher().num_docs(), 0);
        reader.reload()?;
        reader.reload().unwrap();
        assert_eq!(reader.searcher().num_docs(), 1);
        Ok(())
    }

    #[test]
@@ -66,7 +66,7 @@ impl InvertedIndexReader {
    }

    /// Returns the term info associated with the term.
    pub fn get_term_info(&self, term: &Term) -> io::Result<Option<TermInfo>> {
    pub fn get_term_info(&self, term: &Term) -> Option<TermInfo> {
        self.termdict.get(term.value_bytes())
    }

@@ -106,9 +106,10 @@ impl InvertedIndexReader {
        term: &Term,
        option: IndexRecordOption,
    ) -> io::Result<Option<BlockSegmentPostings>> {
        self.get_term_info(term)?
        Ok(self
            .get_term_info(term)
            .map(move |term_info| self.read_block_postings_from_terminfo(&term_info, option))
            .transpose()
            .transpose()?)
    }

    /// Returns a block postings given a `term_info`.
@@ -180,7 +181,7 @@ impl InvertedIndexReader {
        term: &Term,
        option: IndexRecordOption,
    ) -> io::Result<Option<SegmentPostings>> {
        self.get_term_info(term)?
        self.get_term_info(term)
            .map(move |term_info| self.read_postings_from_terminfo(&term_info, option))
            .transpose()
    }
@@ -190,7 +191,7 @@ impl InvertedIndexReader {
        term: &Term,
        option: IndexRecordOption,
    ) -> io::Result<Option<SegmentPostings>> {
        self.get_term_info(term)?
        self.get_term_info(term)
            .map(|term_info| self.read_postings_from_terminfo(&term_info, option))
            .transpose()
    }
@@ -198,7 +199,7 @@ impl InvertedIndexReader {
    /// Returns the number of documents containing the term.
    pub fn doc_freq(&self, term: &Term) -> io::Result<u32> {
        Ok(self
            .get_term_info(term)?
            .get_term_info(term)
            .map(|term_info| term_info.doc_freq)
            .unwrap_or(0u32))
    }
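Taken together, the hunks above make `doc_freq` a thin wrapper over `get_term_info`. A sketch against the Option-returning form shown above:

    use tantivy::{InvertedIndexReader, Term};

    // The document frequency of a term is zero when the term is absent
    // from the segment's term dictionary.
    fn doc_freq(inv_index: &InvertedIndexReader, term: &Term) -> u32 {
        inv_index
            .get_term_info(term)
            .map(|term_info| term_info.doc_freq)
            .unwrap_or(0u32)
    }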
@@ -12,7 +12,7 @@ pub use self::executor::Executor;
pub use self::index::Index;
pub use self::index_meta::{IndexMeta, SegmentMeta, SegmentMetaInventory};
pub use self::inverted_index_reader::InvertedIndexReader;
pub use self::searcher::Searcher;
pub use self::searcher::{FieldSearcher, Searcher};
pub use self::segment::Segment;
pub use self::segment::SerializableSegment;
pub use self::segment_component::SegmentComponent;
@@ -1,16 +1,17 @@
use crate::collector::Collector;
use crate::core::Executor;

use crate::core::InvertedIndexReader;
use crate::core::SegmentReader;
use crate::query::Query;
use crate::schema::Document;
use crate::schema::Schema;
use crate::schema::Term;
use crate::schema::{Field, Term};
use crate::space_usage::SearcherSpaceUsage;
use crate::store::StoreReader;
use crate::termdict::TermMerger;
use crate::DocAddress;
use crate::Index;

use std::sync::Arc;
use std::{fmt, io};

/// Holds a list of `SegmentReader`s ready for search.
@@ -147,6 +148,16 @@ impl Searcher {
        collector.merge_fruits(fruits)
    }

    /// Returns the field searcher associated with a `Field`.
    pub fn field(&self, field: Field) -> crate::Result<FieldSearcher> {
        let inv_index_readers: Vec<Arc<InvertedIndexReader>> = self
            .segment_readers
            .iter()
            .map(|segment_reader| segment_reader.inverted_index(field))
            .collect::<crate::Result<Vec<_>>>()?;
        Ok(FieldSearcher::new(inv_index_readers))
    }

    /// Summarizes the total space usage of this searcher.
    pub fn space_usage(&self) -> io::Result<SearcherSpaceUsage> {
        let mut space_usage = SearcherSpaceUsage::new();
@@ -157,6 +168,32 @@ impl Searcher {
    }
}

/// **Experimental API** `FieldSearcher` only gives access to a stream over the terms of a field.
pub struct FieldSearcher {
    inv_index_readers: Vec<Arc<InvertedIndexReader>>,
}

impl FieldSearcher {
    fn new(inv_index_readers: Vec<Arc<InvertedIndexReader>>) -> FieldSearcher {
        FieldSearcher { inv_index_readers }
    }

    /// Returns a stream over all of the sorted unique terms
    /// for the given field.
    ///
    /// This method does not take into account which documents are deleted, so
    /// in the presence of deletes some terms may not actually exist in any document
    /// anymore.
    pub fn terms(&self) -> TermMerger {
        let term_streamers: Vec<_> = self
            .inv_index_readers
            .iter()
            .map(|inverted_index| inverted_index.terms().stream())
            .collect();
        TermMerger::new(term_streamers)
    }
}

impl fmt::Debug for Searcher {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let segment_ids = self
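A usage sketch for the experimental `FieldSearcher` API above (assuming `TermMerger` exposes the usual `advance()`/`key()` cursor interface):

    use tantivy::schema::Field;
    use tantivy::Searcher;

    // Stream every sorted unique term of a field across all segments.
    // Deleted documents are not accounted for, as the doc comment warns.
    fn print_terms(searcher: &Searcher, field: Field) -> tantivy::Result<()> {
        let field_searcher = searcher.field(field)?;
        let mut terms = field_searcher.terms();
        while terms.advance() {
            println!("{:?}", terms.key());
        }
        Ok(())
    }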
@@ -1,8 +1,8 @@
use crate::directory::directory_lock::Lock;
use crate::directory::error::LockError;
use crate::directory::error::{DeleteError, OpenReadError, OpenWriteError};
use crate::directory::WatchCallback;
use crate::directory::WatchHandle;
use crate::directory::{FileHandle, WatchCallback};
use crate::directory::{FileSlice, WritePtr};
use std::fmt;
use std::io;
@@ -108,13 +108,10 @@ fn retry_policy(is_blocking: bool) -> RetryPolicy {
/// should be your default choice.
/// - The [`RAMDirectory`](struct.RAMDirectory.html), which
/// should be used mostly for tests.
///
pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
    /// Opens a file and returns a boxed `FileHandle`.
    /// Opens a virtual file for read.
    ///
    /// Users of `Directory` should typically call `Directory::open_read(...)`,
    /// while `Directory` implementors should implement `get_file_handle()`.
    fn get_file_handle(&self, path: &Path) -> Result<Box<dyn FileHandle>, OpenReadError>;

    /// Once a virtual file is open, its data may not
    /// change.
    ///
@@ -122,10 +119,7 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
    /// have no effect on the returned `FileSlice` object.
    ///
    /// You should only use this to read files created with [Directory::open_write].
    fn open_read(&self, path: &Path) -> Result<FileSlice, OpenReadError> {
        let file_handle = self.get_file_handle(path)?;
        Ok(FileSlice::new(file_handle))
    }
    fn open_read(&self, path: &Path) -> Result<FileSlice, OpenReadError>;

    /// Removes a file
    ///
@@ -2,11 +2,10 @@ use stable_deref_trait::StableDeref;

use crate::common::HasLen;
use crate::directory::OwnedBytes;
use std::sync::{Arc, Weak};
use std::sync::Arc;
use std::{io, ops::Deref};

pub type ArcBytes = Arc<dyn Deref<Target = [u8]> + Send + Sync + 'static>;
pub type WeakArcBytes = Weak<dyn Deref<Target = [u8]> + Send + Sync + 'static>;
pub type BoxedData = Box<dyn Deref<Target = [u8]> + Send + Sync + 'static>;

/// Objects that represent file sections in tantivy.
///
@@ -41,7 +40,7 @@ where
    B: StableDeref + Deref<Target = [u8]> + 'static + Send + Sync,
{
    fn from(bytes: B) -> FileSlice {
        FileSlice::new(Box::new(OwnedBytes::new(bytes)))
        FileSlice::new(OwnedBytes::new(bytes))
    }
}

@@ -51,25 +50,22 @@ where
///
#[derive(Clone)]
pub struct FileSlice {
    data: Arc<dyn FileHandle>,
    data: Arc<Box<dyn FileHandle>>,
    start: usize,
    stop: usize,
}

impl FileSlice {
    /// Wraps a FileHandle.
    pub fn new(file_handle: Box<dyn FileHandle>) -> Self {
        let num_bytes = file_handle.len();
        FileSlice::new_with_num_bytes(file_handle, num_bytes)
    }

    /// Wraps a FileHandle.
    #[doc(hidden)]
    pub fn new_with_num_bytes(file_handle: Box<dyn FileHandle>, num_bytes: usize) -> Self {
    pub fn new<D>(data: D) -> Self
    where
        D: FileHandle,
    {
        let len = data.len();
        FileSlice {
            data: Arc::from(file_handle),
            data: Arc::new(Box::new(data)),
            start: 0,
            stop: num_bytes,
            stop: len,
        }
    }

@@ -150,12 +146,6 @@ impl FileSlice {
    }
}

impl FileHandle for FileSlice {
    fn read_bytes(&self, from: usize, to: usize) -> io::Result<OwnedBytes> {
        self.read_bytes_slice(from, to)
    }
}

impl HasLen for FileSlice {
    fn len(&self) -> usize {
        self.stop - self.start
@@ -170,7 +160,7 @@ mod tests {

    #[test]
    fn test_file_slice() -> io::Result<()> {
        let file_slice = FileSlice::new(Box::new(b"abcdef".as_ref()));
        let file_slice = FileSlice::new(b"abcdef".as_ref());
        assert_eq!(file_slice.len(), 6);
        assert_eq!(file_slice.slice_from(2).read_bytes()?.as_slice(), b"cdef");
        assert_eq!(file_slice.slice_to(2).read_bytes()?.as_slice(), b"ab");
@@ -214,7 +204,7 @@ mod tests {

    #[test]
    fn test_slice_simple_read() -> io::Result<()> {
        let slice = FileSlice::new(Box::new(&b"abcdef"[..]));
        let slice = FileSlice::new(&b"abcdef"[..]);
        assert_eq!(slice.len(), 6);
        assert_eq!(slice.read_bytes()?.as_ref(), b"abcdef");
        assert_eq!(slice.slice(1, 4).read_bytes()?.as_ref(), b"bcd");
@@ -223,7 +213,7 @@ mod tests {

    #[test]
    fn test_slice_read_slice() -> io::Result<()> {
        let slice_deref = FileSlice::new(Box::new(&b"abcdef"[..]));
        let slice_deref = FileSlice::new(&b"abcdef"[..]);
        assert_eq!(slice_deref.read_bytes_slice(1, 4)?.as_ref(), b"bcd");
        Ok(())
    }
@@ -231,14 +221,14 @@ mod tests {
    #[test]
    #[should_panic(expected = "assertion failed: from <= to")]
    fn test_slice_read_slice_invalid_range() {
        let slice_deref = FileSlice::new(Box::new(&b"abcdef"[..]));
        let slice_deref = FileSlice::new(&b"abcdef"[..]);
        assert_eq!(slice_deref.read_bytes_slice(1, 0).unwrap().as_ref(), b"bcd");
    }

    #[test]
    #[should_panic(expected = "`to` exceeds the fileslice length")]
    fn test_slice_read_slice_invalid_range_exceeds() {
        let slice_deref = FileSlice::new(Box::new(&b"abcdef"[..]));
        let slice_deref = FileSlice::new(&b"abcdef"[..]);
        assert_eq!(
            slice_deref.read_bytes_slice(0, 10).unwrap().as_ref(),
            b"bcd"
@@ -1,10 +1,10 @@
use crate::core::{MANAGED_FILEPATH, META_FILEPATH};
use crate::directory::error::{DeleteError, LockError, OpenReadError, OpenWriteError};
use crate::directory::footer::{Footer, FooterProxy};
use crate::directory::DirectoryLock;
use crate::directory::GarbageCollectionResult;
use crate::directory::Lock;
use crate::directory::META_LOCK;
use crate::directory::{DirectoryLock, FileHandle};
use crate::directory::{FileSlice, WritePtr};
use crate::directory::{WatchCallback, WatchHandle};
use crate::error::DataCorruption;
@@ -274,11 +274,6 @@ impl ManagedDirectory {
}

impl Directory for ManagedDirectory {
    fn get_file_handle(&self, path: &Path) -> Result<Box<dyn FileHandle>, OpenReadError> {
        let file_slice = self.open_read(path)?;
        Ok(Box::new(file_slice))
    }

    fn open_read(&self, path: &Path) -> result::Result<FileSlice, OpenReadError> {
        let file_slice = self.directory.open_read(path)?;
        let (footer, reader) = Footer::extract_footer(file_slice)
@@ -2,13 +2,14 @@ use crate::core::META_FILEPATH;
use crate::directory::error::LockError;
use crate::directory::error::{DeleteError, OpenDirectoryError, OpenReadError, OpenWriteError};
use crate::directory::file_watcher::FileWatcher;
use crate::directory::AntiCallToken;
use crate::directory::BoxedData;
use crate::directory::Directory;
use crate::directory::DirectoryLock;
use crate::directory::FileSlice;
use crate::directory::Lock;
use crate::directory::WatchCallback;
use crate::directory::WatchHandle;
use crate::directory::{AntiCallToken, FileHandle, OwnedBytes};
use crate::directory::{ArcBytes, WeakArcBytes};
use crate::directory::{TerminatingWrite, WritePtr};
use fs2::FileExt;
use memmap::Mmap;
@@ -24,6 +25,7 @@ use std::path::{Path, PathBuf};
use std::result;
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::Weak;
use std::{collections::HashMap, ops::Deref};
use tempfile::TempDir;

@@ -76,7 +78,7 @@ pub struct CacheInfo {

struct MmapCache {
    counters: CacheCounters,
    cache: HashMap<PathBuf, WeakArcBytes>,
    cache: HashMap<PathBuf, Weak<BoxedData>>,
}

impl Default for MmapCache {
@@ -110,7 +112,7 @@ impl MmapCache {
    }

    // Returns None if the file exists but has a len of 0 (and hence is not mmappable).
    fn get_mmap(&mut self, full_path: &Path) -> Result<Option<ArcBytes>, OpenReadError> {
    fn get_mmap(&mut self, full_path: &Path) -> Result<Option<Arc<BoxedData>>, OpenReadError> {
        if let Some(mmap_weak) = self.cache.get(full_path) {
            if let Some(mmap_arc) = mmap_weak.upgrade() {
                self.counters.hit += 1;
@@ -121,7 +123,7 @@ impl MmapCache {
        self.counters.miss += 1;
        let mmap_opt = open_mmap(full_path)?;
        Ok(mmap_opt.map(|mmap| {
            let mmap_arc: ArcBytes = Arc::new(mmap);
            let mmap_arc: Arc<BoxedData> = Arc::new(Box::new(mmap));
            let mmap_weak = Arc::downgrade(&mmap_arc);
            self.cache.insert(full_path.to_owned(), mmap_weak);
            mmap_arc
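The caching pattern in `get_mmap` above generalizes nicely: holding `Weak` values means an mmap stays cached only while some reader still holds the `Arc`, and the hit/miss counters fall out naturally. A self-contained sketch of that pattern (names are illustrative, not tantivy's):

    use std::collections::HashMap;
    use std::hash::Hash;
    use std::sync::{Arc, Weak};

    struct WeakCache<K, V> {
        map: HashMap<K, Weak<V>>,
        hits: usize,
        misses: usize,
    }

    impl<K: Hash + Eq + Clone, V> WeakCache<K, V> {
        fn get_or_load(&mut self, key: &K, load: impl FnOnce() -> V) -> Arc<V> {
            if let Some(weak) = self.map.get(key) {
                // Upgrade succeeds only while another Arc is still alive.
                if let Some(arc) = weak.upgrade() {
                    self.hits += 1;
                    return arc;
                }
            }
            self.misses += 1;
            let arc = Arc::new(load());
            self.map.insert(key.clone(), Arc::downgrade(&arc));
            arc
        }
    }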
@@ -159,7 +161,7 @@ impl MmapDirectoryInner {
            mmap_cache: Default::default(),
            _temp_directory: temp_directory,
            watcher: FileWatcher::new(&root_path.join(*META_FILEPATH)),
            root_path,
            root_path: root_path,
        }
    }

@@ -314,7 +316,7 @@ impl TerminatingWrite for SafeFileWriter {
}

#[derive(Clone)]
struct MmapArc(Arc<dyn Deref<Target = [u8]> + Send + Sync>);
struct MmapArc(Arc<Box<dyn Deref<Target = [u8]> + Send + Sync>>);

impl Deref for MmapArc {
    type Target = [u8];
@@ -344,7 +346,7 @@ pub(crate) fn atomic_write(path: &Path, content: &[u8]) -> io::Result<()> {
}

impl Directory for MmapDirectory {
    fn get_file_handle(&self, path: &Path) -> result::Result<Box<dyn FileHandle>, OpenReadError> {
    fn open_read(&self, path: &Path) -> result::Result<FileSlice, OpenReadError> {
        debug!("Open Read {:?}", path);
        let full_path = self.resolve_path(path);

@@ -357,16 +359,11 @@ impl Directory for MmapDirectory {
            let io_err = make_io_err(msg);
            OpenReadError::wrap_io_error(io_err, path.to_path_buf())
        })?;

        let owned_bytes = mmap_cache
            .get_mmap(&full_path)?
            .map(|mmap_arc| {
                let mmap_arc_obj = MmapArc(mmap_arc);
                OwnedBytes::new(mmap_arc_obj)
            })
            .unwrap_or_else(OwnedBytes::empty);

        Ok(Box::new(owned_bytes))
        if let Some(mmap_arc) = mmap_cache.get_mmap(&full_path)? {
            Ok(FileSlice::from(MmapArc(mmap_arc)))
        } else {
            Ok(FileSlice::empty())
        }
    }

    /// Any entry associated with the path in the mmap will be
@@ -449,8 +446,7 @@ impl Directory for MmapDirectory {
    fn atomic_write(&self, path: &Path, content: &[u8]) -> io::Result<()> {
        debug!("Atomic Write {:?}", path);
        let full_path = self.resolve_path(path);
        atomic_write(&full_path, content)?;
        self.sync_directory()
        atomic_write(&full_path, content)
    }

    fn acquire_lock(&self, lock: &Lock) -> Result<DirectoryLock, LockError> {
@@ -23,7 +23,7 @@ pub mod error;
pub use self::directory::DirectoryLock;
pub use self::directory::{Directory, DirectoryClone};
pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
pub(crate) use self::file_slice::{ArcBytes, WeakArcBytes};
pub(crate) use self::file_slice::BoxedData;
pub use self::file_slice::{FileHandle, FileSlice};
pub use self::owned_bytes::OwnedBytes;
pub use self::ram_directory::RAMDirectory;
@@ -1,6 +1,5 @@
use crate::directory::FileHandle;
use stable_deref_trait::StableDeref;
use std::convert::TryInto;
use std::mem;
use std::ops::Deref;
use std::sync::Arc;
@@ -96,24 +95,6 @@ impl OwnedBytes {
    pub fn advance(&mut self, advance_len: usize) {
        self.data = &self.data[advance_len..]
    }

    /// Reads a `u8` from the `OwnedBytes` and advances by one byte.
    pub fn read_u8(&mut self) -> u8 {
        assert!(!self.is_empty());

        let byte = self.as_slice()[0];
        self.advance(1);
        byte
    }

    /// Reads a `u64` encoded as little-endian from the `OwnedBytes` and advances by 8 bytes.
    pub fn read_u64(&mut self) -> u64 {
        assert!(self.len() > 7);

        let octlet: [u8; 8] = self.as_slice()[..8].try_into().unwrap();
        self.advance(8);
        u64::from_le_bytes(octlet)
    }
}

impl fmt::Debug for OwnedBytes {
@@ -249,22 +230,6 @@ mod tests {
        Ok(())
    }

    #[test]
    fn test_owned_bytes_read_u8() -> io::Result<()> {
        let mut bytes = OwnedBytes::new(b"\xFF".as_ref());
        assert_eq!(bytes.read_u8(), 255);
        assert_eq!(bytes.len(), 0);
        Ok(())
    }

    #[test]
    fn test_owned_bytes_read_u64() -> io::Result<()> {
        let mut bytes = OwnedBytes::new(b"\0\xFF\xFF\xFF\xFF\xFF\xFF\xFF".as_ref());
        assert_eq!(bytes.read_u64(), u64::MAX - 255);
        assert_eq!(bytes.len(), 0);
        Ok(())
    }

    #[test]
    fn test_owned_bytes_split() {
        let bytes = OwnedBytes::new(b"abcdefghi".as_ref());
@@ -12,8 +12,6 @@ use std::path::{Path, PathBuf};
use std::result;
use std::sync::{Arc, RwLock};

use super::FileHandle;

/// Writer associated with the `RAMDirectory`
///
/// The Writer just writes a buffer.
@@ -165,11 +163,6 @@ impl RAMDirectory {
}

impl Directory for RAMDirectory {
    fn get_file_handle(&self, path: &Path) -> Result<Box<dyn FileHandle>, OpenReadError> {
        let file_slice = self.open_read(path)?;
        Ok(Box::new(file_slice))
    }

    fn open_read(&self, path: &Path) -> result::Result<FileSlice, OpenReadError> {
        self.fs.read().unwrap().open_read(path)
    }
@@ -6,12 +6,12 @@ use std::sync::Weak;

/// Cloneable wrapper for callbacks registered when watching files of a `Directory`.
#[derive(Clone)]
pub struct WatchCallback(Arc<dyn Fn() + Sync + Send>);
pub struct WatchCallback(Arc<Box<dyn Fn() + Sync + Send>>);

impl WatchCallback {
    /// Wraps a `Fn()` to create a WatchCallback.
    pub fn new<F: Fn() + Sync + Send + 'static>(op: F) -> Self {
        WatchCallback(Arc::new(op))
        WatchCallback(Arc::new(Box::new(op)))
    }

    fn call(&self) {
@@ -10,7 +10,7 @@ use std::borrow::BorrowMut;
pub const TERMINATED: DocId = std::i32::MAX as u32;

/// Represents an iterable set of sorted doc ids.
pub trait DocSet: Send {
pub trait DocSet {
    /// Goes to the next element.
    ///
    /// The DocId of the next element is returned.
@@ -129,14 +129,6 @@ impl<'a> DocSet for &'a mut dyn DocSet {
    fn size_hint(&self) -> u32 {
        (**self).size_hint()
    }

    fn count(&mut self, delete_bitset: &DeleteBitSet) -> u32 {
        (**self).count(delete_bitset)
    }

    fn count_including_deleted(&mut self) -> u32 {
        (**self).count_including_deleted()
    }
}

impl<TDocSet: DocSet + ?Sized> DocSet for Box<TDocSet> {
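The `TERMINATED` sentinel above defines the whole iteration protocol: a freshly created `DocSet` is positioned on its first document, and `advance()` returns `TERMINATED` once the set is exhausted. A sketch of the canonical consumption loop:

    use tantivy::{DocSet, TERMINATED};

    fn collect_doc_ids(docset: &mut dyn DocSet) -> Vec<u32> {
        let mut doc_ids = Vec::new();
        let mut doc = docset.doc();
        while doc != TERMINATED {
            doc_ids.push(doc);
            doc = docset.advance();
        }
        doc_ids
    }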
@@ -1,5 +1,4 @@
use super::MultiValueIntFastFieldReader;
use crate::error::DataCorruption;
use crate::schema::Facet;
use crate::termdict::TermDictionary;
use crate::termdict::TermOrdinal;
@@ -63,13 +62,12 @@ impl FacetReader {
        &mut self,
        facet_ord: TermOrdinal,
        output: &mut Facet,
    ) -> crate::Result<()> {
    ) -> Result<(), str::Utf8Error> {
        let found_term = self
            .term_dict
            .ord_to_term(facet_ord as u64, &mut self.buffer)?;
            .ord_to_term(facet_ord as u64, &mut self.buffer);
        assert!(found_term, "Term ordinal {} not found.", facet_ord);
        let facet_str = str::from_utf8(&self.buffer[..])
            .map_err(|utf8_err| DataCorruption::comment_only(utf8_err.to_string()))?;
        let facet_str = str::from_utf8(&self.buffer[..])?;
        output.set_facet_str(facet_str);
        Ok(())
    }
@@ -61,38 +61,16 @@ impl FieldNormReaders {
/// precompute computationally expensive functions of the fieldnorm
/// in a very short array.
#[derive(Clone)]
pub struct FieldNormReader(ReaderImplEnum);

impl From<ReaderImplEnum> for FieldNormReader {
    fn from(reader_enum: ReaderImplEnum) -> FieldNormReader {
        FieldNormReader(reader_enum)
    }
}

#[derive(Clone)]
enum ReaderImplEnum {
    FromData(OwnedBytes),
    Const {
        num_docs: u32,
        fieldnorm_id: u8,
        fieldnorm: u32,
    },
pub struct FieldNormReader {
    data: OwnedBytes,
}

impl FieldNormReader {
    /// Creates a `FieldNormReader` with a constant fieldnorm.
    ///
    /// The fieldnorm will be subjected to compression as if it was coming
    /// from an array-backed fieldnorm reader.
    pub fn constant(num_docs: u32, fieldnorm: u32) -> FieldNormReader {
        let fieldnorm_id = fieldnorm_to_id(fieldnorm);
        let fieldnorm = id_to_fieldnorm(fieldnorm_id);
        ReaderImplEnum::Const {
            num_docs,
            fieldnorm_id,
            fieldnorm,
        }
        .into()
        let field_norms_data = OwnedBytes::new(vec![fieldnorm_id; num_docs as usize]);
        FieldNormReader::new(field_norms_data)
    }

    /// Opens a field norm reader given its file.
@@ -102,15 +80,12 @@ impl FieldNormReader {
    }

    fn new(data: OwnedBytes) -> Self {
        ReaderImplEnum::FromData(data).into()
        FieldNormReader { data }
    }

    /// Returns the number of documents in this segment.
    pub fn num_docs(&self) -> u32 {
        match &self.0 {
            ReaderImplEnum::FromData(data) => data.len() as u32,
            ReaderImplEnum::Const { num_docs, .. } => *num_docs,
        }
        self.data.len() as u32
    }

    /// Returns the `fieldnorm` associated with a doc id.
@@ -123,25 +98,14 @@ impl FieldNormReader {
    /// The fieldnorm is effectively decoded from the
    /// `fieldnorm_id` by doing a simple table lookup.
    pub fn fieldnorm(&self, doc_id: DocId) -> u32 {
        match &self.0 {
            ReaderImplEnum::FromData(data) => {
                let fieldnorm_id = data.as_slice()[doc_id as usize];
                id_to_fieldnorm(fieldnorm_id)
            }
            ReaderImplEnum::Const { fieldnorm, .. } => *fieldnorm,
        }
        let fieldnorm_id = self.fieldnorm_id(doc_id);
        id_to_fieldnorm(fieldnorm_id)
    }

    /// Returns the `fieldnorm_id` associated with a document.
    #[inline(always)]
    pub fn fieldnorm_id(&self, doc_id: DocId) -> u8 {
        match &self.0 {
            ReaderImplEnum::FromData(data) => {
                let fieldnorm_id = data.as_slice()[doc_id as usize];
                fieldnorm_id
            }
            ReaderImplEnum::Const { fieldnorm_id, .. } => *fieldnorm_id,
        }
        self.data.as_slice()[doc_id as usize]
    }

    /// Converts a `fieldnorm_id` into a fieldnorm.
@@ -165,7 +129,9 @@ impl FieldNormReader {
            .map(FieldNormReader::fieldnorm_to_id)
            .collect::<Vec<u8>>();
        let field_norms_data = OwnedBytes::new(field_norms_id);
        FieldNormReader::new(field_norms_data)
        FieldNormReader {
            data: field_norms_data,
        }
    }
}

@@ -184,20 +150,4 @@ mod tests {
        assert_eq!(fieldnorm_reader.fieldnorm(3), 4);
        assert_eq!(fieldnorm_reader.fieldnorm(4), 983_064);
    }

    #[test]
    fn test_const_fieldnorm_reader_small_fieldnorm_id() {
        let fieldnorm_reader = FieldNormReader::constant(1_000_000u32, 10u32);
        assert_eq!(fieldnorm_reader.num_docs(), 1_000_000u32);
        assert_eq!(fieldnorm_reader.fieldnorm(0u32), 10u32);
        assert_eq!(fieldnorm_reader.fieldnorm_id(0u32), 10u8);
    }

    #[test]
    fn test_const_fieldnorm_reader_large_fieldnorm_id() {
        let fieldnorm_reader = FieldNormReader::constant(1_000_000u32, 300u32);
        assert_eq!(fieldnorm_reader.num_docs(), 1_000_000u32);
        assert_eq!(fieldnorm_reader.fieldnorm(0u32), 280u32);
        assert_eq!(fieldnorm_reader.fieldnorm_id(0u32), 72u8);
    }
}
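The `constant()` constructor and the tests above hinge on the lossy fieldnorm compression: a fieldnorm is squeezed into a one-byte `fieldnorm_id`, and decoding returns the nearest representable value (300 encodes to id 72, which decodes back to 280). A sketch of the round trip, assuming access to the crate-internal lookup helpers:

    // Hedged sketch: fieldnorm_to_id / id_to_fieldnorm are the
    // crate-internal lookup tables used by FieldNormReader.
    fn round_trip(fieldnorm: u32) -> u32 {
        let fieldnorm_id: u8 = fieldnorm_to_id(fieldnorm);
        id_to_fieldnorm(fieldnorm_id)
    }

    // Per the tests above: round_trip(10) == 10, round_trip(300) == 280.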
@@ -53,7 +53,7 @@ impl DeleteQueue {
            return block;
        }
        let block = Arc::new(Block {
            operations: Arc::new([]),
            operations: Arc::default(),
            next: NextBlock::from(self.clone()),
        });
        wlock.last_block = Arc::downgrade(&block);
@@ -108,7 +108,7 @@ impl DeleteQueue {
        let delete_operations = mem::replace(&mut self_wlock.writer, vec![]);

        let new_block = Arc::new(Block {
            operations: Arc::from(delete_operations.into_boxed_slice()),
            operations: Arc::new(delete_operations.into_boxed_slice()),
            next: NextBlock::from(self.clone()),
        });

@@ -167,7 +167,7 @@ impl NextBlock {
}

struct Block {
    operations: Arc<[DeleteOperation]>,
    operations: Arc<Box<[DeleteOperation]>>,
    next: NextBlock,
}
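The `operations` swap above is the classic `Arc<[T]>` versus `Arc<Box<[T]>>` trade-off: `Arc::from(boxed_slice)` stores the slice inline next to the reference counts in a single allocation, while `Arc<Box<[T]>>` pays a second heap indirection on every access. A tiny sketch of the two shapes:

    use std::sync::Arc;

    fn main() {
        let ops: Vec<u64> = vec![1, 2, 3];
        // One allocation holds refcounts + slice data.
        let flat: Arc<[u64]> = Arc::from(ops.clone().into_boxed_slice());
        // Refcounts point at a Box, which points at the slice.
        let boxed: Arc<Box<[u64]>> = Arc::new(ops.into_boxed_slice());
        assert_eq!(&flat[..], &boxed[..]);
    }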
@@ -449,7 +449,7 @@ impl IndexWriter {
    }

    /// Accessor to the merge policy.
    pub fn get_merge_policy(&self) -> Arc<dyn MergePolicy> {
    pub fn get_merge_policy(&self) -> Arc<Box<dyn MergePolicy>> {
        self.segment_updater.get_merge_policy()
    }

@@ -503,6 +503,7 @@ impl IndexMerger {
        let mut positions_buffer: Vec<u32> = Vec::with_capacity(1_000);
        let mut delta_computer = DeltaComputer::new();

        let mut field_term_streams = Vec::new();
        let mut max_term_ords: Vec<TermOrdinal> = Vec::new();

        let field_readers: Vec<Arc<InvertedIndexReader>> = self
@@ -511,13 +512,10 @@ impl IndexMerger {
            .map(|reader| reader.inverted_index(indexed_field))
            .collect::<crate::Result<Vec<_>>>()?;

        let mut field_term_streams = Vec::new();

        for field_reader in &field_readers {
            let terms = field_reader.terms();
            field_term_streams.push(terms.stream());
            max_term_ords.push(terms.num_terms() as u64);
            let term_stream = terms.stream()?;
            field_term_streams.push(term_stream);
        }

        let mut term_ord_mapping_opt = if *field_type == FieldType::HierarchicalFacet {

@@ -9,15 +9,6 @@ pub struct DeleteOperation {
    pub term: Term,
}

impl Default for DeleteOperation {
    fn default() -> Self {
        DeleteOperation {
            opstamp: 0u64,
            term: Term::new(),
        }
    }
}

/// Timestamped Add operation.
#[derive(Eq, PartialEq, Debug)]
pub struct AddOperation {

@@ -154,7 +154,7 @@ pub(crate) struct InnerSegmentUpdater {

    index: Index,
    segment_manager: SegmentManager,
    merge_policy: RwLock<Arc<dyn MergePolicy>>,
    merge_policy: RwLock<Arc<Box<dyn MergePolicy>>>,
    killed: AtomicBool,
    stamper: Stamper,
    merge_operations: MergeOperationInventory,
@@ -193,19 +193,19 @@ impl SegmentUpdater {
            merge_thread_pool,
            index,
            segment_manager,
            merge_policy: RwLock::new(Arc::new(DefaultMergePolicy::default())),
            merge_policy: RwLock::new(Arc::new(Box::new(DefaultMergePolicy::default()))),
            killed: AtomicBool::new(false),
            stamper,
            merge_operations: Default::default(),
        })))
    }

    pub fn get_merge_policy(&self) -> Arc<dyn MergePolicy> {
    pub fn get_merge_policy(&self) -> Arc<Box<dyn MergePolicy>> {
        self.merge_policy.read().unwrap().clone()
    }

    pub fn set_merge_policy(&self, merge_policy: Box<dyn MergePolicy>) {
        let arc_merge_policy = Arc::from(merge_policy);
        let arc_merge_policy = Arc::new(merge_policy);
        *self.merge_policy.write().unwrap() = arc_merge_policy;
    }

@@ -160,7 +160,7 @@ pub use self::docset::{DocSet, TERMINATED};
pub use crate::common::HasLen;
pub use crate::common::{f64_to_u64, i64_to_u64, u64_to_f64, u64_to_i64};
pub use crate::core::{Executor, SegmentComponent};
pub use crate::core::{Index, IndexMeta, Searcher, Segment, SegmentId, SegmentMeta};
pub use crate::core::{FieldSearcher, Index, IndexMeta, Searcher, Segment, SegmentId, SegmentMeta};
pub use crate::core::{InvertedIndexReader, SegmentReader};
pub use crate::directory::Directory;
pub use crate::indexer::operation::UserOperation;
@@ -8,7 +8,7 @@ use std::io::{self, Write};
pub struct PositionSerializer<W: io::Write> {
    bit_packer: BitPacker4x,
    write_stream: CountingWriter<W>,
    write_skip_index: W,
    write_skiplist: W,
    block: Vec<u32>,
    buffer: Vec<u8>,
    num_ints: u64,
@@ -16,11 +16,11 @@ pub struct PositionSerializer<W: io::Write> {
}

impl<W: io::Write> PositionSerializer<W> {
    pub fn new(write_stream: W, write_skip_index: W) -> PositionSerializer<W> {
    pub fn new(write_stream: W, write_skiplist: W) -> PositionSerializer<W> {
        PositionSerializer {
            bit_packer: BitPacker4x::new(),
            write_stream: CountingWriter::wrap(write_stream),
            write_skip_index,
            write_skiplist,
            block: Vec::with_capacity(128),
            buffer: vec![0u8; 128 * 4],
            num_ints: 0u64,
@@ -52,7 +52,7 @@ impl<W: io::Write> PositionSerializer<W> {

    fn flush_block(&mut self) -> io::Result<()> {
        let num_bits = self.bit_packer.num_bits(&self.block[..]);
        self.write_skip_index.write_all(&[num_bits])?;
        self.write_skiplist.write_all(&[num_bits])?;
        let written_len = self
            .bit_packer
            .compress(&self.block[..], &mut self.buffer, num_bits);
@@ -70,10 +70,10 @@ impl<W: io::Write> PositionSerializer<W> {
            self.flush_block()?;
        }
        for &long_skip in &self.long_skips {
            long_skip.serialize(&mut self.write_skip_index)?;
            long_skip.serialize(&mut self.write_skiplist)?;
        }
        (self.long_skips.len() as u32).serialize(&mut self.write_skip_index)?;
        self.write_skip_index.flush()?;
        (self.long_skips.len() as u32).serialize(&mut self.write_skiplist)?;
        self.write_skiplist.flush()?;
        self.write_stream.flush()?;
        Ok(())
    }
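The block layout `flush_block()` writes can be summarized as: one `num_bits` byte goes to the skip list, followed by the 128-integer payload bit-packed into the position stream. A sketch with the `bitpacking` crate (the same one tantivy uses; the helper name and buffer sizing are assumptions):

    use bitpacking::{BitPacker, BitPacker4x};

    // Pack one full block, returning the skip-list byte and the payload.
    fn pack_block(block: &[u32]) -> (u8, Vec<u8>) {
        assert_eq!(block.len(), BitPacker4x::BLOCK_LEN);
        let bit_packer = BitPacker4x::new();
        let num_bits = bit_packer.num_bits(block);
        let mut buffer = vec![0u8; 4 * BitPacker4x::BLOCK_LEN];
        let written_len = bit_packer.compress(block, &mut buffer[..], num_bits);
        buffer.truncate(written_len);
        (num_bits, buffer)
    }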
@@ -469,7 +469,7 @@ mod tests {
        let segment_reader = searcher.segment_reader(0);
        let inverted_index = segment_reader.inverted_index(int_field).unwrap();
        let term = Term::from_field_u64(int_field, 0u64);
        let term_info = inverted_index.get_term_info(&term).unwrap().unwrap();
        let term_info = inverted_index.get_term_info(&term).unwrap();
        inverted_index
            .read_block_postings_from_terminfo(&term_info, IndexRecordOption::Basic)
            .unwrap()
@@ -513,7 +513,7 @@ mod tests {
        {
            let term = Term::from_field_u64(int_field, 0u64);
            let inverted_index = segment_reader.inverted_index(int_field)?;
            let term_info = inverted_index.get_term_info(&term)?.unwrap();
            let term_info = inverted_index.get_term_info(&term).unwrap();
            block_segments = inverted_index
                .read_block_postings_from_terminfo(&term_info, IndexRecordOption::Basic)?;
        }
@@ -521,7 +521,7 @@ mod tests {
        {
            let term = Term::from_field_u64(int_field, 1u64);
            let inverted_index = segment_reader.inverted_index(int_field)?;
            let term_info = inverted_index.get_term_info(&term)?.unwrap();
            let term_info = inverted_index.get_term_info(&term).unwrap();
            inverted_index.reset_block_postings_from_terminfo(&term_info, &mut block_segments)?;
        }
        assert_eq!(block_segments.docs(), &[1, 3, 5]);

@@ -54,7 +54,7 @@ pub mod tests {
    use crate::DocId;
    use crate::HasLen;
    use crate::Score;
    use std::{iter, mem};
    use std::iter;

    #[test]
    pub fn test_position_write() -> crate::Result<()> {
@@ -71,7 +71,6 @@ pub mod tests {
            field_serializer.write_doc(doc_id, 4, &delta_positions)?;
        }
        field_serializer.close_term()?;
        mem::drop(field_serializer);
        posting_serializer.close()?;
        let read = segment.open_read(SegmentComponent::POSITIONS)?;
        assert!(read.len() <= 140);
@@ -180,7 +179,7 @@ pub mod tests {
        let inverted_index = segment_reader.inverted_index(text_field)?;
        assert_eq!(inverted_index.terms().num_terms(), 1);
        let mut bytes = vec![];
        assert!(inverted_index.terms().ord_to_term(0, &mut bytes)?);
        assert!(inverted_index.terms().ord_to_term(0, &mut bytes));
        assert_eq!(&bytes, b"hello");
    }
    {
@@ -192,7 +191,7 @@ pub mod tests {
        let inverted_index = segment_reader.inverted_index(text_field)?;
        assert_eq!(inverted_index.terms().num_terms(), 1);
        let mut bytes = vec![];
        assert!(inverted_index.terms().ord_to_term(0, &mut bytes)?);
        assert!(inverted_index.terms().ord_to_term(0, &mut bytes));
        assert_eq!(&bytes[..], ok_token_text.as_bytes());
    }
    Ok(())
@@ -7,7 +7,6 @@ use crate::schema::{Field, IndexRecordOption};
use crate::termdict::{TermDictionary, TermStreamer};
use crate::TantivyError;
use crate::{DocId, Score};
use std::io;
use std::sync::Arc;
use tantivy_fst::Automaton;

@@ -20,7 +19,6 @@ pub struct AutomatonWeight<A> {
impl<A> AutomatonWeight<A>
where
    A: Automaton + Send + Sync + 'static,
    A::State: Clone,
{
    /// Create a new AutomatonWeight
    pub fn new<IntoArcA: Into<Arc<A>>>(field: Field, automaton: IntoArcA) -> AutomatonWeight<A> {
@@ -30,10 +28,7 @@ where
    }
}

    fn automaton_stream<'a>(
        &'a self,
        term_dict: &'a TermDictionary,
    ) -> io::Result<TermStreamer<'a, &'a A>> {
    fn automaton_stream<'a>(&'a self, term_dict: &'a TermDictionary) -> TermStreamer<'a, &'a A> {
        let automaton: &A = &*self.automaton;
        let term_stream_builder = term_dict.search(automaton);
        term_stream_builder.into_stream()
@@ -43,14 +38,13 @@ where
impl<A> Weight for AutomatonWeight<A>
where
    A: Automaton + Send + Sync + 'static,
    A::State: Clone,
{
    fn scorer(&self, reader: &SegmentReader, boost: Score) -> crate::Result<Box<dyn Scorer>> {
        let max_doc = reader.max_doc();
        let mut doc_bitset = BitSet::with_max_value(max_doc);
        let inverted_index = reader.inverted_index(self.field)?;
        let term_dict = inverted_index.terms();
        let mut term_stream = self.automaton_stream(term_dict)?;
        let mut term_stream = self.automaton_stream(term_dict);
        while term_stream.advance() {
            let term_info = term_stream.value();
            let mut block_segment_postings = inverted_index
@@ -104,7 +98,6 @@ mod tests {
        index
    }

    #[derive(Clone, Copy)]
    enum State {
        Start,
        NotMatching,
@@ -106,7 +106,7 @@ impl BM25Weight {
        BM25Weight::new(idf_explain, avg_fieldnorm)
    }

    pub(crate) fn new(idf_explain: Explanation, average_fieldnorm: Score) -> BM25Weight {
    fn new(idf_explain: Explanation, average_fieldnorm: Score) -> BM25Weight {
        let weight = idf_explain.value() * (1.0 + K1);
        BM25Weight {
            idf_explain,
@@ -1,5 +1,3 @@
use rayon::iter::IntoParallelRefIterator;

use crate::core::SegmentReader;
use crate::postings::FreqReadingOption;
use crate::query::explanation::does_not_match;
@@ -24,7 +22,7 @@ enum SpecializedScorer {

fn scorer_union<TScoreCombiner>(scorers: Vec<Box<dyn Scorer>>) -> SpecializedScorer
where
    TScoreCombiner: ScoreCombiner + Send,
    TScoreCombiner: ScoreCombiner,
{
    assert!(!scorers.is_empty());
    if scorers.len() == 1 {
@@ -54,7 +52,7 @@ where
    SpecializedScorer::Other(Box::new(Union::<_, TScoreCombiner>::from(scorers)))
}

fn into_box_scorer<TScoreCombiner: ScoreCombiner + Send>(scorer: SpecializedScorer) -> Box<dyn Scorer> {
fn into_box_scorer<TScoreCombiner: ScoreCombiner>(scorer: SpecializedScorer) -> Box<dyn Scorer> {
    match scorer {
        SpecializedScorer::TermUnion(term_scorers) => {
            let union_scorer = Union::<TermScorer, TScoreCombiner>::from(term_scorers);
@@ -82,32 +80,18 @@ impl BooleanWeight {
        reader: &SegmentReader,
        boost: Score,
    ) -> crate::Result<HashMap<Occur, Vec<Box<dyn Scorer>>>> {
        use rayon::iter::ParallelIterator;
        use rayon::iter::IndexedParallelIterator;
        let mut per_occur_scorers: HashMap<Occur, Vec<Box<dyn Scorer>>> = HashMap::new();
        let mut items_res: Vec<crate::Result<(Occur, Box<dyn Scorer>)>> = Vec::new();
        let pool = rayon::ThreadPoolBuilder::new().num_threads(self.weights.len()).build().unwrap();
        pool.install(|| {
            self.weights.iter()
                .collect::<Vec<_>>()
                .par_iter()
                .map(|(occur, subweight)| {
                    let sub_scorer: Box<dyn Scorer> = subweight.scorer(reader, boost)?;
                    Ok((*occur, sub_scorer))
                })
                .collect_into_vec(&mut items_res);
        });
        for item_res in items_res {
            let (occur, sub_scorer) = item_res?;
        for &(ref occur, ref subweight) in &self.weights {
            let sub_scorer: Box<dyn Scorer> = subweight.scorer(reader, boost)?;
            per_occur_scorers
                .entry(occur)
                .entry(*occur)
                .or_insert_with(Vec::new)
                .push(sub_scorer);
        }
        Ok(per_occur_scorers)
    }

    fn complex_scorer<TScoreCombiner: ScoreCombiner >(
    fn complex_scorer<TScoreCombiner: ScoreCombiner>(
        &self,
        reader: &SegmentReader,
        boost: Score,
@@ -11,7 +11,6 @@ use crate::schema::{Field, IndexRecordOption, Term};
use crate::termdict::{TermDictionary, TermStreamer};
use crate::{DocId, Score};
use std::collections::Bound;
use std::io;
use std::ops::Range;

fn map_bound<TFrom, TTo, Transform: Fn(&TFrom) -> TTo>(
@@ -275,7 +274,7 @@ pub struct RangeWeight {
}

impl RangeWeight {
    fn term_range<'a>(&self, term_dict: &'a TermDictionary) -> io::Result<TermStreamer<'a>> {
    fn term_range<'a>(&self, term_dict: &'a TermDictionary) -> TermStreamer<'a> {
        use std::collections::Bound::*;
        let mut term_stream_builder = term_dict.range();
        term_stream_builder = match self.left_bound {
@@ -299,7 +298,7 @@ impl Weight for RangeWeight {

        let inverted_index = reader.inverted_index(self.field)?;
        let term_dict = inverted_index.terms();
        let mut term_range = self.term_range(term_dict)?;
        let mut term_range = self.term_range(term_dict);
        while term_range.advance() {
            let term_info = term_range.value();
            let mut block_segment_postings = inverted_index
@@ -12,7 +12,7 @@ use std::marker::PhantomData;
/// This is useful for queries like `+somethingrequired somethingoptional`.
///
/// Note that `somethingoptional` has no impact on the `DocSet`.
pub struct RequiredOptionalScorer<TReqScorer, TOptScorer, TScoreCombiner: ScoreCombiner> {
pub struct RequiredOptionalScorer<TReqScorer, TOptScorer, TScoreCombiner> {
    req_scorer: TReqScorer,
    opt_scorer: TOptScorer,
    score_cache: Option<Score>,
@@ -23,7 +23,6 @@ impl<TReqScorer, TOptScorer, TScoreCombiner>
    RequiredOptionalScorer<TReqScorer, TOptScorer, TScoreCombiner>
where
    TOptScorer: DocSet,
    TScoreCombiner: ScoreCombiner,
{
    /// Creates a new `RequiredOptionalScorer`.
    pub fn new(
@@ -44,7 +43,6 @@ impl<TReqScorer, TOptScorer, TScoreCombiner> DocSet
where
    TReqScorer: DocSet,
    TOptScorer: DocSet,
    TScoreCombiner: ScoreCombiner,
{
    fn advance(&mut self) -> DocId {
        self.score_cache = None;
@@ -3,7 +3,7 @@ use crate::Score;

/// The `ScoreCombiner` trait defines how to compute
/// an overall score given a list of scores.
pub trait ScoreCombiner: Default + Clone + Send + Copy + 'static {
pub trait ScoreCombiner: Default + Clone + Copy + 'static {
    /// Aggregates the score combiner with the given scorer.
    ///
    /// The `ScoreCombiner` may decide to call `.scorer.score()`
@@ -1,7 +1,7 @@
use super::term_weight::TermWeight;
use crate::query::bm25::BM25Weight;
use crate::query::Query;
use crate::query::Weight;
use crate::query::{Explanation, Query};
use crate::schema::IndexRecordOption;
use crate::Searcher;
use crate::Term;
@@ -100,13 +100,7 @@ impl TermQuery {
                field_entry.name()
            )));
        }
        let bm25_weight;
        if scoring_enabled {
            bm25_weight = BM25Weight::for_terms(searcher, &[term])?;
        } else {
            bm25_weight =
                BM25Weight::new(Explanation::new("<no score>".to_string(), 1.0f32), 1.0f32);
        }
        let bm25_weight = BM25Weight::for_terms(searcher, &[term])?;
        let index_record_option = if scoring_enabled {
            self.index_record_option
        } else {
@@ -45,7 +45,7 @@ impl Weight for TermWeight {
        } else {
            let field = self.term.field();
            let inv_index = reader.inverted_index(field)?;
            let term_info = inv_index.get_term_info(&self.term)?;
            let term_info = inv_index.get_term_info(&self.term);
            Ok(term_info.map(|term_info| term_info.doc_freq).unwrap_or(0))
        }
    }
@@ -233,7 +233,6 @@ mod tests {
        assert_eq!(Facet::root(), Facet::from("/"));
        assert_eq!(format!("{}", Facet::root()), "/");
        assert!(Facet::root().is_root());
        assert_eq!(Facet::root().encoded_str(), "");
    }

    #[test]
@@ -19,7 +19,7 @@ impl<'a> Iterator for LayerCursor<'a> {
            return None;
        }
        let (block_mut, remaining_mut) = (&mut self.block, &mut self.remaining);
        if block_mut.deserialize(remaining_mut).is_err() {
        if let Err(_) = block_mut.deserialize(remaining_mut) {
            return None;
        }
        self.cursor = 0;
@@ -50,7 +50,8 @@ impl Layer {

    fn seek_start_at_offset(&self, target: DocId, offset: u64) -> Option<Checkpoint> {
        self.cursor_at_offset(offset)
            .find(|checkpoint| checkpoint.end_doc > target)
            .filter(|checkpoint| checkpoint.end_doc > target)
            .next()
    }
}
@@ -1,27 +0,0 @@
/*!
The term dictionary's main role is to associate the sorted [`Term`s](../struct.Term.html) to
a [`TermInfo`](../postings/struct.TermInfo.html) struct that contains some meta-information
about the term.

Internally, the term dictionary relies on the `fst` crate to store
a sorted mapping that associates each term to its rank in the lexicographical order.
For instance, in a dictionary containing the sorted terms "abba", "bjork", "blur" and "donovan",
the `TermOrdinal`s are respectively `0`, `1`, `2`, and `3`.

For `u64`-terms, tantivy explicitly uses a `BigEndian` representation to ensure that the
lexicographical order matches the natural order of integers.

`i64`-terms are transformed to `u64` using a continuous mapping `val ⟶ val - i64::min_value()`
and then treated as a `u64`.

`f64`-terms are transformed to `u64` using a mapping that preserves order, and are then treated
as `u64`.

A second datastructure makes it possible to access a [`TermInfo`](../postings/struct.TermInfo.html).
*/
mod streamer;
mod term_info_store;
mod termdict;

pub use self::streamer::{TermStreamer, TermStreamerBuilder};
pub use self::termdict::{TermDictionary, TermDictionaryBuilder};
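The `i64`-to-`u64` mapping described in the module documentation above is worth seeing concretely: subtracting `i64::min_value()` in two's complement is the same as flipping the sign bit, and it preserves ordering.

    fn i64_to_u64(val: i64) -> u64 {
        // Equivalent to val.wrapping_sub(i64::MIN) as u64.
        (val as u64) ^ (1u64 << 63)
    }

    fn main() {
        assert_eq!(i64_to_u64(i64::min_value()), 0);
        assert_eq!(i64_to_u64(-1), (1u64 << 63) - 1);
        assert_eq!(i64_to_u64(0), 1u64 << 63);
        assert!(i64_to_u64(-5) < i64_to_u64(3)); // order preserved
    }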
@@ -20,39 +20,438 @@ as `u64`.
A second datastructure makes it possible to access a [`TermInfo`](../postings/struct.TermInfo.html).
*/

use tantivy_fst::automaton::AlwaysMatch;

// mod fst_termdict;
// use fst_termdict as termdict;
mod sstable_termdict;
use sstable_termdict as termdict;

mod merger;

#[cfg(test)]
mod tests;

/// Position of the term in the sorted list of terms.
pub type TermOrdinal = u64;

/// The term dictionary contains all of the terms in
/// `tantivy index` in a sorted manner.
pub type TermDictionary = self::termdict::TermDictionary;
mod merger;
mod streamer;
mod term_info_store;
mod termdict;

/// Builder for the new term dictionary.
///
/// Inserting must be done in the order of the `keys`.
pub type TermDictionaryBuilder<W> = self::termdict::TermDictionaryBuilder<W>;
pub use self::merger::TermMerger;
pub use self::streamer::{TermStreamer, TermStreamerBuilder};
pub use self::termdict::{TermDictionary, TermDictionaryBuilder};

/// Given a list of sorted term streams,
/// returns an iterator over sorted unique terms.
///
/// The item yielded is actually a pair with
/// - the term
/// - a slice with the ordinals of the segments containing
///   the term.
pub type TermMerger<'a> = self::merger::TermMerger<'a>;
#[cfg(test)]
mod tests {
    use super::{TermDictionary, TermDictionaryBuilder, TermStreamer};
    use crate::core::Index;
    use crate::directory::{Directory, FileSlice, RAMDirectory};
    use crate::postings::TermInfo;
    use crate::schema::{Schema, TEXT};
    use std::path::PathBuf;
    use std::str;

/// `TermStreamer` acts as a cursor over a range of terms of a segment.
/// Terms are guaranteed to be sorted.
pub type TermStreamer<'a, A = AlwaysMatch> = self::termdict::TermStreamer<'a, A>;
    const BLOCK_SIZE: usize = 1_500;

    fn make_term_info(term_ord: u64) -> TermInfo {
        let offset = |term_ord: u64| term_ord * 100 + term_ord * term_ord;
        TermInfo {
            doc_freq: term_ord as u32,
            postings_start_offset: offset(term_ord),
            postings_stop_offset: offset(term_ord + 1),
            positions_idx: offset(term_ord) * 2u64,
        }
    }

    #[test]
    fn test_empty_term_dictionary() {
        let empty = TermDictionary::empty();
        assert!(empty.stream().next().is_none());
    }

    #[test]
    fn test_term_ordinals() -> crate::Result<()> {
        const COUNTRIES: [&'static str; 7] = [
            "San Marino",
            "Serbia",
            "Slovakia",
            "Slovenia",
            "Spain",
            "Sweden",
            "Switzerland",
        ];
        let directory = RAMDirectory::create();
        let path = PathBuf::from("TermDictionary");
        {
            let write = directory.open_write(&path)?;
            let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
            for term in COUNTRIES.iter() {
                term_dictionary_builder.insert(term.as_bytes(), &make_term_info(0u64))?;
            }
            term_dictionary_builder.finish()?;
        }
        let term_file = directory.open_read(&path)?;
        let term_dict: TermDictionary = TermDictionary::open(term_file)?;
        for (term_ord, term) in COUNTRIES.iter().enumerate() {
            assert_eq!(term_dict.term_ord(term).unwrap(), term_ord as u64);
            let mut bytes = vec![];
            assert!(term_dict.ord_to_term(term_ord as u64, &mut bytes));
            assert_eq!(bytes, term.as_bytes());
        }
        Ok(())
    }

    #[test]
    fn test_term_dictionary_simple() -> crate::Result<()> {
        let directory = RAMDirectory::create();
        let path = PathBuf::from("TermDictionary");
        {
            let write = directory.open_write(&path)?;
            let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
            term_dictionary_builder.insert("abc".as_bytes(), &make_term_info(34u64))?;
            term_dictionary_builder.insert("abcd".as_bytes(), &make_term_info(346u64))?;
            term_dictionary_builder.finish()?;
        }
        let file = directory.open_read(&path)?;
        let term_dict: TermDictionary = TermDictionary::open(file)?;
        assert_eq!(term_dict.get("abc").unwrap().doc_freq, 34u32);
        assert_eq!(term_dict.get("abcd").unwrap().doc_freq, 346u32);
        let mut stream = term_dict.stream();
        {
            {
                let (k, v) = stream.next().unwrap();
                assert_eq!(k.as_ref(), "abc".as_bytes());
                assert_eq!(v.doc_freq, 34u32);
            }
            assert_eq!(stream.key(), "abc".as_bytes());
assert_eq!(stream.key(), "abc".as_bytes());
|
||||
assert_eq!(stream.value().doc_freq, 34u32);
|
||||
}
|
||||
{
|
||||
{
|
||||
let (k, v) = stream.next().unwrap();
|
||||
assert_eq!(k, "abcd".as_bytes());
|
||||
assert_eq!(v.doc_freq, 346u32);
|
||||
}
|
||||
assert_eq!(stream.key(), "abcd".as_bytes());
|
||||
assert_eq!(stream.value().doc_freq, 346u32);
|
||||
}
|
||||
assert!(!stream.advance());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_term_iterator() -> crate::Result<()> {
|
||||
let mut schema_builder = Schema::builder();
|
||||
let text_field = schema_builder.add_text_field("text", TEXT);
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
{
|
||||
let mut index_writer = index.writer_for_tests()?;
|
||||
index_writer.add_document(doc!(text_field=>"a b d f"));
|
||||
index_writer.commit()?;
|
||||
index_writer.add_document(doc!(text_field=>"a b c d f"));
|
||||
index_writer.commit()?;
|
||||
index_writer.add_document(doc!(text_field => "e f"));
|
||||
index_writer.commit()?;
|
||||
}
|
||||
let searcher = index.reader()?.searcher();
|
||||
|
||||
let field_searcher = searcher.field(text_field)?;
|
||||
let mut term_it = field_searcher.terms();
|
||||
let mut term_string = String::new();
|
||||
while term_it.advance() {
|
||||
//let term = Term::from_bytes(term_it.key());
|
||||
term_string.push_str(str::from_utf8(term_it.key()).expect("test"));
|
||||
}
|
||||
assert_eq!(&*term_string, "abcdef");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_term_dictionary_stream() -> crate::Result<()> {
|
||||
let ids: Vec<_> = (0u32..10_000u32)
|
||||
.map(|i| (format!("doc{:0>6}", i), i))
|
||||
.collect();
|
||||
let buffer: Vec<u8> = {
|
||||
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
|
||||
for &(ref id, ref i) in &ids {
|
||||
term_dictionary_builder
|
||||
.insert(id.as_bytes(), &make_term_info(*i as u64))
|
||||
.unwrap();
|
||||
}
|
||||
term_dictionary_builder.finish().unwrap()
|
||||
};
|
||||
let term_file = FileSlice::from(buffer);
|
||||
let term_dictionary: TermDictionary = TermDictionary::open(term_file)?;
|
||||
{
|
||||
let mut streamer = term_dictionary.stream();
|
||||
let mut i = 0;
|
||||
while let Some((streamer_k, streamer_v)) = streamer.next() {
|
||||
let &(ref key, ref v) = &ids[i];
|
||||
assert_eq!(streamer_k.as_ref(), key.as_bytes());
|
||||
assert_eq!(streamer_v, &make_term_info(*v as u64));
|
||||
i += 1;
|
||||
}
|
||||
}
|
||||
|
||||
let &(ref key, ref val) = &ids[2047];
|
||||
assert_eq!(
|
||||
term_dictionary.get(key.as_bytes()),
|
||||
Some(make_term_info(*val as u64))
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_stream_high_range_prefix_suffix() -> crate::Result<()> {
|
||||
let buffer: Vec<u8> = {
|
||||
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
|
||||
// term requires more than 16bits
|
||||
term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxy", &make_term_info(1))?;
|
||||
term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxyz", &make_term_info(2))?;
|
||||
term_dictionary_builder.insert("abr", &make_term_info(3))?;
|
||||
term_dictionary_builder.finish()?
|
||||
};
|
||||
let term_dict_file = FileSlice::from(buffer);
|
||||
let term_dictionary: TermDictionary = TermDictionary::open(term_dict_file)?;
|
||||
let mut kv_stream = term_dictionary.stream();
|
||||
assert!(kv_stream.advance());
|
||||
assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxy".as_bytes());
|
||||
assert_eq!(kv_stream.value(), &make_term_info(1));
|
||||
assert!(kv_stream.advance());
|
||||
assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxyz".as_bytes());
|
||||
assert_eq!(kv_stream.value(), &make_term_info(2));
|
||||
assert!(kv_stream.advance());
|
||||
assert_eq!(kv_stream.key(), "abr".as_bytes());
|
||||
assert_eq!(kv_stream.value(), &make_term_info(3));
|
||||
assert!(!kv_stream.advance());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_stream_range() -> crate::Result<()> {
|
||||
let ids: Vec<_> = (0u32..10_000u32)
|
||||
.map(|i| (format!("doc{:0>6}", i), i))
|
||||
.collect();
|
||||
let buffer: Vec<u8> = {
|
||||
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
|
||||
for &(ref id, ref i) in &ids {
|
||||
term_dictionary_builder
|
||||
.insert(id.as_bytes(), &make_term_info(*i as u64))
|
||||
.unwrap();
|
||||
}
|
||||
term_dictionary_builder.finish().unwrap()
|
||||
};
|
||||
|
||||
let file = FileSlice::from(buffer);
|
||||
|
||||
let term_dictionary: TermDictionary = TermDictionary::open(file)?;
|
||||
{
|
||||
for i in (0..20).chain(6000..8_000) {
|
||||
let &(ref target_key, _) = &ids[i];
|
||||
let mut streamer = term_dictionary
|
||||
.range()
|
||||
.ge(target_key.as_bytes())
|
||||
.into_stream();
|
||||
for j in 0..3 {
|
||||
let (streamer_k, streamer_v) = streamer.next().unwrap();
|
||||
let &(ref key, ref v) = &ids[i + j];
|
||||
assert_eq!(str::from_utf8(streamer_k.as_ref()).unwrap(), key);
|
||||
assert_eq!(streamer_v.doc_freq, *v);
|
||||
assert_eq!(streamer_v, &make_term_info(*v as u64));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
for i in (0..20).chain(BLOCK_SIZE - 10..BLOCK_SIZE + 10) {
|
||||
let &(ref target_key, _) = &ids[i];
|
||||
let mut streamer = term_dictionary
|
||||
.range()
|
||||
.gt(target_key.as_bytes())
|
||||
.into_stream();
|
||||
for j in 0..3 {
|
||||
let (streamer_k, streamer_v) = streamer.next().unwrap();
|
||||
let &(ref key, ref v) = &ids[i + j + 1];
|
||||
assert_eq!(streamer_k.as_ref(), key.as_bytes());
|
||||
assert_eq!(streamer_v.doc_freq, *v);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
for i in (0..20).chain(BLOCK_SIZE - 10..BLOCK_SIZE + 10) {
|
||||
for j in 0..3 {
|
||||
let &(ref fst_key, _) = &ids[i];
|
||||
let &(ref last_key, _) = &ids[i + j];
|
||||
let mut streamer = term_dictionary
|
||||
.range()
|
||||
.ge(fst_key.as_bytes())
|
||||
.lt(last_key.as_bytes())
|
||||
.into_stream();
|
||||
for _ in 0..j {
|
||||
assert!(streamer.next().is_some());
|
||||
}
|
||||
assert!(streamer.next().is_none());
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_empty_string() -> crate::Result<()> {
|
||||
let buffer: Vec<u8> = {
|
||||
let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
|
||||
term_dictionary_builder
|
||||
.insert(&[], &make_term_info(1 as u64))
|
||||
.unwrap();
|
||||
term_dictionary_builder
|
||||
.insert(&[1u8], &make_term_info(2 as u64))
|
||||
.unwrap();
|
||||
term_dictionary_builder.finish().unwrap()
|
||||
};
|
||||
let file = FileSlice::from(buffer);
|
||||
let term_dictionary: TermDictionary = TermDictionary::open(file)?;
|
||||
let mut stream = term_dictionary.stream();
|
||||
assert!(stream.advance());
|
||||
assert!(stream.key().is_empty());
|
||||
assert!(stream.advance());
|
||||
assert_eq!(stream.key(), &[1u8]);
|
||||
assert!(!stream.advance());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_stream_range_boundaries() -> crate::Result<()> {
|
||||
let buffer: Vec<u8> = {
|
||||
let mut term_dictionary_builder = TermDictionaryBuilder::create(Vec::new())?;
|
||||
for i in 0u8..10u8 {
|
||||
let number_arr = [i; 1];
|
||||
term_dictionary_builder.insert(&number_arr, &make_term_info(i as u64))?;
|
||||
}
|
||||
term_dictionary_builder.finish()?
|
||||
};
|
||||
let file = FileSlice::from(buffer);
|
||||
let term_dictionary: TermDictionary = TermDictionary::open(file)?;
|
||||
|
||||
let value_list = |mut streamer: TermStreamer<'_>, backwards: bool| {
|
||||
let mut res: Vec<u32> = vec![];
|
||||
while let Some((_, ref v)) = streamer.next() {
|
||||
res.push(v.doc_freq);
|
||||
}
|
||||
if backwards {
|
||||
res.reverse();
|
||||
}
|
||||
res
|
||||
};
|
||||
{
|
||||
let range = term_dictionary.range().backward().into_stream();
|
||||
assert_eq!(
|
||||
value_list(range, true),
|
||||
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
|
||||
);
|
||||
}
|
||||
{
|
||||
let range = term_dictionary.range().ge([2u8]).into_stream();
|
||||
assert_eq!(
|
||||
value_list(range, false),
|
||||
vec![2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
|
||||
);
|
||||
}
|
||||
{
|
||||
let range = term_dictionary.range().ge([2u8]).backward().into_stream();
|
||||
assert_eq!(
|
||||
value_list(range, true),
|
||||
vec![2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
|
||||
);
|
||||
}
|
||||
{
|
||||
let range = term_dictionary.range().gt([2u8]).into_stream();
|
||||
assert_eq!(
|
||||
value_list(range, false),
|
||||
vec![3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
|
||||
);
|
||||
}
|
||||
{
|
||||
let range = term_dictionary.range().gt([2u8]).backward().into_stream();
|
||||
assert_eq!(
|
||||
value_list(range, true),
|
||||
vec![3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
|
||||
);
|
||||
}
|
||||
{
|
||||
let range = term_dictionary.range().lt([6u8]).into_stream();
|
||||
assert_eq!(
|
||||
value_list(range, false),
|
||||
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32]
|
||||
);
|
||||
}
|
||||
{
|
||||
let range = term_dictionary.range().lt([6u8]).backward().into_stream();
|
||||
assert_eq!(
|
||||
value_list(range, true),
|
||||
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32]
|
||||
);
|
||||
}
|
||||
{
|
||||
let range = term_dictionary.range().le([6u8]).into_stream();
|
||||
assert_eq!(
|
||||
value_list(range, false),
|
||||
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32]
|
||||
);
|
||||
}
|
||||
{
|
||||
let range = term_dictionary.range().le([6u8]).backward().into_stream();
|
||||
assert_eq!(
|
||||
value_list(range, true),
|
||||
vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32]
|
||||
);
|
||||
}
|
||||
{
|
||||
let range = term_dictionary.range().ge([0u8]).lt([5u8]).into_stream();
|
||||
assert_eq!(value_list(range, false), vec![0u32, 1u32, 2u32, 3u32, 4u32]);
|
||||
}
|
||||
{
|
||||
let range = term_dictionary
|
||||
.range()
|
||||
.ge([0u8])
|
||||
.lt([5u8])
|
||||
.backward()
|
||||
.into_stream();
|
||||
assert_eq!(value_list(range, true), vec![0u32, 1u32, 2u32, 3u32, 4u32]);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_automaton_search() -> crate::Result<()> {
|
||||
use crate::query::DFAWrapper;
|
||||
use levenshtein_automata::LevenshteinAutomatonBuilder;
|
||||
|
||||
const COUNTRIES: [&'static str; 7] = [
|
||||
"San Marino",
|
||||
"Serbia",
|
||||
"Slovakia",
|
||||
"Slovenia",
|
||||
"Spain",
|
||||
"Sweden",
|
||||
"Switzerland",
|
||||
];
|
||||
|
||||
let directory = RAMDirectory::create();
|
||||
let path = PathBuf::from("TermDictionary");
|
||||
{
|
||||
let write = directory.open_write(&path)?;
|
||||
let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
|
||||
for term in COUNTRIES.iter() {
|
||||
term_dictionary_builder.insert(term.as_bytes(), &make_term_info(0u64))?;
|
||||
}
|
||||
term_dictionary_builder.finish()?;
|
||||
}
|
||||
let file = directory.open_read(&path)?;
|
||||
let term_dict: TermDictionary = TermDictionary::open(file)?;
|
||||
|
||||
// We can now build an entire dfa.
|
||||
let lev_automaton_builder = LevenshteinAutomatonBuilder::new(2, true);
|
||||
let automaton = DFAWrapper(lev_automaton_builder.build_dfa("Spaen"));
|
||||
|
||||
let mut range = term_dict.search(automaton).into_stream();
|
||||
|
||||
// get the first finding
|
||||
assert!(range.advance());
|
||||
assert_eq!("Spain".as_bytes(), range.key());
|
||||
assert!(!range.advance());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,148 +0,0 @@
|
||||
use std::io;
|
||||
|
||||
mod sstable;
|
||||
mod streamer;
|
||||
mod termdict;
|
||||
|
||||
use self::sstable::value::{ValueReader, ValueWriter};
|
||||
use self::sstable::{BlockReader, SSTable};
|
||||
|
||||
use crate::common::VInt;
|
||||
use crate::postings::TermInfo;
|
||||
|
||||
pub use self::streamer::{TermStreamer, TermStreamerBuilder};
|
||||
pub use self::termdict::{TermDictionary, TermDictionaryBuilder};
|
||||
|
||||
pub struct TermSSTable;
|
||||
|
||||
impl SSTable for TermSSTable {
|
||||
type Value = TermInfo;
|
||||
type Reader = TermInfoReader;
|
||||
type Writer = TermInfoWriter;
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct TermInfoReader {
|
||||
term_infos: Vec<TermInfo>,
|
||||
}
|
||||
|
||||
impl ValueReader for TermInfoReader {
|
||||
type Value = TermInfo;
|
||||
|
||||
fn value(&self, idx: usize) -> &TermInfo {
|
||||
&self.term_infos[idx]
|
||||
}
|
||||
|
||||
fn read(&mut self, reader: &mut BlockReader) -> io::Result<()> {
|
||||
self.term_infos.clear();
|
||||
let num_els = VInt::deserialize_u64(reader)?;
|
||||
let mut start_offset = VInt::deserialize_u64(reader)?;
|
||||
let mut positions_idx = 0;
|
||||
for _ in 0..num_els {
|
||||
let doc_freq = VInt::deserialize_u64(reader)? as u32;
|
||||
let posting_num_bytes = VInt::deserialize_u64(reader)?;
|
||||
let stop_offset = start_offset + posting_num_bytes;
|
||||
let delta_positions_idx = VInt::deserialize_u64(reader)?;
|
||||
positions_idx += delta_positions_idx;
|
||||
let term_info = TermInfo {
|
||||
doc_freq,
|
||||
postings_start_offset: start_offset,
|
||||
postings_stop_offset: stop_offset,
|
||||
positions_idx,
|
||||
};
|
||||
self.term_infos.push(term_info);
|
||||
start_offset = stop_offset;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct TermInfoWriter {
|
||||
term_infos: Vec<TermInfo>,
|
||||
}
|
||||
|
||||
impl ValueWriter for TermInfoWriter {
|
||||
type Value = TermInfo;
|
||||
|
||||
fn write(&mut self, term_info: &TermInfo) {
|
||||
self.term_infos.push(term_info.clone());
|
||||
}
|
||||
|
||||
fn write_block(&mut self, buffer: &mut Vec<u8>) {
|
||||
VInt(self.term_infos.len() as u64).serialize_into_vec(buffer);
|
||||
if self.term_infos.is_empty() {
|
||||
return;
|
||||
}
|
||||
let mut prev_position_idx = 0u64;
|
||||
VInt(self.term_infos[0].postings_start_offset).serialize_into_vec(buffer);
|
||||
for term_info in &self.term_infos {
|
||||
VInt(term_info.doc_freq as u64).serialize_into_vec(buffer);
|
||||
VInt(term_info.postings_stop_offset - term_info.postings_start_offset)
|
||||
.serialize_into_vec(buffer);
|
||||
VInt(term_info.positions_idx - prev_position_idx).serialize_into_vec(buffer);
|
||||
prev_position_idx = term_info.positions_idx;
|
||||
}
|
||||
self.term_infos.clear();
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::io;
|
||||
|
||||
use super::BlockReader;
|
||||
|
||||
use crate::directory::OwnedBytes;
|
||||
use crate::postings::TermInfo;
|
||||
use crate::termdict::sstable_termdict::sstable::value::{ValueReader, ValueWriter};
|
||||
use crate::termdict::sstable_termdict::TermInfoReader;
|
||||
|
||||
#[test]
|
||||
fn test_block_terminfos() -> io::Result<()> {
|
||||
let mut term_info_writer = super::TermInfoWriter::default();
|
||||
term_info_writer.write(&TermInfo {
|
||||
doc_freq: 120u32,
|
||||
postings_start_offset: 17u64,
|
||||
postings_stop_offset: 45u64,
|
||||
positions_idx: 10u64,
|
||||
});
|
||||
term_info_writer.write(&TermInfo {
|
||||
doc_freq: 10u32,
|
||||
postings_start_offset: 45u64,
|
||||
postings_stop_offset: 450u64,
|
||||
positions_idx: 104u64,
|
||||
});
|
||||
term_info_writer.write(&TermInfo {
|
||||
doc_freq: 17u32,
|
||||
postings_start_offset: 450u64,
|
||||
postings_stop_offset: 462u64,
|
||||
positions_idx: 210u64,
|
||||
});
|
||||
let mut buffer = Vec::new();
|
||||
term_info_writer.write_block(&mut buffer);
|
||||
let mut block_reader = make_block_reader(&buffer[..]);
|
||||
let mut term_info_reader = TermInfoReader::default();
|
||||
term_info_reader.read(&mut block_reader)?;
|
||||
assert_eq!(
|
||||
term_info_reader.value(0),
|
||||
&TermInfo {
|
||||
doc_freq: 120u32,
|
||||
postings_start_offset: 17u64,
|
||||
postings_stop_offset: 45u64,
|
||||
positions_idx: 10u64
|
||||
}
|
||||
);
|
||||
assert!(block_reader.buffer().is_empty());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn make_block_reader(data: &[u8]) -> BlockReader {
|
||||
let mut buffer = (data.len() as u32).to_le_bytes().to_vec();
|
||||
buffer.extend_from_slice(data);
|
||||
let owned_bytes = OwnedBytes::new(buffer);
|
||||
let mut block_reader = BlockReader::new(Box::new(owned_bytes));
|
||||
block_reader.read_block().unwrap();
|
||||
block_reader
|
||||
}
|
||||
}
|
||||
@@ -1,84 +0,0 @@
|
||||
use byteorder::{LittleEndian, ReadBytesExt};
|
||||
use std::io::{self, Read};
|
||||
|
||||
pub struct BlockReader<'a> {
|
||||
buffer: Vec<u8>,
|
||||
reader: Box<dyn io::Read + 'a>,
|
||||
offset: usize,
|
||||
}
|
||||
|
||||
impl<'a> BlockReader<'a> {
|
||||
pub fn new(reader: Box<dyn io::Read + 'a>) -> BlockReader<'a> {
|
||||
BlockReader {
|
||||
buffer: Vec::new(),
|
||||
reader,
|
||||
offset: 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn deserialize_u64(&mut self) -> u64 {
|
||||
let (num_bytes, val) = super::vint::deserialize_read(self.buffer());
|
||||
self.advance(num_bytes);
|
||||
val
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn buffer_from_to(&self, start: usize, end: usize) -> &[u8] {
|
||||
&self.buffer[start..end]
|
||||
}
|
||||
|
||||
pub fn buffer_from(&self, start: usize) -> &[u8] {
|
||||
&self.buffer[start..]
|
||||
}
|
||||
|
||||
pub fn read_block(&mut self) -> io::Result<bool> {
|
||||
self.offset = 0;
|
||||
let block_len_res = self.reader.read_u32::<LittleEndian>();
|
||||
if let Err(err) = &block_len_res {
|
||||
if err.kind() == io::ErrorKind::UnexpectedEof {
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
let block_len = block_len_res?;
|
||||
if block_len == 0u32 {
|
||||
self.buffer.clear();
|
||||
return Ok(false);
|
||||
}
|
||||
self.buffer.resize(block_len as usize, 0u8);
|
||||
self.reader.read_exact(&mut self.buffer[..])?;
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
pub fn offset(&self) -> usize {
|
||||
self.offset
|
||||
}
|
||||
|
||||
pub fn advance(&mut self, num_bytes: usize) {
|
||||
self.offset += num_bytes;
|
||||
}
|
||||
|
||||
pub fn buffer(&self) -> &[u8] {
|
||||
&self.buffer[self.offset..]
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> io::Read for BlockReader<'a> {
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
let len = self.buffer().read(buf)?;
|
||||
self.advance(len);
|
||||
Ok(len)
|
||||
}
|
||||
|
||||
fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
|
||||
let len = self.buffer.len();
|
||||
buf.extend_from_slice(self.buffer());
|
||||
self.advance(len);
|
||||
Ok(len)
|
||||
}
|
||||
|
||||
fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
|
||||
self.buffer().read_exact(buf)?;
|
||||
self.advance(buf.len());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
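
Note: the framing consumed by `read_block` above is a little-endian `u32` length prefix followed by the block payload; a zero length (or a clean EOF) ends the stream. A tiny sketch of the writer side under that assumption (the `frame` helper is hypothetical, mirroring `make_block_reader` in the tests above):

// Prepend a u32 little-endian length prefix to a block payload.
fn frame(payload: &[u8]) -> Vec<u8> {
    let mut out = (payload.len() as u32).to_le_bytes().to_vec();
    out.extend_from_slice(payload);
    out
}

fn main() {
    assert_eq!(frame(b"abc"), vec![3, 0, 0, 0, b'a', b'b', b'c']);
}
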
@@ -1,203 +0,0 @@
|
||||
use std::io::{self, BufWriter, Write};
|
||||
|
||||
use crate::common::CountingWriter;
|
||||
|
||||
use super::value::ValueWriter;
|
||||
use super::{value, vint, BlockReader};
|
||||
|
||||
const FOUR_BIT_LIMITS: usize = 1 << 4;
|
||||
const VINT_MODE: u8 = 1u8;
|
||||
const BLOCK_LEN: usize = 256_000;
|
||||
|
||||
pub struct DeltaWriter<W, TValueWriter>
|
||||
where
|
||||
W: io::Write,
|
||||
{
|
||||
block: Vec<u8>,
|
||||
write: CountingWriter<BufWriter<W>>,
|
||||
value_writer: TValueWriter,
|
||||
}
|
||||
|
||||
impl<W, TValueWriter> DeltaWriter<W, TValueWriter>
|
||||
where
|
||||
W: io::Write,
|
||||
TValueWriter: ValueWriter,
|
||||
{
|
||||
pub fn new(wrt: W) -> Self {
|
||||
DeltaWriter {
|
||||
block: Vec::with_capacity(BLOCK_LEN * 2),
|
||||
write: CountingWriter::wrap(BufWriter::new(wrt)),
|
||||
value_writer: TValueWriter::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<W, TValueWriter> DeltaWriter<W, TValueWriter>
|
||||
where
|
||||
W: io::Write,
|
||||
TValueWriter: value::ValueWriter,
|
||||
{
|
||||
pub fn flush_block(&mut self) -> io::Result<Option<(u64, u64)>> {
|
||||
if self.block.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
let start_offset = self.write.written_bytes();
|
||||
// TODO avoid buffer allocation
|
||||
let mut buffer = Vec::new();
|
||||
self.value_writer.write_block(&mut buffer);
|
||||
let block_len = buffer.len() + self.block.len();
|
||||
self.write.write_all(&(block_len as u32).to_le_bytes())?;
|
||||
self.write.write_all(&buffer[..])?;
|
||||
self.write.write_all(&mut self.block[..])?;
|
||||
let end_offset = self.write.written_bytes();
|
||||
self.block.clear();
|
||||
Ok(Some((start_offset, end_offset)))
|
||||
}
|
||||
|
||||
fn encode_keep_add(&mut self, keep_len: usize, add_len: usize) {
|
||||
if keep_len < FOUR_BIT_LIMITS && add_len < FOUR_BIT_LIMITS {
|
||||
let b = (keep_len | add_len << 4) as u8;
|
||||
self.block.extend_from_slice(&[b])
|
||||
} else {
|
||||
let mut buf = [VINT_MODE; 20];
|
||||
let mut len = 1 + vint::serialize(keep_len as u64, &mut buf[1..]);
|
||||
len += vint::serialize(add_len as u64, &mut buf[len..]);
|
||||
self.block.extend_from_slice(&mut buf[..len])
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn write_suffix(&mut self, common_prefix_len: usize, suffix: &[u8]) {
|
||||
let keep_len = common_prefix_len;
|
||||
let add_len = suffix.len();
|
||||
self.encode_keep_add(keep_len, add_len);
|
||||
self.block.extend_from_slice(suffix);
|
||||
}
|
||||
|
||||
pub(crate) fn write_value(&mut self, value: &TValueWriter::Value) {
|
||||
self.value_writer.write(value);
|
||||
}
|
||||
|
||||
pub fn write_delta(
|
||||
&mut self,
|
||||
common_prefix_len: usize,
|
||||
suffix: &[u8],
|
||||
value: &TValueWriter::Value,
|
||||
) {
|
||||
self.write_suffix(common_prefix_len, suffix);
|
||||
self.write_value(value);
|
||||
}
|
||||
|
||||
pub fn flush_block_if_required(&mut self) -> io::Result<Option<(u64, u64)>> {
|
||||
if self.block.len() > BLOCK_LEN {
|
||||
return self.flush_block();
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
pub fn finalize(mut self) -> CountingWriter<BufWriter<W>> {
|
||||
self.write
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DeltaReader<'a, TValueReader> {
|
||||
common_prefix_len: usize,
|
||||
suffix_start: usize,
|
||||
suffix_end: usize,
|
||||
value_reader: TValueReader,
|
||||
block_reader: BlockReader<'a>,
|
||||
idx: usize,
|
||||
}
|
||||
|
||||
impl<'a, TValueReader> DeltaReader<'a, TValueReader>
|
||||
where
|
||||
TValueReader: value::ValueReader,
|
||||
{
|
||||
pub fn new<R: io::Read + 'a>(reader: R) -> Self {
|
||||
DeltaReader {
|
||||
idx: 0,
|
||||
common_prefix_len: 0,
|
||||
suffix_start: 0,
|
||||
suffix_end: 0,
|
||||
value_reader: TValueReader::default(),
|
||||
block_reader: BlockReader::new(Box::new(reader)),
|
||||
}
|
||||
}
|
||||
|
||||
fn deserialize_vint(&mut self) -> u64 {
|
||||
self.block_reader.deserialize_u64()
|
||||
}
|
||||
|
||||
fn read_keep_add(&mut self) -> Option<(usize, usize)> {
|
||||
let b = {
|
||||
let buf = &self.block_reader.buffer();
|
||||
if buf.is_empty() {
|
||||
return None;
|
||||
}
|
||||
buf[0]
|
||||
};
|
||||
self.block_reader.advance(1);
|
||||
match b {
|
||||
VINT_MODE => {
|
||||
let keep = self.deserialize_vint() as usize;
|
||||
let add = self.deserialize_vint() as usize;
|
||||
Some((keep, add))
|
||||
}
|
||||
b => {
|
||||
let keep = (b & 0b1111) as usize;
|
||||
let add = (b >> 4) as usize;
|
||||
Some((keep, add))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn read_delta_key(&mut self) -> bool {
|
||||
if let Some((keep, add)) = self.read_keep_add() {
|
||||
self.common_prefix_len = keep;
|
||||
self.suffix_start = self.block_reader.offset();
|
||||
self.suffix_end = self.suffix_start + add;
|
||||
self.block_reader.advance(add);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
pub fn advance(&mut self) -> io::Result<bool> {
|
||||
if self.block_reader.buffer().is_empty() {
|
||||
if !self.block_reader.read_block()? {
|
||||
return Ok(false);
|
||||
}
|
||||
self.value_reader.read(&mut self.block_reader)?;
|
||||
self.idx = 0;
|
||||
} else {
|
||||
self.idx += 1;
|
||||
}
|
||||
if !self.read_delta_key() {
|
||||
return Ok(false);
|
||||
}
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
pub fn common_prefix_len(&self) -> usize {
|
||||
self.common_prefix_len
|
||||
}
|
||||
|
||||
pub fn suffix(&self) -> &[u8] {
|
||||
&self
|
||||
.block_reader
|
||||
.buffer_from_to(self.suffix_start, self.suffix_end)
|
||||
}
|
||||
|
||||
pub fn suffix_from(&self, offset: usize) -> &[u8] {
|
||||
&self.block_reader.buffer_from_to(
|
||||
self.suffix_start
|
||||
.wrapping_add(offset)
|
||||
.wrapping_sub(self.common_prefix_len),
|
||||
self.suffix_end,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn value(&self) -> &TValueReader::Value {
|
||||
self.value_reader.value(self.idx)
|
||||
}
|
||||
}
|
||||
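
Note: a self-contained sketch of the keep/add header byte used by `encode_keep_add`/`read_keep_add` above (helper names here are illustrative). Because keys are strictly increasing, the suffix is never empty (`add >= 1`), so the packed byte is always `>= 16` and cannot collide with `VINT_MODE` (`1`), which flags the two-vint fallback:

const VINT_MODE: u8 = 1u8;

// Low nibble: bytes kept from the previous key; high nibble: bytes appended.
fn decode_keep_add(b: u8) -> (usize, usize) {
    ((b & 0b1111) as usize, (b >> 4) as usize)
}

fn main() {
    // keep 1 byte, append 2 bytes -> 1 | 2 << 4 = 33, the byte that shows up
    // in the `test_simple_sstable` dump further down.
    let b = (1usize | 2usize << 4) as u8;
    assert_ne!(b, VINT_MODE);
    assert_eq!(decode_keep_add(b), (1, 2));
}
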
@@ -1,72 +0,0 @@
|
||||
use crate::termdict::sstable_termdict::sstable::{Reader, SSTable, Writer};
|
||||
|
||||
use super::SingleValueMerger;
|
||||
use super::ValueMerger;
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::binary_heap::PeekMut;
|
||||
use std::collections::BinaryHeap;
|
||||
use std::io;
|
||||
|
||||
struct HeapItem<B: AsRef<[u8]>>(B);
|
||||
|
||||
impl<B: AsRef<[u8]>> Ord for HeapItem<B> {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
other.0.as_ref().cmp(self.0.as_ref())
|
||||
}
|
||||
}
|
||||
impl<B: AsRef<[u8]>> PartialOrd for HeapItem<B> {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
Some(other.0.as_ref().cmp(self.0.as_ref()))
|
||||
}
|
||||
}
|
||||
|
||||
impl<B: AsRef<[u8]>> Eq for HeapItem<B> {}
|
||||
impl<B: AsRef<[u8]>> PartialEq for HeapItem<B> {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.0.as_ref() == other.0.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn merge_sstable<SST: SSTable, W: io::Write, M: ValueMerger<SST::Value>>(
|
||||
readers: Vec<Reader<SST::Reader>>,
|
||||
mut writer: Writer<W, SST::Writer>,
|
||||
mut merger: M,
|
||||
) -> io::Result<()> {
|
||||
let mut heap: BinaryHeap<HeapItem<Reader<SST::Reader>>> =
|
||||
BinaryHeap::with_capacity(readers.len());
|
||||
for mut reader in readers {
|
||||
if reader.advance()? {
|
||||
heap.push(HeapItem(reader));
|
||||
}
|
||||
}
|
||||
loop {
|
||||
let len = heap.len();
|
||||
let mut value_merger;
|
||||
if let Some(mut head) = heap.peek_mut() {
|
||||
writer.write_key(head.0.key());
|
||||
value_merger = merger.new_value(head.0.value());
|
||||
if !head.0.advance()? {
|
||||
PeekMut::pop(head);
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
for _ in 0..len - 1 {
|
||||
if let Some(mut head) = heap.peek_mut() {
|
||||
if head.0.key() == writer.current_key() {
|
||||
value_merger.add(head.0.value());
|
||||
if !head.0.advance()? {
|
||||
PeekMut::pop(head);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
let value = value_merger.finish();
|
||||
writer.write_value(&value);
|
||||
writer.flush_block_if_required()?;
|
||||
}
|
||||
writer.finalize()?;
|
||||
Ok(())
|
||||
}
|
||||
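
Note: `HeapItem` above reverses the comparison so that `std`'s max-oriented `BinaryHeap` behaves as a min-heap, surfacing the lexicographically smallest key first during the k-way merge. `std::cmp::Reverse` achieves the same effect, as this small sketch shows:

use std::cmp::Reverse;
use std::collections::BinaryHeap;

fn main() {
    let mut heap = BinaryHeap::new();
    for key in [b"blur".to_vec(), b"abba".to_vec(), b"bjork".to_vec()] {
        heap.push(Reverse(key));
    }
    // The smallest key pops first, which is exactly what the merge loop needs.
    assert_eq!(heap.pop().unwrap().0, b"abba".to_vec());
    assert_eq!(heap.pop().unwrap().0, b"bjork".to_vec());
}
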
@@ -1,184 +0,0 @@
|
||||
mod heap_merge;
|
||||
|
||||
pub use self::heap_merge::merge_sstable;
|
||||
|
||||
pub trait SingleValueMerger<V> {
|
||||
fn add(&mut self, v: &V);
|
||||
fn finish(self) -> V;
|
||||
}
|
||||
|
||||
pub trait ValueMerger<V> {
|
||||
type TSingleValueMerger: SingleValueMerger<V>;
|
||||
fn new_value(&mut self, v: &V) -> Self::TSingleValueMerger;
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct KeepFirst;
|
||||
|
||||
pub struct FirstVal<V>(V);
|
||||
|
||||
impl<V: Clone> ValueMerger<V> for KeepFirst {
|
||||
type TSingleValueMerger = FirstVal<V>;
|
||||
|
||||
fn new_value(&mut self, v: &V) -> FirstVal<V> {
|
||||
FirstVal(v.clone())
|
||||
}
|
||||
}
|
||||
|
||||
impl<V> SingleValueMerger<V> for FirstVal<V> {
|
||||
fn add(&mut self, _: &V) {}
|
||||
|
||||
fn finish(self) -> V {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
pub struct VoidMerge;
|
||||
impl ValueMerger<()> for VoidMerge {
|
||||
type TSingleValueMerger = ();
|
||||
|
||||
fn new_value(&mut self, _: &()) -> () {
|
||||
()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct U64Merge;
|
||||
impl ValueMerger<u64> for U64Merge {
|
||||
type TSingleValueMerger = u64;
|
||||
|
||||
fn new_value(&mut self, val: &u64) -> u64 {
|
||||
*val
|
||||
}
|
||||
}
|
||||
|
||||
impl SingleValueMerger<u64> for u64 {
|
||||
fn add(&mut self, val: &u64) {
|
||||
*self += *val;
|
||||
}
|
||||
|
||||
fn finish(self) -> u64 {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl SingleValueMerger<()> for () {
|
||||
fn add(&mut self, _: &()) {}
|
||||
|
||||
fn finish(self) -> () {
|
||||
()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use super::super::SSTable;
|
||||
use super::super::{SSTableMonotonicU64, VoidSSTable};
|
||||
use super::U64Merge;
|
||||
use super::VoidMerge;
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
use std::str;
|
||||
|
||||
fn write_sstable(keys: &[&'static str]) -> Vec<u8> {
|
||||
let mut buffer: Vec<u8> = vec![];
|
||||
{
|
||||
let mut sstable_writer = VoidSSTable::writer(&mut buffer);
|
||||
for &key in keys {
|
||||
assert!(sstable_writer.write(key.as_bytes(), &()).is_ok());
|
||||
}
|
||||
assert!(sstable_writer.finalize().is_ok());
|
||||
}
|
||||
dbg!(&buffer);
|
||||
buffer
|
||||
}
|
||||
|
||||
fn write_sstable_u64(keys: &[(&'static str, u64)]) -> Vec<u8> {
|
||||
let mut buffer: Vec<u8> = vec![];
|
||||
{
|
||||
let mut sstable_writer = SSTableMonotonicU64::writer(&mut buffer);
|
||||
for (key, val) in keys {
|
||||
assert!(sstable_writer.write(key.as_bytes(), val).is_ok());
|
||||
}
|
||||
assert!(sstable_writer.finalize().is_ok());
|
||||
}
|
||||
buffer
|
||||
}
|
||||
|
||||
fn merge_test_aux(arrs: &[&[&'static str]]) {
|
||||
let sstables = arrs.iter().cloned().map(write_sstable).collect::<Vec<_>>();
|
||||
let sstables_ref: Vec<&[u8]> = sstables.iter().map(|s| s.as_ref()).collect();
|
||||
let mut merged = BTreeSet::new();
|
||||
for &arr in arrs.iter() {
|
||||
for &s in arr {
|
||||
merged.insert(s.to_string());
|
||||
}
|
||||
}
|
||||
let mut w = Vec::new();
|
||||
assert!(VoidSSTable::merge(sstables_ref, &mut w, VoidMerge).is_ok());
|
||||
let mut reader = VoidSSTable::reader(&w[..]);
|
||||
for k in merged {
|
||||
assert!(reader.advance().unwrap());
|
||||
assert_eq!(reader.key(), k.as_bytes());
|
||||
}
|
||||
assert!(!reader.advance().unwrap());
|
||||
}
|
||||
|
||||
fn merge_test_u64_monotonic_aux(arrs: &[&[(&'static str, u64)]]) {
|
||||
let sstables = arrs
|
||||
.iter()
|
||||
.cloned()
|
||||
.map(write_sstable_u64)
|
||||
.collect::<Vec<_>>();
|
||||
let sstables_ref: Vec<&[u8]> = sstables.iter().map(|s| s.as_ref()).collect();
|
||||
let mut merged = BTreeMap::new();
|
||||
for &arr in arrs.iter() {
|
||||
for (key, val) in arr {
|
||||
let entry = merged.entry(key.to_string()).or_insert(0u64);
|
||||
*entry += val;
|
||||
}
|
||||
}
|
||||
let mut w = Vec::new();
|
||||
assert!(SSTableMonotonicU64::merge(sstables_ref, &mut w, U64Merge).is_ok());
|
||||
let mut reader = SSTableMonotonicU64::reader(&w[..]);
|
||||
for (k, v) in merged {
|
||||
assert!(reader.advance().unwrap());
|
||||
assert_eq!(reader.key(), k.as_bytes());
|
||||
assert_eq!(reader.value(), &v);
|
||||
}
|
||||
assert!(!reader.advance().unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_merge_simple_reproduce() {
|
||||
let sstable_data = write_sstable(&["a"]);
|
||||
let mut reader = VoidSSTable::reader(&sstable_data[..]);
|
||||
assert!(reader.advance().unwrap());
|
||||
assert_eq!(reader.key(), b"a");
|
||||
assert!(!reader.advance().unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_merge() {
|
||||
merge_test_aux(&[]);
|
||||
merge_test_aux(&[&["a"]]);
|
||||
merge_test_aux(&[&["a", "b"], &["ab"]]); // a, ab, b
|
||||
merge_test_aux(&[&["a", "b"], &["a", "b"]]);
|
||||
merge_test_aux(&[
|
||||
&["happy", "hello", "payer", "tax"],
|
||||
&["habitat", "hello", "zoo"],
|
||||
&[],
|
||||
&["a"],
|
||||
]);
|
||||
merge_test_aux(&[&["a"]]);
|
||||
merge_test_aux(&[&["a", "b"], &["ab"]]);
|
||||
merge_test_aux(&[&["a", "b"], &["a", "b"]]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_merge_u64() {
|
||||
merge_test_u64_monotonic_aux(&[]);
|
||||
merge_test_u64_monotonic_aux(&[&[("a", 1u64)]]);
|
||||
merge_test_u64_monotonic_aux(&[&[("a", 1u64), ("b", 3u64)], &[("ab", 2u64)]]); // a, ab, b
|
||||
merge_test_u64_monotonic_aux(&[&[("a", 1u64), ("b", 2u64)], &[("a", 16u64), ("b", 23u64)]]);
|
||||
}
|
||||
}
|
||||
@@ -1,365 +0,0 @@
|
||||
use merge::ValueMerger;
|
||||
use std::io::{self, Write};
|
||||
use std::usize;
|
||||
|
||||
mod delta;
|
||||
pub mod merge;
|
||||
pub mod value;
|
||||
|
||||
pub(crate) mod sstable_index;
|
||||
|
||||
pub(crate) use self::sstable_index::{SSTableIndex, SSTableIndexBuilder};
|
||||
pub(crate) mod vint;
|
||||
|
||||
mod block_reader;
|
||||
pub use self::delta::DeltaReader;
|
||||
use self::delta::DeltaWriter;
|
||||
use self::value::{U64MonotonicReader, U64MonotonicWriter, ValueReader, ValueWriter};
|
||||
|
||||
pub use self::block_reader::BlockReader;
|
||||
pub use self::merge::VoidMerge;
|
||||
|
||||
const DEFAULT_KEY_CAPACITY: usize = 50;
|
||||
|
||||
pub(crate) fn common_prefix_len(left: &[u8], right: &[u8]) -> usize {
|
||||
left.iter()
|
||||
.cloned()
|
||||
.zip(right.iter().cloned())
|
||||
.take_while(|(left, right)| left == right)
|
||||
.count()
|
||||
}
|
||||
|
||||
pub trait SSTable: Sized {
|
||||
type Value;
|
||||
type Reader: ValueReader<Value = Self::Value>;
|
||||
type Writer: ValueWriter<Value = Self::Value>;
|
||||
|
||||
fn delta_writer<W: io::Write>(write: W) -> DeltaWriter<W, Self::Writer> {
|
||||
DeltaWriter::new(write)
|
||||
}
|
||||
|
||||
fn writer<W: io::Write>(write: W) -> Writer<W, Self::Writer> {
|
||||
Writer {
|
||||
previous_key: Vec::with_capacity(DEFAULT_KEY_CAPACITY),
|
||||
num_terms: 0u64,
|
||||
index_builder: SSTableIndexBuilder::default(),
|
||||
delta_writer: Self::delta_writer(write),
|
||||
first_ordinal_of_the_block: 0u64,
|
||||
}
|
||||
}
|
||||
|
||||
fn delta_reader<'a, R: io::Read + 'a>(reader: R) -> DeltaReader<'a, Self::Reader> {
|
||||
DeltaReader::new(reader)
|
||||
}
|
||||
|
||||
fn reader<'a, R: io::Read + 'a>(reader: R) -> Reader<'a, Self::Reader> {
|
||||
Reader {
|
||||
key: Vec::with_capacity(DEFAULT_KEY_CAPACITY),
|
||||
delta_reader: Self::delta_reader(reader),
|
||||
}
|
||||
}
|
||||
|
||||
fn merge<R: io::Read, W: io::Write, M: ValueMerger<Self::Value>>(
|
||||
io_readers: Vec<R>,
|
||||
w: W,
|
||||
merger: M,
|
||||
) -> io::Result<()> {
|
||||
let readers: Vec<_> = io_readers.into_iter().map(Self::reader).collect();
|
||||
let writer = Self::writer(w);
|
||||
merge::merge_sstable::<Self, _, _>(readers, writer, merger)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct VoidSSTable;
|
||||
|
||||
impl SSTable for VoidSSTable {
|
||||
type Value = ();
|
||||
type Reader = value::VoidReader;
|
||||
type Writer = value::VoidWriter;
|
||||
}
|
||||
|
||||
pub struct SSTableMonotonicU64;
|
||||
|
||||
impl SSTable for SSTableMonotonicU64 {
|
||||
type Value = u64;
|
||||
|
||||
type Reader = U64MonotonicReader;
|
||||
|
||||
type Writer = U64MonotonicWriter;
|
||||
}
|
||||
|
||||
pub struct Reader<'a, TValueReader> {
|
||||
key: Vec<u8>,
|
||||
delta_reader: DeltaReader<'a, TValueReader>,
|
||||
}
|
||||
|
||||
impl<'a, TValueReader> Reader<'a, TValueReader>
|
||||
where
|
||||
TValueReader: ValueReader,
|
||||
{
|
||||
pub fn advance(&mut self) -> io::Result<bool> {
|
||||
if !self.delta_reader.advance()? {
|
||||
return Ok(false);
|
||||
}
|
||||
let common_prefix_len = self.delta_reader.common_prefix_len();
|
||||
let suffix = self.delta_reader.suffix();
|
||||
let new_len = self.delta_reader.common_prefix_len() + suffix.len();
|
||||
self.key.resize(new_len, 0u8);
|
||||
self.key[common_prefix_len..].copy_from_slice(suffix);
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
pub fn key(&self) -> &[u8] {
|
||||
&self.key
|
||||
}
|
||||
|
||||
pub fn value(&self) -> &TValueReader::Value {
|
||||
self.delta_reader.value()
|
||||
}
|
||||
|
||||
pub(crate) fn into_delta_reader(self) -> DeltaReader<'a, TValueReader> {
|
||||
assert!(self.key.is_empty());
|
||||
self.delta_reader
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, TValueReader> AsRef<[u8]> for Reader<'a, TValueReader> {
|
||||
fn as_ref(&self) -> &[u8] {
|
||||
&self.key
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Writer<W, TValueWriter>
|
||||
where
|
||||
W: io::Write,
|
||||
{
|
||||
previous_key: Vec<u8>,
|
||||
index_builder: SSTableIndexBuilder,
|
||||
delta_writer: DeltaWriter<W, TValueWriter>,
|
||||
num_terms: u64,
|
||||
first_ordinal_of_the_block: u64,
|
||||
}
|
||||
|
||||
impl<W, TValueWriter> Writer<W, TValueWriter>
|
||||
where
|
||||
W: io::Write,
|
||||
TValueWriter: value::ValueWriter,
|
||||
{
|
||||
pub(crate) fn current_key(&self) -> &[u8] {
|
||||
&self.previous_key[..]
|
||||
}
|
||||
|
||||
pub fn write_key(&mut self, key: &[u8]) {
|
||||
let keep_len = common_prefix_len(&self.previous_key, key);
|
||||
let add_len = key.len() - keep_len;
|
||||
let increasing_keys = add_len > 0 && (self.previous_key.len() == keep_len)
|
||||
|| self.previous_key.is_empty()
|
||||
|| self.previous_key[keep_len] < key[keep_len];
|
||||
assert!(
|
||||
increasing_keys,
|
||||
"Keys should be increasing. ({:?} > {:?})",
|
||||
self.previous_key, key
|
||||
);
|
||||
self.previous_key.resize(key.len(), 0u8);
|
||||
self.previous_key[keep_len..].copy_from_slice(&key[keep_len..]);
|
||||
self.delta_writer.write_suffix(keep_len, &key[keep_len..]);
|
||||
}
|
||||
|
||||
pub(crate) fn into_delta_writer(self) -> DeltaWriter<W, TValueWriter> {
|
||||
self.delta_writer
|
||||
}
|
||||
|
||||
pub fn write(&mut self, key: &[u8], value: &TValueWriter::Value) -> io::Result<()> {
|
||||
self.write_key(key);
|
||||
self.write_value(value)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn write_value(&mut self, value: &TValueWriter::Value) -> io::Result<()> {
|
||||
self.delta_writer.write_value(value);
|
||||
self.num_terms += 1u64;
|
||||
self.flush_block_if_required()
|
||||
}
|
||||
|
||||
pub fn flush_block_if_required(&mut self) -> io::Result<()> {
|
||||
if let Some((start_offset, end_offset)) = self.delta_writer.flush_block_if_required()? {
|
||||
self.index_builder.add_block(
|
||||
&self.previous_key[..],
|
||||
start_offset,
|
||||
end_offset,
|
||||
self.first_ordinal_of_the_block,
|
||||
);
|
||||
self.first_ordinal_of_the_block = self.num_terms;
|
||||
self.previous_key.clear();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn finalize(mut self) -> io::Result<W> {
|
||||
if let Some((start_offset, end_offset)) = self.delta_writer.flush_block()? {
|
||||
self.index_builder.add_block(
|
||||
&self.previous_key[..],
|
||||
start_offset,
|
||||
end_offset,
|
||||
self.first_ordinal_of_the_block,
|
||||
);
|
||||
self.first_ordinal_of_the_block = self.num_terms;
|
||||
}
|
||||
let mut wrt = self.delta_writer.finalize();
|
||||
wrt.write_all(&0u32.to_le_bytes())?;
|
||||
|
||||
let offset = wrt.written_bytes();
|
||||
|
||||
self.index_builder.serialize(&mut wrt)?;
|
||||
wrt.write_all(&offset.to_le_bytes())?;
|
||||
wrt.write_all(&self.num_terms.to_le_bytes())?;
|
||||
let wrt = wrt.finish();
|
||||
Ok(wrt.into_inner()?)
|
||||
}
|
||||
}
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::io;
|
||||
|
||||
use super::SSTable;
|
||||
use super::VoidMerge;
|
||||
use super::VoidSSTable;
|
||||
use super::{common_prefix_len, SSTableMonotonicU64};
|
||||
|
||||
fn aux_test_common_prefix_len(left: &str, right: &str, expect_len: usize) {
|
||||
assert_eq!(
|
||||
common_prefix_len(left.as_bytes(), right.as_bytes()),
|
||||
expect_len
|
||||
);
|
||||
assert_eq!(
|
||||
common_prefix_len(right.as_bytes(), left.as_bytes()),
|
||||
expect_len
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_common_prefix_len() {
|
||||
aux_test_common_prefix_len("a", "ab", 1);
|
||||
aux_test_common_prefix_len("", "ab", 0);
|
||||
aux_test_common_prefix_len("ab", "abc", 2);
|
||||
aux_test_common_prefix_len("abde", "abce", 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_long_key_diff() {
|
||||
let long_key = (0..1_024).map(|x| (x % 255) as u8).collect::<Vec<_>>();
|
||||
let long_key2 = (1..300).map(|x| (x % 255) as u8).collect::<Vec<_>>();
|
||||
let mut buffer = vec![];
|
||||
{
|
||||
let mut sstable_writer = VoidSSTable::writer(&mut buffer);
|
||||
assert!(sstable_writer.write(&long_key[..], &()).is_ok());
|
||||
assert!(sstable_writer.write(&[0, 3, 4], &()).is_ok());
|
||||
assert!(sstable_writer.write(&long_key2[..], &()).is_ok());
|
||||
assert!(sstable_writer.finalize().is_ok());
|
||||
}
|
||||
let mut sstable_reader = VoidSSTable::reader(&buffer[..]);
|
||||
assert!(sstable_reader.advance().unwrap());
|
||||
assert_eq!(sstable_reader.key(), &long_key[..]);
|
||||
assert!(sstable_reader.advance().unwrap());
|
||||
assert_eq!(sstable_reader.key(), &[0, 3, 4]);
|
||||
assert!(sstable_reader.advance().unwrap());
|
||||
assert_eq!(sstable_reader.key(), &long_key2[..]);
|
||||
assert!(!sstable_reader.advance().unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_simple_sstable() {
|
||||
let mut buffer = vec![];
|
||||
{
|
||||
let mut sstable_writer = VoidSSTable::writer(&mut buffer);
|
||||
assert!(sstable_writer.write(&[17u8], &()).is_ok());
|
||||
assert!(sstable_writer.write(&[17u8, 18u8, 19u8], &()).is_ok());
|
||||
assert!(sstable_writer.write(&[17u8, 20u8], &()).is_ok());
|
||||
assert!(sstable_writer.finalize().is_ok());
|
||||
}
|
||||
assert_eq!(
|
||||
&buffer,
|
||||
&[
|
||||
// block len
|
||||
7u8, 0u8, 0u8, 0u8, // keep 0 push 1 | ""
|
||||
16u8, 17u8, // keep 1 push 2 | 18 19
|
||||
33u8, 18u8, 19u8, // keep 1 push 1 | 20
|
||||
17u8, 20u8, 0u8, 0u8, 0u8, 0u8, // no more blocks
|
||||
// index
|
||||
161, 102, 98, 108, 111, 99, 107, 115, 129, 162, 104, 108, 97, 115, 116, 95, 107,
|
||||
101, 121, 130, 17, 20, 106, 98, 108, 111, 99, 107, 95, 97, 100, 100, 114, 163, 108,
|
||||
115, 116, 97, 114, 116, 95, 111, 102, 102, 115, 101, 116, 0, 106, 101, 110, 100,
|
||||
95, 111, 102, 102, 115, 101, 116, 11, 109, 102, 105, 114, 115, 116, 95, 111, 114,
|
||||
100, 105, 110, 97, 108, 0, 15, 0, 0, 0, 0, 0, 0, 0, // offset for the index
|
||||
3u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8 // num terms
|
||||
]
|
||||
);
|
||||
let mut sstable_reader = VoidSSTable::reader(&buffer[..]);
|
||||
assert!(sstable_reader.advance().unwrap());
|
||||
assert_eq!(sstable_reader.key(), &[17u8]);
|
||||
assert!(sstable_reader.advance().unwrap());
|
||||
assert_eq!(sstable_reader.key(), &[17u8, 18u8, 19u8]);
|
||||
assert!(sstable_reader.advance().unwrap());
|
||||
assert_eq!(sstable_reader.key(), &[17u8, 20u8]);
|
||||
assert!(!sstable_reader.advance().unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn test_simple_sstable_non_increasing_key() {
|
||||
let mut buffer = vec![];
|
||||
let mut sstable_writer = VoidSSTable::writer(&mut buffer);
|
||||
assert!(sstable_writer.write(&[17u8], &()).is_ok());
|
||||
assert!(sstable_writer.write(&[16u8], &()).is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_merge_abcd_abe() {
|
||||
let mut buffer = Vec::new();
|
||||
{
|
||||
let mut writer = VoidSSTable::writer(&mut buffer);
|
||||
writer.write(b"abcd", &()).unwrap();
|
||||
writer.write(b"abe", &()).unwrap();
|
||||
writer.finalize().unwrap();
|
||||
}
|
||||
let mut output = Vec::new();
|
||||
assert!(VoidSSTable::merge(vec![&buffer[..], &buffer[..]], &mut output, VoidMerge).is_ok());
|
||||
assert_eq!(&output[..], &buffer[..]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_sstable() {
|
||||
let mut buffer = Vec::new();
|
||||
{
|
||||
let mut writer = VoidSSTable::writer(&mut buffer);
|
||||
writer.write(b"abcd", &()).unwrap();
|
||||
writer.write(b"abe", &()).unwrap();
|
||||
writer.finalize().unwrap();
|
||||
}
|
||||
let mut output = Vec::new();
|
||||
assert!(VoidSSTable::merge(vec![&buffer[..], &buffer[..]], &mut output, VoidMerge).is_ok());
|
||||
assert_eq!(&output[..], &buffer[..]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_sstable_u64() -> io::Result<()> {
|
||||
let mut buffer = Vec::new();
|
||||
let mut writer = SSTableMonotonicU64::writer(&mut buffer);
|
||||
writer.write(b"abcd", &1u64)?;
|
||||
writer.write(b"abe", &4u64)?;
|
||||
writer.write(b"gogo", &4324234234234234u64)?;
|
||||
writer.finalize()?;
|
||||
let mut reader = SSTableMonotonicU64::reader(&buffer[..]);
|
||||
assert!(reader.advance()?);
|
||||
assert_eq!(reader.key(), b"abcd");
|
||||
assert_eq!(reader.value(), &1u64);
|
||||
assert!(reader.advance()?);
|
||||
assert_eq!(reader.key(), b"abe");
|
||||
assert_eq!(reader.value(), &4u64);
|
||||
assert!(reader.advance()?);
|
||||
assert_eq!(reader.key(), b"gogo");
|
||||
assert_eq!(reader.value(), &4324234234234234u64);
|
||||
assert!(!reader.advance()?);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
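
Note: per `Writer::finalize` above, an sstable ends with a zero `u32` block terminator, the serialized index, then two little-endian `u64`s: the index offset and the number of terms. A sketch of a footer probe under that assumption (`read_footer` is a hypothetical helper, not part of the code above):

use std::convert::TryInto;

// Layout: [blocks][0u32][index][index_offset: u64 LE][num_terms: u64 LE]
fn read_footer(data: &[u8]) -> (u64, u64) {
    let n = data.len();
    let num_terms = u64::from_le_bytes(data[n - 8..].try_into().unwrap());
    let index_offset = u64::from_le_bytes(data[n - 16..n - 8].try_into().unwrap());
    (index_offset, num_terms)
}

fn main() {
    // Mirrors the `test_simple_sstable` dump: the index starts at offset 15
    // (4-byte block len + 7-byte block + 4-byte terminator), with 3 terms.
    let mut data = vec![0u8; 92]; // stand-in for blocks, terminator, and index bytes
    data.extend_from_slice(&15u64.to_le_bytes());
    data.extend_from_slice(&3u64.to_le_bytes());
    assert_eq!(read_footer(&data), (15, 3));
}
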
@@ -1,90 +0,0 @@
|
||||
use std::io;
|
||||
|
||||
use serde;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Default, Debug, Serialize, Deserialize)]
|
||||
pub struct SSTableIndex {
|
||||
blocks: Vec<BlockMeta>,
|
||||
}
|
||||
|
||||
impl SSTableIndex {
|
||||
pub fn load(data: &[u8]) -> SSTableIndex {
|
||||
// TODO
|
||||
serde_cbor::de::from_slice(data).unwrap()
|
||||
}
|
||||
|
||||
pub fn search(&self, key: &[u8]) -> Option<BlockAddr> {
|
||||
self.blocks
|
||||
.iter()
|
||||
.find(|block| &block.last_key[..] >= &key)
|
||||
.map(|block| block.block_addr)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Eq, PartialEq, Debug, Copy, Serialize, Deserialize)]
|
||||
pub struct BlockAddr {
|
||||
pub start_offset: u64,
|
||||
pub end_offset: u64,
|
||||
pub first_ordinal: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct BlockMeta {
|
||||
pub last_key: Vec<u8>,
|
||||
pub block_addr: BlockAddr,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct SSTableIndexBuilder {
|
||||
index: SSTableIndex,
|
||||
}
|
||||
|
||||
impl SSTableIndexBuilder {
|
||||
pub fn add_block(
|
||||
&mut self,
|
||||
last_key: &[u8],
|
||||
start_offset: u64,
|
||||
stop_offset: u64,
|
||||
first_ordinal: u64,
|
||||
) {
|
||||
self.index.blocks.push(BlockMeta {
|
||||
last_key: last_key.to_vec(),
|
||||
block_addr: BlockAddr {
|
||||
start_offset,
|
||||
end_offset: stop_offset,
|
||||
first_ordinal,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
pub fn serialize(&self, wrt: &mut dyn io::Write) -> io::Result<()> {
|
||||
serde_cbor::ser::to_writer(wrt, &self.index).unwrap();
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{BlockAddr, SSTableIndex, SSTableIndexBuilder};
|
||||
|
||||
#[test]
|
||||
fn test_sstable_index() {
|
||||
let mut sstable_builder = SSTableIndexBuilder::default();
|
||||
sstable_builder.add_block(b"aaa", 10u64, 20u64, 0u64);
|
||||
sstable_builder.add_block(b"bbbbbbb", 20u64, 30u64, 564);
|
||||
sstable_builder.add_block(b"ccc", 30u64, 40u64, 10u64);
|
||||
sstable_builder.add_block(b"dddd", 40u64, 50u64, 15u64);
|
||||
let mut buffer: Vec<u8> = Vec::new();
|
||||
sstable_builder.serialize(&mut buffer).unwrap();
|
||||
let sstable = SSTableIndex::load(&buffer[..]);
|
||||
assert_eq!(
|
||||
sstable.search(b"bbbde"),
|
||||
Some(BlockAddr {
|
||||
first_ordinal: 10u64,
|
||||
start_offset: 30u64,
|
||||
end_offset: 40u64
|
||||
})
|
||||
);
|
||||
}
|
||||
}
|
||||
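
Note: `SSTableIndex::search` above relies on blocks being sorted by `last_key`: the first block whose last key is `>=` the probe is the only block that can contain it. A minimal sketch of that invariant (`find_block` is a hypothetical stand-in):

fn find_block(last_keys: &[&[u8]], probe: &[u8]) -> Option<usize> {
    // Linear scan like the `find` above; a binary search would also work
    // since the last keys are sorted.
    last_keys.iter().position(|last| *last >= probe)
}

fn main() {
    let last_keys: Vec<&[u8]> = vec![&b"aaa"[..], &b"bbbbbbb"[..], &b"ccc"[..], &b"dddd"[..]];
    // "bbbde" > "bbbbbbb" but <= "ccc", so it lands in the third block,
    // matching the expectation in `test_sstable_index` above.
    assert_eq!(find_block(&last_keys, b"bbbde"), Some(2));
}
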
@@ -1,94 +0,0 @@
|
||||
use super::{vint, BlockReader};
|
||||
use std::io;
|
||||
|
||||
pub trait ValueReader: Default {
|
||||
type Value;
|
||||
|
||||
fn value(&self, idx: usize) -> &Self::Value;
|
||||
|
||||
fn read(&mut self, reader: &mut BlockReader) -> io::Result<()>;
|
||||
}
|
||||
|
||||
pub trait ValueWriter: Default {
|
||||
type Value;
|
||||
|
||||
fn write(&mut self, val: &Self::Value);
|
||||
|
||||
fn write_block(&mut self, writer: &mut Vec<u8>);
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct VoidReader;
|
||||
|
||||
impl ValueReader for VoidReader {
|
||||
type Value = ();
|
||||
|
||||
fn value(&self, _idx: usize) -> &() {
|
||||
&()
|
||||
}
|
||||
|
||||
fn read(&mut self, _reader: &mut BlockReader) -> io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct VoidWriter;
|
||||
|
||||
impl ValueWriter for VoidWriter {
|
||||
type Value = ();
|
||||
|
||||
fn write(&mut self, _val: &()) {}
|
||||
|
||||
fn write_block(&mut self, _writer: &mut Vec<u8>) {}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct U64MonotonicWriter {
|
||||
vals: Vec<u64>,
|
||||
}
|
||||
|
||||
impl ValueWriter for U64MonotonicWriter {
|
||||
type Value = u64;
|
||||
|
||||
fn write(&mut self, val: &Self::Value) {
|
||||
self.vals.push(*val);
|
||||
}
|
||||
|
||||
fn write_block(&mut self, writer: &mut Vec<u8>) {
|
||||
let mut prev_val = 0u64;
|
||||
vint::serialize_into_vec(self.vals.len() as u64, writer);
|
||||
for &val in &self.vals {
|
||||
let delta = val - prev_val;
|
||||
vint::serialize_into_vec(delta, writer);
|
||||
prev_val = val;
|
||||
}
|
||||
self.vals.clear();
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct U64MonotonicReader {
|
||||
vals: Vec<u64>,
|
||||
}
|
||||
|
||||
impl ValueReader for U64MonotonicReader {
|
||||
type Value = u64;
|
||||
|
||||
fn value(&self, idx: usize) -> &Self::Value {
|
||||
&self.vals[idx]
|
||||
}
|
||||
|
||||
fn read(&mut self, reader: &mut BlockReader) -> io::Result<()> {
|
||||
let len = reader.deserialize_u64() as usize;
|
||||
self.vals.clear();
|
||||
let mut prev_val = 0u64;
|
||||
for _ in 0..len {
|
||||
let delta = reader.deserialize_u64() as u64;
|
||||
let val = prev_val + delta;
|
||||
self.vals.push(val);
|
||||
prev_val = val;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
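
Note: `U64MonotonicWriter`/`U64MonotonicReader` above store monotonically increasing values as gaps, which stay small and therefore vint-encode compactly; decoding is a running sum. A quick standalone illustration, using the same offsets as `test_block_terminfos` earlier:

fn main() {
    let vals = [17u64, 45, 450, 462];
    // Encode: store first differences.
    let deltas: Vec<u64> = vals
        .iter()
        .scan(0u64, |prev, &v| {
            let d = v - *prev;
            *prev = v;
            Some(d)
        })
        .collect();
    assert_eq!(deltas, vec![17, 28, 405, 12]);
    // Decode: a running sum restores the original values.
    let decoded: Vec<u64> = deltas
        .iter()
        .scan(0u64, |acc, &d| {
            *acc += d;
            Some(*acc)
        })
        .collect();
    assert_eq!(decoded, vals);
}
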
@@ -1,74 +0,0 @@
|
||||
use super::BlockReader;
|
||||
|
||||
const CONTINUE_BIT: u8 = 128u8;
|
||||
|
||||
pub fn serialize(mut val: u64, buffer: &mut [u8]) -> usize {
|
||||
for (i, b) in buffer.iter_mut().enumerate() {
|
||||
let next_byte: u8 = (val & 127u64) as u8;
|
||||
val = val >> 7;
|
||||
if val == 0u64 {
|
||||
*b = next_byte;
|
||||
return i + 1;
|
||||
} else {
|
||||
*b = next_byte | CONTINUE_BIT;
|
||||
}
|
||||
}
|
||||
10 //< actually unreachable
|
||||
}
|
||||
|
||||
pub fn serialize_into_vec(val: u64, buffer: &mut Vec<u8>) {
|
||||
let mut buf = [0u8; 10];
|
||||
let num_bytes = serialize(val, &mut buf[..]);
|
||||
buffer.extend_from_slice(&buf[..num_bytes]);
|
||||
}
|
||||
|
||||
// super slow but we don't care
|
||||
pub fn deserialize_read(buf: &[u8]) -> (usize, u64) {
|
||||
let mut result = 0u64;
|
||||
let mut shift = 0u64;
|
||||
let mut consumed = 0;
|
||||
|
||||
for &b in buf {
|
||||
consumed += 1;
|
||||
result |= u64::from(b % 128u8) << shift;
|
||||
if b < CONTINUE_BIT {
|
||||
break;
|
||||
}
|
||||
shift += 7;
|
||||
}
|
||||
(consumed, result)
|
||||
}
|
||||
|
||||
pub fn deserialize_from_block(block: &mut BlockReader) -> u64 {
|
||||
let (num_bytes, val) = deserialize_read(block.buffer());
|
||||
block.advance(num_bytes);
|
||||
val
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{deserialize_read, serialize};
|
||||
use std::u64;
|
||||
|
||||
fn aux_test_int(val: u64, expect_len: usize) {
|
||||
let mut buffer = [0u8; 14];
|
||||
assert_eq!(serialize(val, &mut buffer[..]), expect_len);
|
||||
assert_eq!(deserialize_read(&buffer), (expect_len, val));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_vint() {
|
||||
aux_test_int(0u64, 1);
|
||||
aux_test_int(17u64, 1);
|
||||
aux_test_int(127u64, 1);
|
||||
aux_test_int(128u64, 2);
|
||||
aux_test_int(123423418u64, 4);
|
||||
for i in 1..63 {
|
||||
let power_of_two = 1u64 << i;
|
||||
aux_test_int(power_of_two + 1, (i / 7) + 1);
|
||||
aux_test_int(power_of_two, (i / 7) + 1);
|
||||
aux_test_int(power_of_two - 1, ((i - 1) / 7) + 1);
|
||||
}
|
||||
aux_test_int(u64::MAX, 10);
|
||||
}
|
||||
}
|
||||
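
Note: the vint above is the usual LEB128-style encoding: seven payload bits per byte, high bit set on every byte except the last. A worked example, re-implemented standalone: 300 = 0b1_0010_1100 encodes as 0xAC (the low seven bits plus the continue bit) followed by 0x02:

const CONTINUE_BIT: u8 = 128u8;

fn serialize_to_vec(mut val: u64, out: &mut Vec<u8>) {
    loop {
        let byte = (val & 127u64) as u8;
        val >>= 7;
        if val == 0 {
            out.push(byte); // last byte: continue bit left unset
            return;
        }
        out.push(byte | CONTINUE_BIT);
    }
}

fn main() {
    let mut out = Vec::new();
    serialize_to_vec(300, &mut out);
    assert_eq!(out, vec![0xAC, 0x02]);
}
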
@@ -1,227 +0,0 @@
|
||||
use super::TermDictionary;
|
||||
use crate::postings::TermInfo;
|
||||
use crate::termdict::sstable_termdict::TermInfoReader;
|
||||
use crate::termdict::TermOrdinal;
|
||||
use std::io;
|
||||
use std::ops::Bound;
|
||||
use tantivy_fst::automaton::AlwaysMatch;
|
||||
use tantivy_fst::Automaton;
|
||||
|
||||
/// `TermStreamerBuilder` is a helper object used to define
|
||||
/// a range of terms that should be streamed.
|
||||
pub struct TermStreamerBuilder<'a, A = AlwaysMatch>
|
||||
where
|
||||
A: Automaton,
|
||||
A::State: Clone,
|
||||
{
|
||||
term_dict: &'a TermDictionary,
|
||||
automaton: A,
|
||||
lower: Bound<Vec<u8>>,
|
||||
upper: Bound<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl<'a, A> TermStreamerBuilder<'a, A>
|
||||
where
|
||||
A: Automaton,
|
||||
A::State: Clone,
|
||||
{
|
||||
pub(crate) fn new(term_dict: &'a TermDictionary, automaton: A) -> Self {
|
||||
TermStreamerBuilder {
|
||||
term_dict,
|
||||
automaton,
|
||||
lower: Bound::Unbounded,
|
||||
upper: Bound::Unbounded,
|
||||
}
|
||||
}
|
||||
|
||||
/// Limit the range to terms greater or equal to the bound
|
||||
pub fn ge<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
|
||||
self.lower = Bound::Included(bound.as_ref().to_owned());
|
||||
self
|
||||
}
|
||||
|
||||
/// Limit the range to terms strictly greater than the bound
|
||||
pub fn gt<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
|
||||
self.lower = Bound::Excluded(bound.as_ref().to_owned());
|
||||
self
|
||||
}
|
||||
|
||||
    /// Limits the range to terms less than or equal to the bound.
    pub fn le<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
        self.upper = Bound::Included(bound.as_ref().to_owned());
        self
    }

    /// Limits the range to terms strictly less than the bound.
    pub fn lt<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
        self.upper = Bound::Excluded(bound.as_ref().to_owned());
        self
    }

    /// Iterating backward is not implemented for this dictionary.
    pub fn backward(self) -> Self {
        unimplemented!()
    }

    /// Creates the stream corresponding to the range
    /// of terms defined using the `TermStreamerBuilder`.
    pub fn into_stream(self) -> io::Result<TermStreamer<'a, A>> {
        let start_state = self.automaton.start();
        let delta_reader = self.term_dict.sstable_delta_reader()?;
        Ok(TermStreamer {
            automaton: self.automaton,
            states: vec![start_state],
            delta_reader,
            key: Vec::new(),
            term_ord: 0u64,
        })
    }
}

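// A minimal usage sketch (illustration, not part of the original file):
// stream every term in the byte range ["aa", "b") of an already-opened
// `TermDictionary`.
fn print_range(term_dict: &TermDictionary) -> io::Result<()> {
    let mut stream = term_dict.range().ge(b"aa").lt(b"b").into_stream()?;
    while stream.advance() {
        // `key()` yields the raw term bytes, `value()` its `TermInfo`.
        println!("{:?} doc_freq={}", stream.key(), stream.value().doc_freq);
    }
    Ok(())
}
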
/// `TermStreamer` acts as a cursor over a range of terms of a segment.
/// Terms are guaranteed to be sorted.
pub struct TermStreamer<'a, A = AlwaysMatch>
where
    A: Automaton,
    A::State: Clone,
{
    automaton: A,
    states: Vec<A::State>,
    delta_reader: super::sstable::DeltaReader<'a, TermInfoReader>,
    key: Vec<u8>,
    term_ord: TermOrdinal,
}

impl<'a, A> TermStreamer<'a, A>
where
    A: Automaton,
    A::State: Clone,
{
    /// Advances the stream to the next item.
    /// Before the first call to `.advance()`, the stream
    /// is in an uninitialized state.
    pub fn advance(&mut self) -> bool {
        while self.delta_reader.advance().unwrap() {
            self.term_ord += 1u64;
            // Terms are delta-encoded: reuse the automaton states cached for
            // the prefix shared with the previous key, and only feed the new
            // suffix bytes through the automaton.
            let common_prefix_len = self.delta_reader.common_prefix_len();
            self.states.truncate(common_prefix_len + 1);
            self.key.truncate(common_prefix_len);
            let mut state: A::State = self.states.last().unwrap().clone();
            for &b in self.delta_reader.suffix() {
                state = self.automaton.accept(&state, b);
                self.states.push(state.clone());
            }
            self.key.extend_from_slice(self.delta_reader.suffix());
            if self.automaton.is_match(&state) {
                return true;
            }
        }
        false
    }

    /// Returns the `TermOrdinal` of the current term.
    ///
    /// May panic if `.advance()` has never
    /// been called before.
    pub fn term_ord(&self) -> TermOrdinal {
        self.term_ord
    }

    /// Accesses the current key.
    ///
    /// `.key()` returns the key that was last returned
    /// by the `.next()` method.
    ///
    /// If the end of the stream has been reached, and `.next()`
    /// has been called and returned `None`, `.key()` keeps
    /// the value of the last key encountered.
    ///
    /// Before any call to `.next()`, `.key()` returns an empty array.
    pub fn key(&self) -> &[u8] {
        &self.key
    }

    /// Accesses the current value.
    ///
    /// Calling `.value()` after the end of the stream will return the
    /// last `.value()` encountered.
    ///
    /// Calling `.value()` before the first call to `.advance()` returns
    /// the default `TermInfo`.
    pub fn value(&self) -> &TermInfo {
        self.delta_reader.value()
    }

    /// Return the next `(key, value)` pair.
    #[cfg_attr(feature = "cargo-clippy", allow(clippy::should_implement_trait))]
    pub fn next(&mut self) -> Option<(&[u8], &TermInfo)> {
        if self.advance() {
            Some((self.key(), self.value()))
        } else {
            None
        }
    }
}

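// A small helper sketch over the cursor API above (illustration, not part
// of the original file): drain a streamer into owned (key, doc_freq) pairs.
fn collect_terms<A>(mut stream: TermStreamer<'_, A>) -> Vec<(Vec<u8>, u32)>
where
    A: Automaton,
    A::State: Clone,
{
    let mut out = Vec::new();
    while let Some((key, info)) = stream.next() {
        // Keys are only valid until the next call to `next()`, so copy them out.
        out.push((key.to_vec(), info.doc_freq));
    }
    out
}
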
#[cfg(test)]
mod tests {
    use super::super::TermDictionary;
    use crate::directory::OwnedBytes;
    use crate::postings::TermInfo;

    fn make_term_info(i: u64) -> TermInfo {
        TermInfo {
            doc_freq: 1000u32 + i as u32,
            positions_idx: i * 500,
            postings_start_offset: (i + 10) * (i * 10),
            postings_stop_offset: ((i + 1) + 10) * ((i + 1) * 10),
        }
    }

    fn create_test_term_dictionary() -> crate::Result<TermDictionary> {
        let mut term_dict_builder = super::super::TermDictionaryBuilder::create(Vec::new())?;
        term_dict_builder.insert(b"abaisance", &make_term_info(0u64))?;
        term_dict_builder.insert(b"abalation", &make_term_info(1u64))?;
        term_dict_builder.insert(b"abalienate", &make_term_info(2u64))?;
        term_dict_builder.insert(b"abandon", &make_term_info(3u64))?;
        let buffer = term_dict_builder.finish()?;
        let owned_bytes = OwnedBytes::new(buffer);
        TermDictionary::from_bytes(owned_bytes)
    }

    #[test]
    fn test_sstable_stream() -> crate::Result<()> {
        let term_dict = create_test_term_dictionary()?;
        let mut term_streamer = term_dict.stream()?;
        assert!(term_streamer.advance());
        assert_eq!(term_streamer.key(), b"abaisance");
        assert_eq!(term_streamer.value().doc_freq, 1000u32);
        assert!(term_streamer.advance());
        assert_eq!(term_streamer.key(), b"abalation");
        assert_eq!(term_streamer.value().doc_freq, 1001u32);
        assert!(term_streamer.advance());
        assert_eq!(term_streamer.key(), b"abalienate");
        assert_eq!(term_streamer.value().doc_freq, 1002u32);
        assert!(term_streamer.advance());
        assert_eq!(term_streamer.key(), b"abandon");
        assert_eq!(term_streamer.value().doc_freq, 1003u32);
        assert!(!term_streamer.advance());
        Ok(())
    }

    #[test]
    fn test_sstable_search() -> crate::Result<()> {
        let term_dict = create_test_term_dictionary()?;
        let ptn = tantivy_fst::Regex::new("ab.*t.*").unwrap();
        let mut term_streamer = term_dict.search(ptn).into_stream()?;
        assert!(term_streamer.advance());
        assert_eq!(term_streamer.key(), b"abalation");
        assert_eq!(term_streamer.value().doc_freq, 1001u32);
        assert!(term_streamer.advance());
        assert_eq!(term_streamer.key(), b"abalienate");
        assert_eq!(term_streamer.value().doc_freq, 1002u32);
        assert!(!term_streamer.advance());
        Ok(())
    }
}
@@ -1,228 +0,0 @@
use std::io;

use crate::common::BinarySerializable;
use crate::directory::{FileSlice, OwnedBytes};
use crate::postings::TermInfo;
use crate::termdict::sstable_termdict::sstable::sstable_index::BlockAddr;
use crate::termdict::sstable_termdict::sstable::Writer;
use crate::termdict::sstable_termdict::sstable::{DeltaReader, SSTable};
use crate::termdict::sstable_termdict::sstable::{Reader, SSTableIndex};
use crate::termdict::sstable_termdict::{
    TermInfoReader, TermInfoWriter, TermSSTable, TermStreamer, TermStreamerBuilder,
};
use crate::termdict::TermOrdinal;
use crate::HasLen;
use once_cell::sync::Lazy;
use tantivy_fst::automaton::AlwaysMatch;
use tantivy_fst::Automaton;

pub struct TermInfoSSTable;
impl SSTable for TermInfoSSTable {
    type Value = TermInfo;
    type Reader = TermInfoReader;
    type Writer = TermInfoWriter;
}
pub struct TermDictionaryBuilder<W: io::Write> {
    sstable_writer: Writer<W, TermInfoWriter>,
}

impl<W: io::Write> TermDictionaryBuilder<W> {
    /// Creates a new `TermDictionaryBuilder`.
    pub fn create(w: W) -> io::Result<Self> {
        let sstable_writer = TermSSTable::writer(w);
        Ok(TermDictionaryBuilder { sstable_writer })
    }

    /// Inserts a `(key, value)` pair in the term dictionary.
    ///
    /// *Keys have to be inserted in order.*
    pub fn insert<K: AsRef<[u8]>>(&mut self, key_ref: K, value: &TermInfo) -> io::Result<()> {
        let key = key_ref.as_ref();
        self.insert_key(key)?;
        self.insert_value(value)?;
        Ok(())
    }

    /// # Warning
    /// Horribly dangerous internal API.
    ///
    /// If used, calls to `insert_key` and `insert_value`
    /// must be strictly alternated.
    ///
    /// Prefer using `.insert(key, value)`.
    pub(crate) fn insert_key(&mut self, key: &[u8]) -> io::Result<()> {
        self.sstable_writer.write_key(key);
        Ok(())
    }

    /// # Warning
    ///
    /// Horribly dangerous internal API. See `.insert_key(...)`.
    pub(crate) fn insert_value(&mut self, term_info: &TermInfo) -> io::Result<()> {
        self.sstable_writer.write_value(term_info);
        Ok(())
    }

    /// Finalizes the builder and returns the underlying
    /// `Write` object.
    pub fn finish(self) -> io::Result<W> {
        self.sstable_writer.finalize()
    }
}

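// End-to-end sketch of the builder contract (illustration, not part of the
// original file): keys inserted in ascending byte order, then `finish()`
// hands back the underlying writer.
fn build_in_memory() -> io::Result<Vec<u8>> {
    let mut builder = TermDictionaryBuilder::create(Vec::new())?;
    // Keys must be inserted in sorted order.
    builder.insert(b"apple", &TermInfo::default())?;
    builder.insert(b"banana", &TermInfo::default())?;
    builder.finish()
}
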
static EMPTY_TERM_DICT_FILE: Lazy<FileSlice> = Lazy::new(|| {
    let term_dictionary_data: Vec<u8> = TermDictionaryBuilder::create(Vec::<u8>::new())
        .expect("Creating a TermDictionaryBuilder in a Vec<u8> should never fail")
        .finish()
        .expect("Writing in a Vec<u8> should never fail");
    FileSlice::from(term_dictionary_data)
});

/// The term dictionary contains all of the terms of
/// the tantivy index, in sorted order.
///
/// An sstable associates each term with its
/// `TermOrdinal` and makes it possible to fetch
/// the associated `TermInfo`.
pub struct TermDictionary {
    sstable_slice: FileSlice,
    sstable_index: SSTableIndex,
    num_terms: u64,
}

impl TermDictionary {
    pub(crate) fn sstable_reader(&self) -> io::Result<Reader<'static, TermInfoReader>> {
        let data = self.sstable_slice.read_bytes()?;
        Ok(TermInfoSSTable::reader(data))
    }

    pub(crate) fn sstable_reader_block(
        &self,
        block_addr: BlockAddr,
    ) -> io::Result<Reader<'static, TermInfoReader>> {
        let data = self.sstable_slice.read_bytes_slice(
            block_addr.start_offset as usize,
            block_addr.end_offset as usize,
        )?;
        Ok(TermInfoSSTable::reader(data))
    }

    pub(crate) fn sstable_delta_reader(&self) -> io::Result<DeltaReader<'static, TermInfoReader>> {
        let data = self.sstable_slice.read_bytes()?;
        Ok(TermInfoSSTable::delta_reader(data))
    }

    /// Opens a `TermDictionary`.
    pub fn open(term_dictionary_file: FileSlice) -> crate::Result<Self> {
        let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(16);
        let mut footer_len_bytes: OwnedBytes = footer_len_slice.read_bytes()?;
        let index_offset = u64::deserialize(&mut footer_len_bytes)?;
        let num_terms = u64::deserialize(&mut footer_len_bytes)?;
        let (sstable_slice, index_slice) = main_slice.split(index_offset as usize);
        let sstable_index_bytes = index_slice.read_bytes()?;
        let sstable_index = SSTableIndex::load(sstable_index_bytes.as_slice());
        Ok(TermDictionary {
            sstable_slice,
            sstable_index,
            num_terms,
        })
    }

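// Layout implied by `open` (a sketch, not part of the original file): the
// file ends with a 16-byte footer holding two `u64`s written via
// `BinarySerializable`, `index_offset` followed by `num_terms`.
//
//   [ sstable blocks ... | sstable index | index_offset: u64 | num_terms: u64 ]
//   0              index_offset        len-16             len-8             len
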
    pub fn from_bytes(owned_bytes: OwnedBytes) -> crate::Result<TermDictionary> {
        TermDictionary::open(FileSlice::new(Box::new(owned_bytes)))
    }

    /// Creates an empty term dictionary.
    pub fn empty() -> Self {
        TermDictionary::open(EMPTY_TERM_DICT_FILE.clone()).unwrap()
    }

    /// Returns the number of terms in the dictionary.
    /// Term ordinals range from 0 to `num_terms() - 1`.
    pub fn num_terms(&self) -> usize {
        self.num_terms as usize
    }

    /// Returns the ordinal associated with a given term.
    ///
    /// Linearly scans the sstable from the start, so this is O(num_terms).
    pub fn term_ord<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TermOrdinal>> {
        let mut term_ord = 0u64;
        let key_bytes = key.as_ref();
        let mut sstable_reader = self.sstable_reader()?;
        while sstable_reader.advance().unwrap_or(false) {
            if sstable_reader.key() == key_bytes {
                return Ok(Some(term_ord));
            }
            term_ord += 1;
        }
        Ok(None)
    }

    /// Returns the term associated with a given term ordinal,
    /// writing it into `bytes`.
    ///
    /// Term ordinals are defined as the position of the term in
    /// the sorted list of terms.
    ///
    /// Returns true iff the term has been found.
    ///
    /// Regardless of whether the term is found or not,
    /// the buffer may be modified.
    pub fn ord_to_term(&self, ord: TermOrdinal, bytes: &mut Vec<u8>) -> io::Result<bool> {
        let mut sstable_reader = self.sstable_reader()?;
        bytes.clear();
        for _ in 0..(ord + 1) {
            if !sstable_reader.advance().unwrap_or(false) {
                return Ok(false);
            }
        }
        bytes.extend_from_slice(sstable_reader.key());
        Ok(true)
    }

    /// Returns the `TermInfo` associated with a given term ordinal,
    /// or `TermInfo::default()` if the ordinal is out of range.
    pub fn term_info_from_ord(&self, term_ord: TermOrdinal) -> io::Result<TermInfo> {
        let mut sstable_reader = self.sstable_reader()?;
        for _ in 0..(term_ord + 1) {
            if !sstable_reader.advance().unwrap_or(false) {
                return Ok(TermInfo::default());
            }
        }
        Ok(sstable_reader.value().clone())
    }

    /// Looks up the value corresponding to the key.
    ///
    /// The sstable index narrows the search down to a single
    /// block, which is then scanned linearly.
    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TermInfo>> {
        if let Some(block_addr) = self.sstable_index.search(key.as_ref()) {
            let mut sstable_reader = self.sstable_reader_block(block_addr)?;
            let key_bytes = key.as_ref();
            while sstable_reader.advance().unwrap_or(false) {
                if sstable_reader.key() == key_bytes {
                    let term_info = sstable_reader.value().clone();
                    return Ok(Some(term_info));
                }
            }
        }
        Ok(None)
    }

    /// Returns a range builder, to stream all of the terms
    /// within an interval.
    pub fn range(&self) -> TermStreamerBuilder<'_> {
        TermStreamerBuilder::new(self, AlwaysMatch)
    }

    /// A stream of all the sorted terms. [See also `.stream_field()`](#method.stream_field)
    pub fn stream(&self) -> io::Result<TermStreamer<'_>> {
        self.range().into_stream()
    }

    /// Returns a search builder, to stream all of the terms
    /// matched by the automaton.
    pub fn search<'a, A: Automaton + 'a>(&'a self, automaton: A) -> TermStreamerBuilder<'a, A>
    where
        A::State: Clone,
    {
        TermStreamerBuilder::<A>::new(self, automaton)
    }
}
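// Round-trip sketch over the API above (illustration, not part of the
// original file): resolve a term to its ordinal, then recover the bytes.
fn round_trip(dict: &TermDictionary) -> io::Result<()> {
    if let Some(ord) = dict.term_ord(b"abandon")? {
        let mut bytes = Vec::new();
        assert!(dict.ord_to_term(ord, &mut bytes)?);
        assert_eq!(&bytes[..], &b"abandon"[..]);
    }
    Ok(())
}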
@@ -1,5 +1,3 @@
use std::io;

use super::TermDictionary;
use crate::postings::TermInfo;
use crate::termdict::TermOrdinal;
@@ -61,14 +59,14 @@ where

    /// Creates the stream corresponding to the range
    /// of terms defined using the `TermStreamerBuilder`.
    pub fn into_stream(self) -> io::Result<TermStreamer<'a, A>> {
        Ok(TermStreamer {
    pub fn into_stream(self) -> TermStreamer<'a, A> {
        TermStreamer {
            fst_map: self.fst_map,
            stream: self.stream_builder.into_stream(),
            term_ord: 0u64,
            current_key: Vec::with_capacity(100),
            current_value: TermInfo::default(),
        })
        }
    }
}

@@ -80,6 +80,7 @@ where
            .serialize(&mut counting_writer)?;
        let footer_size = counting_writer.written_bytes();
        (footer_size as u64).serialize(&mut counting_writer)?;
        counting_writer.flush()?;
    }
    Ok(file)
}
@@ -138,8 +139,8 @@ impl TermDictionary {
    }

    /// Returns the ordinal associated with a given term.
    pub fn term_ord<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TermOrdinal>> {
        Ok(self.fst_index.get(key))
    pub fn term_ord<K: AsRef<[u8]>>(&self, key: K) -> Option<TermOrdinal> {
        self.fst_index.get(key)
    }

    /// Returns the term associated with a given term ordinal.
@@ -151,7 +152,7 @@ impl TermDictionary {
    ///
    /// Regardless of whether the term is found or not,
    /// the buffer may be modified.
    pub fn ord_to_term(&self, mut ord: TermOrdinal, bytes: &mut Vec<u8>) -> io::Result<bool> {
    pub fn ord_to_term(&self, mut ord: TermOrdinal, bytes: &mut Vec<u8>) -> bool {
        bytes.clear();
        let fst = self.fst_index.as_fst();
        let mut node = fst.root();
@@ -166,10 +167,10 @@ impl TermDictionary {
                let new_node_addr = transition.addr;
                node = fst.node(new_node_addr);
            } else {
                return Ok(false);
                return false;
            }
        }
        Ok(true)
        true
    }

    /// Returns the number of terms in the dictionary.
@@ -178,10 +179,9 @@ impl TermDictionary {
    }

    /// Looks up the value corresponding to the key.
    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TermInfo>> {
        Ok(self
            .term_ord(key)?
            .map(|term_ord| self.term_info_from_ord(term_ord)))
    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Option<TermInfo> {
        self.term_ord(key)
            .map(|term_ord| self.term_info_from_ord(term_ord))
    }

    /// Returns a range builder, to stream all of the terms
@@ -191,7 +191,7 @@ impl TermDictionary {
    }

    /// A stream of all the sorted terms. [See also `.stream_field()`](#method.stream_field)
    pub fn stream(&self) -> io::Result<TermStreamer<'_>> {
    pub fn stream(&self) -> TermStreamer<'_> {
        self.range().into_stream()
    }

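// Net effect of the hunks above on callers of the fst-backed dictionary
// (a sketch, not part of the diff): `term_ord`, `ord_to_term`, `get`,
// `stream` and `into_stream` no longer return results, so call sites
// drop the `?`:
//
//     let ord = term_dict.term_ord(b"abandon");         // was: term_ord(...)?
//     let mut stream = term_dict.range().into_stream(); // was: into_stream()?
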
@@ -1,393 +0,0 @@
use super::{TermDictionary, TermDictionaryBuilder, TermStreamer};

use crate::directory::{Directory, FileSlice, RAMDirectory, TerminatingWrite};
use crate::postings::TermInfo;

use std::path::PathBuf;
use std::str;

const BLOCK_SIZE: usize = 1_500;

fn make_term_info(term_ord: u64) -> TermInfo {
    let offset = |term_ord: u64| term_ord * 100 + term_ord * term_ord;
    TermInfo {
        doc_freq: term_ord as u32,
        postings_start_offset: offset(term_ord),
        postings_stop_offset: offset(term_ord + 1),
        positions_idx: offset(term_ord) * 2u64,
    }
}

#[test]
fn test_empty_term_dictionary() {
    let empty = TermDictionary::empty();
    assert!(empty.stream().unwrap().next().is_none());
}

#[test]
fn test_term_ordinals() -> crate::Result<()> {
    const COUNTRIES: [&'static str; 7] = [
        "San Marino",
        "Serbia",
        "Slovakia",
        "Slovenia",
        "Spain",
        "Sweden",
        "Switzerland",
    ];
    let directory = RAMDirectory::create();
    let path = PathBuf::from("TermDictionary");
    {
        let write = directory.open_write(&path)?;
        let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
        for term in COUNTRIES.iter() {
            term_dictionary_builder.insert(term.as_bytes(), &make_term_info(0u64))?;
        }
        term_dictionary_builder.finish()?.terminate()?;
    }
    let term_file = directory.open_read(&path)?;
    let term_dict: TermDictionary = TermDictionary::open(term_file)?;
    for (term_ord, term) in COUNTRIES.iter().enumerate() {
        assert_eq!(term_dict.term_ord(term)?, Some(term_ord as u64));
        let mut bytes = vec![];
        assert!(term_dict.ord_to_term(term_ord as u64, &mut bytes)?);
        assert_eq!(bytes, term.as_bytes());
    }
    Ok(())
}

#[test]
fn test_term_dictionary_simple() -> crate::Result<()> {
    let directory = RAMDirectory::create();
    let path = PathBuf::from("TermDictionary");
    {
        let write = directory.open_write(&path)?;
        let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
        term_dictionary_builder.insert("abc".as_bytes(), &make_term_info(34u64))?;
        term_dictionary_builder.insert("abcd".as_bytes(), &make_term_info(346u64))?;
        term_dictionary_builder.finish()?.terminate()?;
    }
    let file = directory.open_read(&path)?;
    let term_dict: TermDictionary = TermDictionary::open(file)?;
    assert_eq!(term_dict.get("abc")?.unwrap().doc_freq, 34u32);
    assert_eq!(term_dict.get("abcd")?.unwrap().doc_freq, 346u32);
    let mut stream = term_dict.stream()?;
    {
        {
            let (k, v) = stream.next().unwrap();
            assert_eq!(k.as_ref(), "abc".as_bytes());
            assert_eq!(v.doc_freq, 34u32);
        }
        assert_eq!(stream.key(), "abc".as_bytes());
        assert_eq!(stream.value().doc_freq, 34u32);
    }
    {
        {
            let (k, v) = stream.next().unwrap();
            assert_eq!(k, "abcd".as_bytes());
            assert_eq!(v.doc_freq, 346u32);
        }
        assert_eq!(stream.key(), "abcd".as_bytes());
        assert_eq!(stream.value().doc_freq, 346u32);
    }
    assert!(!stream.advance());
    Ok(())
}

#[test]
fn test_term_dictionary_stream() -> crate::Result<()> {
    let ids: Vec<_> = (0u32..10_000u32)
        .map(|i| (format!("doc{:0>6}", i), i))
        .collect();
    let buffer: Vec<u8> = {
        let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
        for &(ref id, ref i) in &ids {
            term_dictionary_builder
                .insert(id.as_bytes(), &make_term_info(*i as u64))
                .unwrap();
        }
        term_dictionary_builder.finish()?
    };
    let term_file = FileSlice::from(buffer);
    let term_dictionary: TermDictionary = TermDictionary::open(term_file)?;
    {
        let mut streamer = term_dictionary.stream()?;
        let mut i = 0;
        while let Some((streamer_k, streamer_v)) = streamer.next() {
            let &(ref key, ref v) = &ids[i];
            assert_eq!(streamer_k.as_ref(), key.as_bytes());
            assert_eq!(streamer_v, &make_term_info(*v as u64));
            i += 1;
        }
    }

    let &(ref key, ref val) = &ids[2047];
    assert_eq!(
        term_dictionary.get(key.as_bytes())?,
        Some(make_term_info(*val as u64))
    );
    Ok(())
}

#[test]
fn test_stream_high_range_prefix_suffix() -> crate::Result<()> {
    let buffer: Vec<u8> = {
        let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
        // This term requires more than 16 bits.
        term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxy", &make_term_info(1))?;
        term_dictionary_builder.insert("abcdefghijklmnopqrstuvwxyz", &make_term_info(2))?;
        term_dictionary_builder.insert("abr", &make_term_info(3))?;
        term_dictionary_builder.finish()?
    };
    let term_dict_file = FileSlice::from(buffer);
    let term_dictionary: TermDictionary = TermDictionary::open(term_dict_file)?;
    let mut kv_stream = term_dictionary.stream()?;
    assert!(kv_stream.advance());
    assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxy".as_bytes());
    assert_eq!(kv_stream.value(), &make_term_info(1));
    assert!(kv_stream.advance());
    assert_eq!(kv_stream.key(), "abcdefghijklmnopqrstuvwxyz".as_bytes());
    assert_eq!(kv_stream.value(), &make_term_info(2));
    assert!(kv_stream.advance());
    assert_eq!(kv_stream.key(), "abr".as_bytes());
    assert_eq!(kv_stream.value(), &make_term_info(3));
    assert!(!kv_stream.advance());
    Ok(())
}

#[test]
fn test_stream_range() -> crate::Result<()> {
    let ids: Vec<_> = (0u32..10_000u32)
        .map(|i| (format!("doc{:0>6}", i), i))
        .collect();
    let buffer: Vec<u8> = {
        let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
        for &(ref id, ref i) in &ids {
            term_dictionary_builder
                .insert(id.as_bytes(), &make_term_info(*i as u64))
                .unwrap();
        }
        term_dictionary_builder.finish()?
    };

    let file = FileSlice::from(buffer);

    let term_dictionary: TermDictionary = TermDictionary::open(file)?;
    {
        for i in (0..20).chain(6000..8_000) {
            let &(ref target_key, _) = &ids[i];
            let mut streamer = term_dictionary
                .range()
                .ge(target_key.as_bytes())
                .into_stream()?;
            for j in 0..3 {
                let (streamer_k, streamer_v) = streamer.next().unwrap();
                let &(ref key, ref v) = &ids[i + j];
                assert_eq!(str::from_utf8(streamer_k.as_ref()).unwrap(), key);
                assert_eq!(streamer_v.doc_freq, *v);
                assert_eq!(streamer_v, &make_term_info(*v as u64));
            }
        }
    }

    {
        for i in (0..20).chain(BLOCK_SIZE - 10..BLOCK_SIZE + 10) {
            let &(ref target_key, _) = &ids[i];
            let mut streamer = term_dictionary
                .range()
                .gt(target_key.as_bytes())
                .into_stream()?;
            for j in 0..3 {
                let (streamer_k, streamer_v) = streamer.next().unwrap();
                let &(ref key, ref v) = &ids[i + j + 1];
                assert_eq!(streamer_k.as_ref(), key.as_bytes());
                assert_eq!(streamer_v.doc_freq, *v);
            }
        }
    }

    {
        for i in (0..20).chain(BLOCK_SIZE - 10..BLOCK_SIZE + 10) {
            for j in 0..3 {
                let &(ref fst_key, _) = &ids[i];
                let &(ref last_key, _) = &ids[i + j];
                let mut streamer = term_dictionary
                    .range()
                    .ge(fst_key.as_bytes())
                    .lt(last_key.as_bytes())
                    .into_stream()?;
                for _ in 0..j {
                    assert!(streamer.next().is_some());
                }
                assert!(streamer.next().is_none());
            }
        }
    }
    Ok(())
}

#[test]
fn test_empty_string() -> crate::Result<()> {
    let buffer: Vec<u8> = {
        let mut term_dictionary_builder = TermDictionaryBuilder::create(vec![]).unwrap();
        term_dictionary_builder
            .insert(&[], &make_term_info(1 as u64))
            .unwrap();
        term_dictionary_builder
            .insert(&[1u8], &make_term_info(2 as u64))
            .unwrap();
        term_dictionary_builder.finish()?
    };
    let file = FileSlice::from(buffer);
    let term_dictionary: TermDictionary = TermDictionary::open(file)?;
    let mut stream = term_dictionary.stream()?;
    assert!(stream.advance());
    assert!(stream.key().is_empty());
    assert!(stream.advance());
    assert_eq!(stream.key(), &[1u8]);
    assert!(!stream.advance());
    Ok(())
}

#[test]
fn test_stream_range_boundaries() -> crate::Result<()> {
    let buffer: Vec<u8> = {
        let mut term_dictionary_builder = TermDictionaryBuilder::create(Vec::new())?;
        for i in 0u8..10u8 {
            let number_arr = [i; 1];
            term_dictionary_builder.insert(&number_arr, &make_term_info(i as u64))?;
        }
        term_dictionary_builder.finish()?
    };
    let file = FileSlice::from(buffer);
    let term_dictionary: TermDictionary = TermDictionary::open(file)?;

    let value_list = |mut streamer: TermStreamer<'_>, backwards: bool| {
        let mut res: Vec<u32> = vec![];
        while let Some((_, ref v)) = streamer.next() {
            res.push(v.doc_freq);
        }
        if backwards {
            res.reverse();
        }
        res
    };
    {
        let range = term_dictionary.range().backward().into_stream()?;
        assert_eq!(
            value_list(range, true),
            vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
        );
    }
    {
        let range = term_dictionary.range().ge([2u8]).into_stream()?;
        assert_eq!(
            value_list(range, false),
            vec![2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
        );
    }
    {
        let range = term_dictionary.range().ge([2u8]).backward().into_stream()?;
        assert_eq!(
            value_list(range, true),
            vec![2u32, 3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
        );
    }
    {
        let range = term_dictionary.range().gt([2u8]).into_stream()?;
        assert_eq!(
            value_list(range, false),
            vec![3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
        );
    }
    {
        let range = term_dictionary.range().gt([2u8]).backward().into_stream()?;
        assert_eq!(
            value_list(range, true),
            vec![3u32, 4u32, 5u32, 6u32, 7u32, 8u32, 9u32]
        );
    }
    {
        let range = term_dictionary.range().lt([6u8]).into_stream()?;
        assert_eq!(
            value_list(range, false),
            vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32]
        );
    }
    {
        let range = term_dictionary.range().lt([6u8]).backward().into_stream()?;
        assert_eq!(
            value_list(range, true),
            vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32]
        );
    }
    {
        let range = term_dictionary.range().le([6u8]).into_stream()?;
        assert_eq!(
            value_list(range, false),
            vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32]
        );
    }
    {
        let range = term_dictionary.range().le([6u8]).backward().into_stream()?;
        assert_eq!(
            value_list(range, true),
            vec![0u32, 1u32, 2u32, 3u32, 4u32, 5u32, 6u32]
        );
    }
    {
        let range = term_dictionary.range().ge([0u8]).lt([5u8]).into_stream()?;
        assert_eq!(value_list(range, false), vec![0u32, 1u32, 2u32, 3u32, 4u32]);
    }
    {
        let range = term_dictionary
            .range()
            .ge([0u8])
            .lt([5u8])
            .backward()
            .into_stream()?;
        assert_eq!(value_list(range, true), vec![0u32, 1u32, 2u32, 3u32, 4u32]);
    }
    Ok(())
}

#[test]
fn test_automaton_search() -> crate::Result<()> {
    use crate::query::DFAWrapper;
    use levenshtein_automata::LevenshteinAutomatonBuilder;

    const COUNTRIES: [&'static str; 7] = [
        "San Marino",
        "Serbia",
        "Slovakia",
        "Slovenia",
        "Spain",
        "Sweden",
        "Switzerland",
    ];

    let directory = RAMDirectory::create();
    let path = PathBuf::from("TermDictionary");
    {
        let write = directory.open_write(&path)?;
        let mut term_dictionary_builder = TermDictionaryBuilder::create(write)?;
        for term in COUNTRIES.iter() {
            term_dictionary_builder.insert(term.as_bytes(), &make_term_info(0u64))?;
        }
        term_dictionary_builder.finish()?.terminate()?;
    }
    let file = directory.open_read(&path)?;
    let term_dict: TermDictionary = TermDictionary::open(file)?;

    // We can now build an entire DFA.
    let lev_automaton_builder = LevenshteinAutomatonBuilder::new(2, true);
    let automaton = DFAWrapper(lev_automaton_builder.build_dfa("Spaen"));

    let mut range = term_dict.search(automaton).into_stream()?;

    // Only "Spain" is within edit distance 2 of "Spaen", so the stream
    // yields exactly one match.
    assert!(range.advance());
    assert_eq!("Spain".as_bytes(), range.key());
    assert!(!range.advance());
    Ok(())
}