mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-05-27 05:30:45 +00:00
Added unit test and bugfix
This commit is contained in:
@@ -14,7 +14,7 @@ impl<W: Write> CountingWriter<W> {
|
||||
written_bytes: 0,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
pub fn written_bytes(&self,) -> usize {
|
||||
self.written_bytes
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use fst::{self, IntoStreamer, Streamer};
|
||||
use fst::{IntoStreamer, Streamer};
|
||||
use fst::map::{StreamBuilder, Stream};
|
||||
use common::BinarySerializable;
|
||||
use super::TermDictionary;
|
||||
@@ -6,14 +6,14 @@ use super::TermDictionary;
|
||||
/// `TermStreamerBuilder` is an helper object used to define
|
||||
/// a range of terms that should be streamed.
|
||||
pub struct TermStreamerBuilder<'a, V>
|
||||
where V: 'a + BinarySerializable
|
||||
where V: 'a + BinarySerializable + Default
|
||||
{
|
||||
fst_map: &'a TermDictionary<V>,
|
||||
stream_builder: StreamBuilder<'a>,
|
||||
}
|
||||
|
||||
impl<'a, V> TermStreamerBuilder<'a, V>
|
||||
where V: 'a + BinarySerializable
|
||||
where V: 'a + BinarySerializable + Default
|
||||
{
|
||||
/// Limit the range to terms greater or equal to the bound
|
||||
pub fn ge<T: AsRef<[u8]>>(mut self, bound: T) -> Self {
|
||||
@@ -45,12 +45,12 @@ impl<'a, V> TermStreamerBuilder<'a, V>
|
||||
TermStreamer {
|
||||
fst_map: self.fst_map,
|
||||
stream: self.stream_builder.into_stream(),
|
||||
buffer: Vec::with_capacity(100),
|
||||
offset: 0u64,
|
||||
current_key: Vec::with_capacity(100),
|
||||
current_value: V::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Crates a new `TermStreamBuilder`
|
||||
|
||||
pub(crate) fn new(fst_map: &'a TermDictionary<V>,
|
||||
stream_builder: StreamBuilder<'a>)
|
||||
-> TermStreamerBuilder<'a, V> {
|
||||
@@ -67,41 +67,31 @@ impl<'a, V> TermStreamerBuilder<'a, V>
|
||||
/// `TermStreamer` acts as a cursor over a range of terms of a segment.
|
||||
/// Terms are guaranteed to be sorted.
|
||||
pub struct TermStreamer<'a, V>
|
||||
where V: 'a + BinarySerializable
|
||||
where V: 'a + BinarySerializable + Default
|
||||
{
|
||||
fst_map: &'a TermDictionary<V>,
|
||||
stream: Stream<'a>,
|
||||
offset: u64,
|
||||
buffer: Vec<u8>,
|
||||
current_key: Vec<u8>,
|
||||
current_value: V,
|
||||
}
|
||||
|
||||
|
||||
impl<'a, 'b, V> fst::Streamer<'b> for TermStreamer<'a, V>
|
||||
where V: 'b + BinarySerializable
|
||||
{
|
||||
type Item = (&'b [u8], V);
|
||||
|
||||
fn next(&'b mut self) -> Option<(&'b [u8], V)> {
|
||||
if self.advance() {
|
||||
let v = self.value();
|
||||
Some((&self.buffer, v))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, V> TermStreamer<'a, V>
|
||||
where V: 'a + BinarySerializable
|
||||
where V: 'a + BinarySerializable + Default
|
||||
{
|
||||
/// Advance position the stream on the next item.
|
||||
/// Before the first call to `.advance()`, the stream
|
||||
/// is an unitialized state.
|
||||
pub fn advance(&mut self) -> bool {
|
||||
if let Some((term, offset)) = self.stream.next() {
|
||||
self.buffer.clear();
|
||||
self.buffer.extend_from_slice(term);
|
||||
self.current_key.clear();
|
||||
self.current_key.extend_from_slice(term);
|
||||
self.offset = offset;
|
||||
self.current_value = self.fst_map
|
||||
.read_value(self.offset)
|
||||
.expect("Fst data is corrupted. Failed to deserialize a value.");
|
||||
true
|
||||
} else {
|
||||
false
|
||||
@@ -119,24 +109,19 @@ impl<'a, V> TermStreamer<'a, V>
|
||||
///
|
||||
/// Before any call to `.next()`, `.key()` returns an empty array.
|
||||
pub fn key(&self) -> &[u8] {
|
||||
&self.buffer
|
||||
&self.current_key
|
||||
}
|
||||
|
||||
/// Accesses the current value.
|
||||
///
|
||||
/// Values are accessed in a lazy manner, their data is fetched
|
||||
/// and deserialized only at the moment of the call to `.value()`.
|
||||
///
|
||||
///
|
||||
/// Calling `.value()` after the end of the stream will return the
|
||||
/// last `.value()` encounterred.
|
||||
///
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Calling `.value()` before the first call to `.advance()` or `.next()`
|
||||
/// is undefined behavior.
|
||||
pub fn value(&self) -> V {
|
||||
self.fst_map
|
||||
.read_value(self.offset)
|
||||
.expect("Fst data is corrupted. Failed to deserialize a value.")
|
||||
/// Calling `.value()` before the first call to `.advance()` returns
|
||||
/// `V::default()`.
|
||||
pub fn value(&self) -> &V {
|
||||
&self.current_value
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
use std::io::{self, Write};
|
||||
use fst;
|
||||
use fst::raw::Fst;
|
||||
use super::{TermStreamerBuilder, TermStreamer};
|
||||
use super::TermStreamerBuilder;
|
||||
use directory::ReadOnlySource;
|
||||
use common::BinarySerializable;
|
||||
use std::marker::PhantomData;
|
||||
use schema::{Field, Term};
|
||||
use postings::TermInfo;
|
||||
|
||||
|
||||
@@ -17,15 +16,16 @@ fn convert_fst_error(e: fst::Error) -> io::Error {
|
||||
/// Builder for the new term dictionary.
|
||||
///
|
||||
/// Just like for the fst crate, all terms must be inserted in order.
|
||||
pub struct TermDictionaryBuilder<W: Write, V = TermInfo>
|
||||
where V: BinarySerializable
|
||||
pub struct TermDictionaryBuilder<W, V = TermInfo>
|
||||
where W: Write, V: BinarySerializable + Default
|
||||
{
|
||||
fst_builder: fst::MapBuilder<W>,
|
||||
data: Vec<u8>,
|
||||
_phantom_: PhantomData<V>,
|
||||
}
|
||||
|
||||
impl<W: Write, V: BinarySerializable> TermDictionaryBuilder<W, V> {
|
||||
impl<W, V> TermDictionaryBuilder<W, V>
|
||||
where W: Write, V: BinarySerializable + Default {
|
||||
/// Creates a new `TermDictionaryBuilder`
|
||||
pub fn new(w: W) -> io::Result<TermDictionaryBuilder<W, V>> {
|
||||
let fst_builder = fst::MapBuilder::new(w).map_err(convert_fst_error)?;
|
||||
@@ -81,14 +81,6 @@ impl<W: Write, V: BinarySerializable> TermDictionaryBuilder<W, V> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Datastructure to access the `terms` of a segment.
|
||||
pub struct TermDictionary<V = TermInfo>
|
||||
where V: BinarySerializable
|
||||
{
|
||||
fst_index: fst::Map,
|
||||
values_mmap: ReadOnlySource,
|
||||
_phantom_: PhantomData<V>,
|
||||
}
|
||||
|
||||
fn open_fst_index(source: ReadOnlySource) -> io::Result<fst::Map> {
|
||||
let fst = match source {
|
||||
@@ -103,8 +95,17 @@ fn open_fst_index(source: ReadOnlySource) -> io::Result<fst::Map> {
|
||||
Ok(fst::Map::from(fst))
|
||||
}
|
||||
|
||||
/// Datastructure to access the `terms` of a segment.
|
||||
pub struct TermDictionary<V = TermInfo>
|
||||
where V: BinarySerializable + Default
|
||||
{
|
||||
fst_index: fst::Map,
|
||||
values_mmap: ReadOnlySource,
|
||||
_phantom_: PhantomData<V>,
|
||||
}
|
||||
|
||||
impl<V> TermDictionary<V>
|
||||
where V: BinarySerializable
|
||||
where V: BinarySerializable + Default
|
||||
{
|
||||
/// Opens a `TermDictionary` given a data source.
|
||||
pub fn from_source(source: ReadOnlySource) -> io::Result<TermDictionary<V>> {
|
||||
@@ -140,21 +141,6 @@ impl<V> TermDictionary<V>
|
||||
})
|
||||
}
|
||||
|
||||
/// A stream of all the sorted terms. [See also `.stream_field()`](#method.stream_field)
|
||||
pub fn stream(&self) -> TermStreamer<V> {
|
||||
self.range().into_stream()
|
||||
}
|
||||
|
||||
/// A stream of all the sorted terms in the given field.
|
||||
pub fn stream_field(&self, field: Field) -> TermStreamer<V> {
|
||||
let start_term = Term::from_field_text(field, "");
|
||||
let stop_term = Term::from_field_text(Field(field.0 + 1), "");
|
||||
self.range()
|
||||
.ge(start_term.as_slice())
|
||||
.lt(stop_term.as_slice())
|
||||
.into_stream()
|
||||
}
|
||||
|
||||
/// Returns a range builder, to stream all of the terms
|
||||
/// within an interval.
|
||||
pub fn range(&self) -> TermStreamerBuilder<V> {
|
||||
|
||||
@@ -1,30 +1,30 @@
|
||||
use std::collections::BinaryHeap;
|
||||
use core::SegmentReader;
|
||||
use super::TermStreamer;
|
||||
use termdict::TermStreamer;
|
||||
use common::BinarySerializable;
|
||||
use postings::TermInfo;
|
||||
use std::cmp::Ordering;
|
||||
use fst::Streamer;
|
||||
|
||||
pub struct HeapItem<'a, V>
|
||||
where V: 'a + BinarySerializable
|
||||
where V: 'a + BinarySerializable + Default
|
||||
{
|
||||
pub streamer: TermStreamer<'a, V>,
|
||||
pub segment_ord: usize,
|
||||
}
|
||||
|
||||
impl<'a, V> PartialEq for HeapItem<'a, V>
|
||||
where V: 'a + BinarySerializable
|
||||
where V: 'a + BinarySerializable + Default
|
||||
{
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.segment_ord == other.segment_ord
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, V> Eq for HeapItem<'a, V> where V: 'a + BinarySerializable {}
|
||||
impl<'a, V> Eq for HeapItem<'a, V> where V: 'a + BinarySerializable + Default {}
|
||||
|
||||
impl<'a, V> PartialOrd for HeapItem<'a, V>
|
||||
where V: 'a + BinarySerializable
|
||||
where V: 'a + BinarySerializable + Default
|
||||
{
|
||||
fn partial_cmp(&self, other: &HeapItem<'a, V>) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
@@ -32,7 +32,7 @@ impl<'a, V> PartialOrd for HeapItem<'a, V>
|
||||
}
|
||||
|
||||
impl<'a, V> Ord for HeapItem<'a, V>
|
||||
where V: 'a + BinarySerializable
|
||||
where V: 'a + BinarySerializable + Default
|
||||
{
|
||||
fn cmp(&self, other: &HeapItem<'a, V>) -> Ordering {
|
||||
(&other.streamer.key(), &other.segment_ord).cmp(&(&self.streamer.key(), &self.segment_ord))
|
||||
@@ -47,14 +47,14 @@ impl<'a, V> Ord for HeapItem<'a, V>
|
||||
/// - a slice with the ordinal of the segments containing
|
||||
/// the terms.
|
||||
pub struct TermMerger<'a, V>
|
||||
where V: 'a + BinarySerializable
|
||||
where V: 'a + BinarySerializable + Default
|
||||
{
|
||||
heap: BinaryHeap<HeapItem<'a, V>>,
|
||||
current_streamers: Vec<HeapItem<'a, V>>,
|
||||
}
|
||||
|
||||
impl<'a, V> TermMerger<'a, V>
|
||||
where V: 'a + BinarySerializable
|
||||
where V: 'a + BinarySerializable + Default
|
||||
{
|
||||
fn new(streams: Vec<TermStreamer<'a, V>>) -> TermMerger<'a, V> {
|
||||
TermMerger {
|
||||
@@ -131,7 +131,6 @@ impl<'a, V> TermMerger<'a, V>
|
||||
|
||||
|
||||
impl<'a> From<&'a [SegmentReader]> for TermMerger<'a, TermInfo>
|
||||
where TermInfo: BinarySerializable
|
||||
{
|
||||
fn from(segment_readers: &'a [SegmentReader]) -> TermMerger<'a, TermInfo> {
|
||||
TermMerger::new(segment_readers
|
||||
@@ -142,7 +141,7 @@ impl<'a> From<&'a [SegmentReader]> for TermMerger<'a, TermInfo>
|
||||
}
|
||||
|
||||
impl<'a, V> Streamer<'a> for TermMerger<'a, V>
|
||||
where V: BinarySerializable
|
||||
where V: BinarySerializable + Default
|
||||
{
|
||||
type Item = &'a [u8];
|
||||
|
||||
|
||||
@@ -13,6 +13,10 @@ deserializing the value at this address.
|
||||
Keys (`&[u8]`) in this datastructure are sorted.
|
||||
*/
|
||||
|
||||
use schema::{Field, Term};
|
||||
use common::BinarySerializable;
|
||||
use fst;
|
||||
|
||||
#[cfg(not(feature="streamdict"))]
|
||||
mod fstdict;
|
||||
#[cfg(not(feature="streamdict"))]
|
||||
@@ -21,15 +25,48 @@ pub use self::fstdict::{TermDictionary, TermDictionaryBuilder, TermStreamer, Ter
|
||||
#[cfg(feature="streamdict")]
|
||||
mod streamdict;
|
||||
#[cfg(feature="streamdict")]
|
||||
pub use self::termdict::{TermDictionary, TermDictionaryBuilder, TermStreamer, TermStreamerBuilder};
|
||||
pub use self::streamdict::{TermDictionary, TermDictionaryBuilder, TermStreamer, TermStreamerBuilder};
|
||||
|
||||
mod merger;
|
||||
pub use self::merger::TermMerger;
|
||||
|
||||
impl<V> TermDictionary<V>
|
||||
where V: BinarySerializable + Default {
|
||||
|
||||
/// A stream of all the sorted terms. [See also `.stream_field()`](#method.stream_field)
|
||||
pub fn stream(&self) -> TermStreamer<V> {
|
||||
self.range().into_stream()
|
||||
}
|
||||
|
||||
/// A stream of all the sorted terms in the given field.
|
||||
pub fn stream_field(&self, field: Field) -> TermStreamer<V> {
|
||||
let start_term = Term::from_field_text(field, "");
|
||||
let stop_term = Term::from_field_text(Field(field.0 + 1), "");
|
||||
self.range()
|
||||
.ge(start_term.as_slice())
|
||||
.lt(stop_term.as_slice())
|
||||
.into_stream()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
impl<'a, 'b, V: 'b> fst::Streamer<'b> for TermStreamer<'a, V>
|
||||
where V: 'a + BinarySerializable + Default
|
||||
{
|
||||
type Item = (&'b [u8], &'b V);
|
||||
|
||||
fn next(&'b mut self) -> Option<(&'b [u8], &V)> {
|
||||
if self.advance() {
|
||||
Some((self.key(), self.value()))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{TermDictionary, TermDictionaryBuilder};
|
||||
use super::{TermDictionary, TermDictionaryBuilder, TermStreamer};
|
||||
use directory::{RAMDirectory, Directory, ReadOnlySource};
|
||||
use std::path::PathBuf;
|
||||
use fst::Streamer;
|
||||
@@ -39,7 +76,6 @@ mod tests {
|
||||
const BLOCK_SIZE: usize = 1_500;
|
||||
|
||||
|
||||
|
||||
#[test]
|
||||
fn test_term_dictionary() {
|
||||
let mut directory = RAMDirectory::create();
|
||||
@@ -60,12 +96,12 @@ mod tests {
|
||||
assert_eq!(term_dict.get("abc"), Some(34u32));
|
||||
assert_eq!(term_dict.get("abcd"), Some(346u32));
|
||||
let mut stream = term_dict.stream();
|
||||
assert_eq!(stream.next().unwrap(), ("abc".as_bytes(), 34u32));
|
||||
assert_eq!(stream.next().unwrap(), ("abc".as_bytes(), &34u32));
|
||||
assert_eq!(stream.key(), "abc".as_bytes());
|
||||
assert_eq!(stream.value(), 34u32);
|
||||
assert_eq!(stream.next().unwrap(), ("abcd".as_bytes(), 346u32));
|
||||
assert_eq!(*stream.value(), 34u32);
|
||||
assert_eq!(stream.next().unwrap(), ("abcd".as_bytes(), &346u32));
|
||||
assert_eq!(stream.key(), "abcd".as_bytes());
|
||||
assert_eq!(stream.value(), 346u32);
|
||||
assert_eq!(*stream.value(), 346u32);
|
||||
assert!(!stream.advance());
|
||||
}
|
||||
|
||||
@@ -117,56 +153,54 @@ mod tests {
|
||||
|
||||
|
||||
#[test]
|
||||
fn test_stream_dictionary() {
|
||||
fn test_term_dictionary_stream() {
|
||||
let ids: Vec<_> = (0u32..10_000u32)
|
||||
.map(|i| (format!("doc{:0>6}", i), i))
|
||||
.collect();
|
||||
let buffer: Vec<u8> = {
|
||||
let mut stream_dictionary_builder = TermDictionaryBuilder::new(vec!()).unwrap();
|
||||
let mut term_dictionary_builder = TermDictionaryBuilder::new(vec!()).unwrap();
|
||||
for &(ref id, ref i) in &ids {
|
||||
stream_dictionary_builder.insert(id.as_bytes(), i).unwrap();
|
||||
term_dictionary_builder.insert(id.as_bytes(), i).unwrap();
|
||||
}
|
||||
stream_dictionary_builder.finish().unwrap()
|
||||
term_dictionary_builder.finish().unwrap()
|
||||
};
|
||||
let source = ReadOnlySource::from(buffer);
|
||||
let stream_dictionary: TermDictionary<u32> = TermDictionary::from_source(source).unwrap();
|
||||
let term_dictionary: TermDictionary<u32> = TermDictionary::from_source(source).unwrap();
|
||||
{
|
||||
let mut streamer = stream_dictionary.stream();
|
||||
let mut streamer = term_dictionary.stream();
|
||||
let mut i = 0;
|
||||
while let Some((streamer_k, streamer_v)) = streamer.next() {
|
||||
let &(ref key, ref v) = &ids[i];
|
||||
assert_eq!(streamer_k, key.as_bytes());
|
||||
assert_eq!(streamer_v, *v);
|
||||
assert_eq!(streamer_v, v);
|
||||
i += 1;
|
||||
}
|
||||
}
|
||||
|
||||
let &(ref key, ref _v) = &ids[2047];
|
||||
stream_dictionary.get(key.as_bytes());
|
||||
term_dictionary.get(key.as_bytes());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_stream_range() {
|
||||
let ids: Vec<_> = (0u32..10_000u32)
|
||||
let ids: Vec<_> = (0u32..50_000u32)
|
||||
.map(|i| (format!("doc{:0>6}", i), i))
|
||||
.collect();
|
||||
let buffer: Vec<u8> = {
|
||||
let mut stream_dictionary_builder = TermDictionaryBuilder::new(vec!()).unwrap();
|
||||
let mut term_dictionary_builder = TermDictionaryBuilder::new(vec!()).unwrap();
|
||||
for &(ref id, ref i) in &ids {
|
||||
stream_dictionary_builder.insert(id.as_bytes(), i).unwrap();
|
||||
term_dictionary_builder.insert(id.as_bytes(), i).unwrap();
|
||||
}
|
||||
stream_dictionary_builder.finish().unwrap()
|
||||
term_dictionary_builder.finish().unwrap()
|
||||
};
|
||||
|
||||
println!("a");
|
||||
let source = ReadOnlySource::from(buffer);
|
||||
|
||||
let stream_dictionary: TermDictionary<u32> = TermDictionary::from_source(source).unwrap();
|
||||
let term_dictionary: TermDictionary<u32> = TermDictionary::from_source(source).unwrap();
|
||||
{
|
||||
for i in (0..20).chain(2_000 - 10..2_000 + 10) {
|
||||
println!("i {}", i);
|
||||
for i in (0..20).chain(6000..8_000) {
|
||||
let &(ref target_key, _) = &ids[i];
|
||||
let mut streamer = stream_dictionary
|
||||
let mut streamer = term_dictionary
|
||||
.range()
|
||||
.ge(target_key.as_bytes())
|
||||
.into_stream();
|
||||
@@ -174,7 +208,7 @@ mod tests {
|
||||
let (streamer_k, streamer_v) = streamer.next().unwrap();
|
||||
let &(ref key, ref v) = &ids[i + j];
|
||||
assert_eq!(str::from_utf8(streamer_k).unwrap(), key);
|
||||
assert_eq!(streamer_v, *v);
|
||||
assert_eq!(streamer_v, v);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -182,7 +216,7 @@ mod tests {
|
||||
{
|
||||
for i in (0..20).chain((BLOCK_SIZE - 10..BLOCK_SIZE + 10)) {
|
||||
let &(ref target_key, _) = &ids[i];
|
||||
let mut streamer = stream_dictionary
|
||||
let mut streamer = term_dictionary
|
||||
.range()
|
||||
.gt(target_key.as_bytes())
|
||||
.into_stream();
|
||||
@@ -190,31 +224,89 @@ mod tests {
|
||||
let (streamer_k, streamer_v) = streamer.next().unwrap();
|
||||
let &(ref key, ref v) = &ids[i + j + 1];
|
||||
assert_eq!(streamer_k, key.as_bytes());
|
||||
assert_eq!(streamer_v, *v);
|
||||
assert_eq!(streamer_v, v);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
for i in (0..20).chain((BLOCK_SIZE - 10..BLOCK_SIZE + 10)) {
|
||||
println!("i2:{}", i);
|
||||
for j in 0..3 {
|
||||
println!("j2:{}", j);
|
||||
println!("i {} j {}", i, j);
|
||||
let &(ref fst_key, _) = &ids[i];
|
||||
let &(ref last_key, _) = &ids[i + 3];
|
||||
let mut streamer = stream_dictionary
|
||||
let &(ref last_key, _) = &ids[i + j];
|
||||
let mut streamer = term_dictionary
|
||||
.range()
|
||||
.ge(fst_key.as_bytes())
|
||||
.lt(last_key.as_bytes())
|
||||
.into_stream();
|
||||
for _ in 0..j {
|
||||
println!("ij");
|
||||
assert!(streamer.next().is_some());
|
||||
}
|
||||
assert!(streamer.next().is_some());
|
||||
assert!(streamer.next().is_none());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[test]
|
||||
fn test_stream_range_boundaries() {
|
||||
let buffer: Vec<u8> = {
|
||||
let mut term_dictionary_builder = TermDictionaryBuilder::new(vec!()).unwrap();
|
||||
for i in 0u8..10u8 {
|
||||
let number_arr = [i; 1];
|
||||
term_dictionary_builder.insert(&number_arr, &i).unwrap();
|
||||
}
|
||||
term_dictionary_builder.finish().unwrap()
|
||||
};
|
||||
let source = ReadOnlySource::from(buffer);
|
||||
let term_dictionary: TermDictionary<u8> = TermDictionary::from_source(source).unwrap();
|
||||
|
||||
let value_list = |mut streamer: TermStreamer<u8>| {
|
||||
let mut res: Vec<u8> = vec!();
|
||||
while let Some((_, &v)) = streamer.next() {
|
||||
res.push(v);
|
||||
}
|
||||
res
|
||||
};
|
||||
{
|
||||
let range = term_dictionary
|
||||
.range()
|
||||
.ge([2u8])
|
||||
.into_stream();
|
||||
assert_eq!(value_list(range), vec!(2u8, 3u8, 4u8, 5u8, 6u8, 7u8, 8u8, 9u8));
|
||||
}
|
||||
{
|
||||
let range = term_dictionary
|
||||
.range()
|
||||
.gt([2u8])
|
||||
.into_stream();
|
||||
assert_eq!(value_list(range), vec!(3u8, 4u8, 5u8, 6u8, 7u8, 8u8, 9u8));
|
||||
}
|
||||
{
|
||||
let range = term_dictionary
|
||||
.range()
|
||||
.lt([6u8])
|
||||
.into_stream();
|
||||
assert_eq!(value_list(range), vec!(0u8, 1u8, 2u8, 3u8, 4u8, 5u8));
|
||||
}
|
||||
{
|
||||
let range = term_dictionary
|
||||
.range()
|
||||
.le([6u8])
|
||||
.into_stream();
|
||||
assert_eq!(value_list(range), vec!(0u8, 1u8, 2u8, 3u8, 4u8, 5u8, 6u8));
|
||||
}
|
||||
{
|
||||
let range = term_dictionary
|
||||
.range()
|
||||
.ge([0u8])
|
||||
.lt([5u8])
|
||||
.into_stream();
|
||||
assert_eq!(value_list(range), vec!(0u8, 1u8, 2u8, 3u8, 4u8));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,258 +1,40 @@
|
||||
#![allow(should_implement_trait)]
|
||||
|
||||
use std::cmp::max;
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
use std::io::Read;
|
||||
use fst;
|
||||
use fst::raw::Fst;
|
||||
use common::VInt;
|
||||
use directory::ReadOnlySource;
|
||||
use common::BinarySerializable;
|
||||
use std::marker::PhantomData;
|
||||
use common::CountingWriter;
|
||||
use std::cmp::Ordering;
|
||||
use fst::{IntoStreamer, Streamer};
|
||||
use std::str;
|
||||
use fst::raw::Node;
|
||||
use fst::raw::CompiledAddr;
|
||||
use super::TermDictionary;
|
||||
use fst::Streamer;
|
||||
|
||||
const BLOCK_SIZE: usize = 1024;
|
||||
|
||||
fn convert_fst_error(e: fst::Error) -> io::Error {
|
||||
io::Error::new(io::ErrorKind::Other, e)
|
||||
}
|
||||
|
||||
pub struct StreamDictionaryBuilder<W: Write, V: BinarySerializable + Clone + Default> {
|
||||
write: CountingWriter<W>,
|
||||
block_index: fst::MapBuilder<Vec<u8>>,
|
||||
last_key: Vec<u8>,
|
||||
len: usize,
|
||||
_phantom_: PhantomData<V>,
|
||||
}
|
||||
|
||||
fn common_prefix_length(left: &[u8], right: &[u8]) -> usize {
|
||||
left.iter().cloned()
|
||||
.zip(right.iter().cloned())
|
||||
.take_while(|&(b1, b2)| b1 == b2)
|
||||
.count()
|
||||
}
|
||||
|
||||
fn fill_last<'a>(fst: &'a Fst, mut node: Node<'a>, buffer: &mut Vec<u8>) {
|
||||
loop {
|
||||
if let Some(transition) = node.transitions().last() {
|
||||
buffer.push(transition.inp);
|
||||
node = fst.node(transition.addr);
|
||||
}
|
||||
else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
fn strictly_previous_key<B: AsRef<[u8]>>(fst_map: &fst::Map, key_as_ref: B) -> (Vec<u8>, u64) {
|
||||
let key = key_as_ref.as_ref();
|
||||
let fst = fst_map.as_fst();
|
||||
let mut node = fst.root();
|
||||
let mut node_stack: Vec<Node> = vec!(node.clone());
|
||||
|
||||
// first check the longest prefix.
|
||||
for &b in &key[..key.len() - 1] {
|
||||
node = match node.find_input(b) {
|
||||
None => {
|
||||
break;
|
||||
},
|
||||
Some(i) => {
|
||||
fst.node(node.transition_addr(i))
|
||||
},
|
||||
};
|
||||
node_stack.push(node);
|
||||
}
|
||||
|
||||
let len_node_stack = node_stack.len();
|
||||
for i in (1..len_node_stack).rev() {
|
||||
let cur_node = &node_stack[i];
|
||||
let b: u8 = key[i];
|
||||
let last_transition_opt = cur_node
|
||||
.transitions()
|
||||
.take_while(|transition| transition.inp < b)
|
||||
.last();
|
||||
|
||||
if let Some(last_transition) = last_transition_opt {
|
||||
let mut result_buffer = Vec::from(&key[..i]);
|
||||
result_buffer.push(last_transition.inp);
|
||||
let mut result = Vec::from(&key[..i]);
|
||||
result.push(last_transition.inp);
|
||||
let fork_node = fst.node(last_transition.addr);
|
||||
fill_last(fst, fork_node, &mut result);
|
||||
let val = fst_map.get(&result).unwrap();
|
||||
return (result, val);
|
||||
}
|
||||
else if cur_node.is_final() {
|
||||
// the previous key is a prefix
|
||||
let result_buffer = Vec::from(&key[..i]);
|
||||
let val = fst_map.get(&result_buffer).unwrap();
|
||||
return (result_buffer, val);
|
||||
}
|
||||
}
|
||||
|
||||
return (vec!(), 0);
|
||||
}
|
||||
|
||||
|
||||
impl<W: Write, V: BinarySerializable + Clone + Default> StreamDictionaryBuilder<W, V> {
|
||||
|
||||
pub fn new(write: W) -> io::Result<StreamDictionaryBuilder<W, V>> {
|
||||
let buffer: Vec<u8> = vec!();
|
||||
Ok(StreamDictionaryBuilder {
|
||||
write: CountingWriter::wrap(write),
|
||||
block_index: fst::MapBuilder::new(buffer)
|
||||
.expect("This cannot fail"),
|
||||
last_key: Vec::with_capacity(128),
|
||||
len: 0,
|
||||
_phantom_: PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
fn add_index_entry(&mut self) {
|
||||
self.block_index.insert(&self.last_key, self.write.written_bytes() as u64).unwrap();
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()>{
|
||||
self.insert_key(key)?;
|
||||
self.insert_value(value)
|
||||
}
|
||||
|
||||
pub fn insert_key(&mut self, key: &[u8]) -> io::Result<()>{
|
||||
if self.len % BLOCK_SIZE == 0 {
|
||||
self.add_index_entry();
|
||||
}
|
||||
self.len += 1;
|
||||
let common_len = common_prefix_length(key, &self.last_key);
|
||||
VInt(common_len as u64).serialize(&mut self.write)?;
|
||||
self.last_key.truncate(common_len);
|
||||
self.last_key.extend_from_slice(&key[common_len..]);
|
||||
VInt((key.len() - common_len) as u64).serialize(&mut self.write)?;
|
||||
self.write.write_all(&key[common_len..])?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn insert_value(&mut self, value: &V) -> io::Result<()>{
|
||||
value.serialize(&mut self.write)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn finish(mut self) -> io::Result<W> {
|
||||
self.add_index_entry();
|
||||
let (mut w, split_len) = self.write.finish()?;
|
||||
let fst_write = self.block_index
|
||||
.into_inner()
|
||||
.map_err(convert_fst_error)?;
|
||||
w.write(&fst_write)?;
|
||||
(split_len as u64).serialize(&mut w)?;
|
||||
w.flush()?;
|
||||
Ok(w)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
fn stream_before<'a, V: 'a + Clone + Default + BinarySerializable>(stream_dictionary: &'a StreamDictionary<V>, target_key: &[u8]) -> StreamDictionaryStreamer<'a, V> {
|
||||
let (prev_key, offset) = strictly_previous_key(&stream_dictionary.fst_index, target_key.as_ref());
|
||||
pub(crate) fn stream_before<'a, V>(term_dictionary: &'a TermDictionary<V>, target_key: &[u8]) -> TermStreamer<'a, V>
|
||||
where V: 'a + BinarySerializable + Default {
|
||||
let (prev_key, offset) = term_dictionary.strictly_previous_key(target_key.as_ref());
|
||||
let offset: usize = offset as usize;
|
||||
StreamDictionaryStreamer {
|
||||
cursor: &stream_dictionary.stream_data.as_slice()[offset..],
|
||||
TermStreamer {
|
||||
cursor: &term_dictionary.stream_data()[offset..],
|
||||
current_key: Vec::from(prev_key),
|
||||
current_value: V::default(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
pub struct StreamDictionary<V> where V:BinarySerializable + Default + Clone {
|
||||
stream_data: ReadOnlySource,
|
||||
fst_index: fst::Map,
|
||||
_phantom_: PhantomData<V>,
|
||||
}
|
||||
|
||||
fn open_fst_index(source: ReadOnlySource) -> io::Result<fst::Map> {
|
||||
Ok(fst::Map::from(match source {
|
||||
ReadOnlySource::Anonymous(data) => try!(Fst::from_shared_bytes(data.data, data.start, data.len).map_err(convert_fst_error)),
|
||||
ReadOnlySource::Mmap(mmap_readonly) => try!(Fst::from_mmap(mmap_readonly).map_err(convert_fst_error)),
|
||||
}))
|
||||
}
|
||||
|
||||
|
||||
impl<V: BinarySerializable + Clone + Default> StreamDictionary<V> {
|
||||
|
||||
pub fn from_source(source: ReadOnlySource) -> io::Result<StreamDictionary<V>> {
|
||||
let total_len = source.len();
|
||||
let length_offset = total_len - 8;
|
||||
let split_len: usize = {
|
||||
let mut split_len_buffer: &[u8] = &source.as_slice()[length_offset..];
|
||||
u64::deserialize(&mut split_len_buffer)? as usize
|
||||
};
|
||||
let stream_data = source.slice(0, split_len);
|
||||
let fst_data = source.slice(split_len, length_offset);
|
||||
let fst_index = open_fst_index(fst_data)?;
|
||||
|
||||
Ok(StreamDictionary {
|
||||
stream_data: stream_data,
|
||||
fst_index: fst_index,
|
||||
_phantom_: PhantomData
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get<K: AsRef<[u8]>>(&self, target_key: K) -> Option<V> {
|
||||
let mut streamer = stream_before(self, target_key.as_ref());
|
||||
while let Some((iter_key, iter_val)) = streamer.next() {
|
||||
match iter_key.cmp(target_key.as_ref()) {
|
||||
Ordering::Less => {}
|
||||
Ordering::Equal => {
|
||||
let val: V = (*iter_val).clone();
|
||||
return Some(val);
|
||||
}
|
||||
Ordering::Greater => {
|
||||
return None;
|
||||
}
|
||||
}
|
||||
}
|
||||
return None;
|
||||
}
|
||||
|
||||
pub fn range(&self) -> StreamDictionaryStreamerBuilder<V> {
|
||||
let data: &[u8] = &self.stream_data;
|
||||
StreamDictionaryStreamerBuilder {
|
||||
stream_dictionary: &self,
|
||||
offset_from: 0,
|
||||
offset_to: (data.as_ptr() as usize) + data.len(),
|
||||
current_key: vec!(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn stream(&self) -> StreamDictionaryStreamer<V> {
|
||||
StreamDictionaryStreamer {
|
||||
cursor: &*self.stream_data,
|
||||
current_key: Vec::with_capacity(128),
|
||||
current_value: V::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct StreamDictionaryStreamerBuilder<'a, V: 'a + BinarySerializable + Clone + Default> {
|
||||
stream_dictionary: &'a StreamDictionary<V>,
|
||||
/// `TermStreamerBuilder` is an helper object used to define
|
||||
/// a range of terms that should be streamed.
|
||||
pub struct TermStreamerBuilder<'a, V>
|
||||
where V: 'a + BinarySerializable + Default {
|
||||
term_dictionary: &'a TermDictionary<V>,
|
||||
origin: usize,
|
||||
offset_from: usize,
|
||||
offset_to: usize,
|
||||
current_key: Vec<u8>,
|
||||
}
|
||||
|
||||
|
||||
/// Returns offset information for the first
|
||||
/// key in the stream matching a given predicate.
|
||||
///
|
||||
/// returns (start offset, the data required to load the value)
|
||||
fn get_offset<'a, V, P: Fn(&[u8])->bool>(predicate: P, mut streamer: StreamDictionaryStreamer<V>) -> (usize, Vec<u8>)
|
||||
where V: 'a + BinarySerializable + Clone + Default {
|
||||
fn get_offset<'a, V, P: Fn(&[u8])->bool>(predicate: P, mut streamer: TermStreamer<V>) -> (usize, Vec<u8>)
|
||||
where V: 'a + BinarySerializable + Default {
|
||||
let mut prev: &[u8] = streamer.cursor;
|
||||
|
||||
let mut prev_data: Vec<u8> = streamer.current_key.clone();
|
||||
@@ -268,51 +50,69 @@ fn get_offset<'a, V, P: Fn(&[u8])->bool>(predicate: P, mut streamer: StreamDicti
|
||||
return (prev.as_ptr() as usize, prev_data);
|
||||
}
|
||||
|
||||
impl<'a, V: 'a + BinarySerializable + Clone + Default> StreamDictionaryStreamerBuilder<'a, V> {
|
||||
pub fn ge<T: AsRef<[u8]>>(mut self, bound: T) -> StreamDictionaryStreamerBuilder<'a, V> {
|
||||
impl<'a, V> TermStreamerBuilder<'a, V>
|
||||
where V: 'a + BinarySerializable + Default {
|
||||
|
||||
/// Limit the range to terms greater or equal to the bound
|
||||
pub fn ge<T: AsRef<[u8]>>(mut self, bound: T) -> TermStreamerBuilder<'a, V> {
|
||||
let target_key = bound.as_ref();
|
||||
let streamer = stream_before(&self.stream_dictionary, target_key.as_ref());
|
||||
let streamer = stream_before(&self.term_dictionary, target_key.as_ref());
|
||||
let smaller_than = |k: &[u8]| { k.lt(target_key) };
|
||||
let (offset_before, current_key) = get_offset(smaller_than, streamer);
|
||||
self.current_key = current_key;
|
||||
self.offset_from = offset_before;
|
||||
self.offset_from = offset_before - self.origin;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn gt<T: AsRef<[u8]>>(mut self, bound: T) -> StreamDictionaryStreamerBuilder<'a, V> {
|
||||
/// Limit the range to terms strictly greater than the bound
|
||||
pub fn gt<T: AsRef<[u8]>>(mut self, bound: T) -> TermStreamerBuilder<'a, V> {
|
||||
let target_key = bound.as_ref();
|
||||
let streamer = stream_before(self.stream_dictionary, target_key.as_ref());
|
||||
let streamer = stream_before(self.term_dictionary, target_key.as_ref());
|
||||
let smaller_than = |k: &[u8]| { k.le(target_key) };
|
||||
let (offset_before, current_key) = get_offset(smaller_than, streamer);
|
||||
self.current_key = current_key;
|
||||
self.offset_from = offset_before;
|
||||
self.offset_from = offset_before - self.origin;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn lt<T: AsRef<[u8]>>(mut self, bound: T) -> StreamDictionaryStreamerBuilder<'a, V> {
|
||||
/// Limit the range to terms lesser or equal to the bound
|
||||
pub fn lt<T: AsRef<[u8]>>(mut self, bound: T) -> TermStreamerBuilder<'a, V> {
|
||||
let target_key = bound.as_ref();
|
||||
let streamer = stream_before(self.stream_dictionary, target_key.as_ref());
|
||||
let smaller_than = |k: &[u8]| { k.le(target_key) };
|
||||
let (offset_before, _) = get_offset(smaller_than, streamer);
|
||||
self.offset_to = offset_before;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn le<T: AsRef<[u8]>>(mut self, bound: T) -> StreamDictionaryStreamerBuilder<'a, V> {
|
||||
let target_key = bound.as_ref();
|
||||
let streamer = stream_before(self.stream_dictionary, target_key.as_ref());
|
||||
let streamer = stream_before(self.term_dictionary, target_key.as_ref());
|
||||
let smaller_than = |k: &[u8]| { k.lt(target_key) };
|
||||
let (offset_before, _) = get_offset(smaller_than, streamer);
|
||||
self.offset_to = offset_before;
|
||||
self.offset_to = offset_before - self.origin;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn into_stream(self) -> StreamDictionaryStreamer<'a, V> {
|
||||
let data: &[u8] = &self.stream_dictionary.stream_data.as_slice()[..];
|
||||
/// Limit the range to terms lesser or equal to the bound
|
||||
pub fn le<T: AsRef<[u8]>>(mut self, bound: T) -> TermStreamerBuilder<'a, V> {
|
||||
let target_key = bound.as_ref();
|
||||
let streamer = stream_before(self.term_dictionary, target_key.as_ref());
|
||||
let smaller_than = |k: &[u8]| { k.le(target_key) };
|
||||
let (offset_before, _) = get_offset(smaller_than, streamer);
|
||||
self.offset_to = offset_before - self.origin;
|
||||
self
|
||||
}
|
||||
|
||||
pub(crate) fn new(term_dictionary: &'a TermDictionary<V>) -> TermStreamerBuilder<'a, V> {
|
||||
let data = term_dictionary.stream_data();
|
||||
let origin = data.as_ptr() as usize;
|
||||
let start = self.offset_from - origin;
|
||||
let stop = max(self.offset_to - origin, start);
|
||||
StreamDictionaryStreamer {
|
||||
TermStreamerBuilder {
|
||||
term_dictionary: term_dictionary,
|
||||
origin: origin,
|
||||
offset_from: 0,
|
||||
offset_to: data.len(),
|
||||
current_key: vec!(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Build the streamer.
|
||||
pub fn into_stream(self) -> TermStreamer<'a, V> {
|
||||
let data: &[u8] = self.term_dictionary.stream_data();
|
||||
let start = self.offset_from;
|
||||
let stop = max(self.offset_to, start);
|
||||
TermStreamer {
|
||||
cursor: &data[start..stop],
|
||||
current_key: self.current_key,
|
||||
current_value: V::default(),
|
||||
@@ -320,17 +120,26 @@ impl<'a, V: 'a + BinarySerializable + Clone + Default> StreamDictionaryStreamerB
|
||||
}
|
||||
}
|
||||
|
||||
pub struct StreamDictionaryStreamer<'a, V: BinarySerializable> {
|
||||
/// `TermStreamer` acts as a cursor over a range of terms of a segment.
|
||||
/// Terms are guaranteed to be sorted.
|
||||
pub struct TermStreamer<'a, V>
|
||||
where V: 'a + BinarySerializable + Default {
|
||||
cursor: &'a [u8],
|
||||
current_key: Vec<u8>,
|
||||
current_value: V,
|
||||
}
|
||||
|
||||
impl<'a, V: BinarySerializable> StreamDictionaryStreamer<'a, V> {
|
||||
|
||||
|
||||
impl<'a, V: BinarySerializable> TermStreamer<'a, V>
|
||||
where V: 'a + BinarySerializable + Default {
|
||||
|
||||
pub fn next(&mut self) -> Option<(&[u8], &V)> {
|
||||
/// Advance position the stream on the next item.
|
||||
/// Before the first call to `.advance()`, the stream
|
||||
/// is an unitialized state.
|
||||
pub fn advance(&mut self) -> bool {
|
||||
if self.cursor.len() == 0 {
|
||||
return None;
|
||||
return false;
|
||||
}
|
||||
let common_length: usize = VInt::deserialize(&mut self.cursor).unwrap().0 as usize;
|
||||
let new_length: usize = common_length + VInt::deserialize(&mut self.cursor).unwrap().0 as usize;
|
||||
@@ -340,124 +149,37 @@ impl<'a, V: BinarySerializable> StreamDictionaryStreamer<'a, V> {
|
||||
}
|
||||
self.cursor.read_exact(&mut self.current_key[common_length..new_length]).unwrap();
|
||||
self.current_value = V::deserialize(&mut self.cursor).unwrap();
|
||||
Some((&self.current_key, &self.current_value))
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/// Accesses the current key.
|
||||
///
|
||||
/// `.key()` should return the key that was returned
|
||||
/// by the `.next()` method.
|
||||
///
|
||||
/// If the end of the stream as been reached, and `.next()`
|
||||
/// has been called and returned `None`, `.key()` remains
|
||||
/// the value of the last key encounterred.
|
||||
///
|
||||
/// Before any call to `.next()`, `.key()` returns an empty array.
|
||||
pub fn key(&self) -> &[u8] {
|
||||
&self.current_key
|
||||
}
|
||||
|
||||
/// Accesses the current value.
|
||||
///
|
||||
/// Calling `.value()` after the end of the stream will return the
|
||||
/// last `.value()` encounterred.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Calling `.value()` before the first call to `.advance()` returns
|
||||
/// `V::default()`.
|
||||
pub fn value(&self) -> &V {
|
||||
&self.current_value
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
|
||||
use std::str;
|
||||
use directory::ReadOnlySource;
|
||||
use super::CountingWriter;
|
||||
use std::io::Write;
|
||||
use super::{BLOCK_SIZE, StreamDictionary, StreamDictionaryBuilder};
|
||||
|
||||
#[test]
|
||||
fn test_stream_dictionary() {
|
||||
let ids: Vec<_> = (0u32..10_000u32)
|
||||
.map(|i| (format!("doc{:0>6}", i), i))
|
||||
.collect();
|
||||
let buffer: Vec<u8> = {
|
||||
let mut stream_dictionary_builder = StreamDictionaryBuilder::new(vec!()).unwrap();
|
||||
for &(ref id, ref i) in &ids {
|
||||
stream_dictionary_builder.insert(id.as_bytes(), i).unwrap();
|
||||
}
|
||||
stream_dictionary_builder.finish().unwrap()
|
||||
};
|
||||
let source = ReadOnlySource::from(buffer);
|
||||
let stream_dictionary: StreamDictionary<u32> = StreamDictionary::from_source(source).unwrap();
|
||||
{
|
||||
let mut streamer = stream_dictionary.stream();
|
||||
let mut i = 0;
|
||||
while let Some((streamer_k, streamer_v)) = streamer.next() {
|
||||
let &(ref key, ref v) = &ids[i];
|
||||
assert_eq!(streamer_k, key.as_bytes());
|
||||
assert_eq!(streamer_v, v);
|
||||
i += 1;
|
||||
}
|
||||
}
|
||||
|
||||
let &(ref key, ref _v) = &ids[2047];
|
||||
stream_dictionary.get(key.as_bytes());
|
||||
pub(crate) fn extract_value(self) -> V {
|
||||
self.current_value
|
||||
}
|
||||
|
||||
|
||||
|
||||
#[test]
|
||||
fn test_stream_range() {
|
||||
let ids: Vec<_> = (0u32..10_000u32)
|
||||
.map(|i| (format!("doc{:0>6}", i), i))
|
||||
.collect();
|
||||
let buffer: Vec<u8> = {
|
||||
let mut stream_dictionary_builder = StreamDictionaryBuilder::new(vec!()).unwrap();
|
||||
for &(ref id, ref i) in &ids {
|
||||
stream_dictionary_builder.insert(id.as_bytes(), i).unwrap();
|
||||
}
|
||||
stream_dictionary_builder.finish().unwrap()
|
||||
};
|
||||
let source = ReadOnlySource::from(buffer);
|
||||
|
||||
let stream_dictionary: StreamDictionary<u32> = StreamDictionary::from_source(source).unwrap();
|
||||
{
|
||||
for i in (0..20).chain((BLOCK_SIZE - 10..BLOCK_SIZE + 10)) {
|
||||
let &(ref target_key, _) = &ids[i];
|
||||
let mut streamer = stream_dictionary
|
||||
.range()
|
||||
.ge(target_key.as_bytes())
|
||||
.into_stream();
|
||||
for j in 0..3 {
|
||||
let (streamer_k, streamer_v) = streamer.next().unwrap();
|
||||
let &(ref key, ref v) = &ids[i + j];
|
||||
assert_eq!(str::from_utf8(streamer_k).unwrap(), key);
|
||||
assert_eq!(streamer_v, v);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
for i in (0..20).chain((BLOCK_SIZE - 10..BLOCK_SIZE + 10)) {
|
||||
let &(ref target_key, _) = &ids[i];
|
||||
let mut streamer = stream_dictionary
|
||||
.range()
|
||||
.gt(target_key.as_bytes())
|
||||
.into_stream();
|
||||
for j in 0..3 {
|
||||
let (streamer_k, streamer_v) = streamer.next().unwrap();
|
||||
let &(ref key, ref v) = &ids[i + j + 1];
|
||||
assert_eq!(streamer_k, key.as_bytes());
|
||||
assert_eq!(streamer_v, v);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
for i in (0..20).chain((BLOCK_SIZE - 10..BLOCK_SIZE + 10)) {
|
||||
for j in 0..3 {
|
||||
let &(ref fst_key, _) = &ids[i];
|
||||
let &(ref last_key, _) = &ids[i + 3];
|
||||
let mut streamer = stream_dictionary
|
||||
.range()
|
||||
.ge(fst_key.as_bytes())
|
||||
.lt(last_key.as_bytes())
|
||||
.into_stream();
|
||||
for _ in 0..(j + 1) {
|
||||
assert!(streamer.next().is_some());
|
||||
}
|
||||
assert!(streamer.next().is_some());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,9 +1,6 @@
|
||||
#![allow(should_implement_trait)]
|
||||
|
||||
use std::cmp::max;
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
use std::io::Read;
|
||||
use std::io::{self, Write};
|
||||
use fst;
|
||||
use fst::raw::Fst;
|
||||
use common::VInt;
|
||||
@@ -12,10 +9,10 @@ use common::BinarySerializable;
|
||||
use std::marker::PhantomData;
|
||||
use common::CountingWriter;
|
||||
use std::cmp::Ordering;
|
||||
use fst::{IntoStreamer, Streamer};
|
||||
use std::str;
|
||||
use postings::TermInfo;
|
||||
use fst::raw::Node;
|
||||
use fst::raw::CompiledAddr;
|
||||
use super::TermStreamerBuilder;
|
||||
use super::streamer::stream_before;
|
||||
|
||||
const BLOCK_SIZE: usize = 1024;
|
||||
|
||||
@@ -23,7 +20,11 @@ fn convert_fst_error(e: fst::Error) -> io::Error {
|
||||
io::Error::new(io::ErrorKind::Other, e)
|
||||
}
|
||||
|
||||
pub struct StreamDictionaryBuilder<W: Write, V: BinarySerializable + Clone + Default> {
|
||||
/// Builder for the new term dictionary.
|
||||
///
|
||||
/// All terms must be inserted in order.
|
||||
pub struct TermDictionaryBuilder<W, V=TermInfo>
|
||||
where W: Write, V: BinarySerializable + Default {
|
||||
write: CountingWriter<W>,
|
||||
block_index: fst::MapBuilder<Vec<u8>>,
|
||||
last_key: Vec<u8>,
|
||||
@@ -51,61 +52,12 @@ fn fill_last<'a>(fst: &'a Fst, mut node: Node<'a>, buffer: &mut Vec<u8>) {
|
||||
}
|
||||
|
||||
|
||||
fn strictly_previous_key<B: AsRef<[u8]>>(fst_map: &fst::Map, key_as_ref: B) -> (Vec<u8>, u64) {
|
||||
let key = key_as_ref.as_ref();
|
||||
let fst = fst_map.as_fst();
|
||||
let mut node = fst.root();
|
||||
let mut node_stack: Vec<Node> = vec!(node.clone());
|
||||
impl<W: Write, V: BinarySerializable + Default> TermDictionaryBuilder<W, V> {
|
||||
|
||||
// first check the longest prefix.
|
||||
for &b in &key[..key.len() - 1] {
|
||||
node = match node.find_input(b) {
|
||||
None => {
|
||||
break;
|
||||
},
|
||||
Some(i) => {
|
||||
fst.node(node.transition_addr(i))
|
||||
},
|
||||
};
|
||||
node_stack.push(node);
|
||||
}
|
||||
|
||||
let len_node_stack = node_stack.len();
|
||||
for i in (1..len_node_stack).rev() {
|
||||
let cur_node = &node_stack[i];
|
||||
let b: u8 = key[i];
|
||||
let last_transition_opt = cur_node
|
||||
.transitions()
|
||||
.take_while(|transition| transition.inp < b)
|
||||
.last();
|
||||
|
||||
if let Some(last_transition) = last_transition_opt {
|
||||
let mut result_buffer = Vec::from(&key[..i]);
|
||||
result_buffer.push(last_transition.inp);
|
||||
let mut result = Vec::from(&key[..i]);
|
||||
result.push(last_transition.inp);
|
||||
let fork_node = fst.node(last_transition.addr);
|
||||
fill_last(fst, fork_node, &mut result);
|
||||
let val = fst_map.get(&result).unwrap();
|
||||
return (result, val);
|
||||
}
|
||||
else if cur_node.is_final() {
|
||||
// the previous key is a prefix
|
||||
let result_buffer = Vec::from(&key[..i]);
|
||||
let val = fst_map.get(&result_buffer).unwrap();
|
||||
return (result_buffer, val);
|
||||
}
|
||||
}
|
||||
|
||||
return (vec!(), 0);
|
||||
}
|
||||
|
||||
|
||||
impl<W: Write, V: BinarySerializable + Clone + Default> StreamDictionaryBuilder<W, V> {
|
||||
|
||||
pub fn new(write: W) -> io::Result<StreamDictionaryBuilder<W, V>> {
|
||||
/// Creates a new `TermDictionaryBuilder`
|
||||
pub fn new(write: W) -> io::Result<TermDictionaryBuilder<W, V>> {
|
||||
let buffer: Vec<u8> = vec!();
|
||||
Ok(StreamDictionaryBuilder {
|
||||
Ok(TermDictionaryBuilder {
|
||||
write: CountingWriter::wrap(write),
|
||||
block_index: fst::MapBuilder::new(buffer)
|
||||
.expect("This cannot fail"),
|
||||
@@ -119,12 +71,22 @@ impl<W: Write, V: BinarySerializable + Clone + Default> StreamDictionaryBuilder<
|
||||
self.block_index.insert(&self.last_key, self.write.written_bytes() as u64).unwrap();
|
||||
}
|
||||
|
||||
/// Inserts a `(key, value)` pair in the term dictionary.
|
||||
///
|
||||
/// *Keys have to be inserted in order.*
|
||||
pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()>{
|
||||
self.insert_key(key)?;
|
||||
self.insert_value(value)
|
||||
}
|
||||
|
||||
pub fn insert_key(&mut self, key: &[u8]) -> io::Result<()>{
|
||||
/// # Warning
|
||||
/// Horribly dangerous internal API
|
||||
///
|
||||
/// If used, it must be used by systematically alternating calls
|
||||
/// to insert_key and insert_value.
|
||||
///
|
||||
/// Prefer using `.insert(key, value)`
|
||||
pub(crate) fn insert_key(&mut self, key: &[u8]) -> io::Result<()>{
|
||||
if self.len % BLOCK_SIZE == 0 {
|
||||
self.add_index_entry();
|
||||
}
|
||||
@@ -138,11 +100,13 @@ impl<W: Write, V: BinarySerializable + Clone + Default> StreamDictionaryBuilder<
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn insert_value(&mut self, value: &V) -> io::Result<()>{
|
||||
pub(crate) fn insert_value(&mut self, value: &V) -> io::Result<()>{
|
||||
value.serialize(&mut self.write)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Finalize writing the builder, and returns the underlying
|
||||
/// `Write` object.
|
||||
pub fn finish(mut self) -> io::Result<W> {
|
||||
self.add_index_entry();
|
||||
let (mut w, split_len) = self.write.finish()?;
|
||||
@@ -158,22 +122,6 @@ impl<W: Write, V: BinarySerializable + Clone + Default> StreamDictionaryBuilder<
|
||||
|
||||
|
||||
|
||||
fn stream_before<'a, V: 'a + Clone + Default + BinarySerializable>(stream_dictionary: &'a StreamDictionary<V>, target_key: &[u8]) -> StreamDictionaryStreamer<'a, V> {
|
||||
let (prev_key, offset) = strictly_previous_key(&stream_dictionary.fst_index, target_key.as_ref());
|
||||
let offset: usize = offset as usize;
|
||||
StreamDictionaryStreamer {
|
||||
cursor: &stream_dictionary.stream_data.as_slice()[offset..],
|
||||
current_key: Vec::from(prev_key),
|
||||
current_value: V::default(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
pub struct StreamDictionary<V> where V:BinarySerializable + Default + Clone {
|
||||
stream_data: ReadOnlySource,
|
||||
fst_index: fst::Map,
|
||||
_phantom_: PhantomData<V>,
|
||||
}
|
||||
|
||||
fn open_fst_index(source: ReadOnlySource) -> io::Result<fst::Map> {
|
||||
Ok(fst::Map::from(match source {
|
||||
@@ -182,10 +130,18 @@ fn open_fst_index(source: ReadOnlySource) -> io::Result<fst::Map> {
|
||||
}))
|
||||
}
|
||||
|
||||
/// Datastructure to access the `terms` of a segment.
|
||||
pub struct TermDictionary<V=TermInfo> where V: BinarySerializable + Default {
|
||||
stream_data: ReadOnlySource,
|
||||
fst_index: fst::Map,
|
||||
_phantom_: PhantomData<V>,
|
||||
}
|
||||
|
||||
impl<V: BinarySerializable + Clone + Default> StreamDictionary<V> {
|
||||
impl<V> TermDictionary<V>
|
||||
where V: BinarySerializable + Default {
|
||||
|
||||
pub fn from_source(source: ReadOnlySource) -> io::Result<StreamDictionary<V>> {
|
||||
/// Opens a `TermDictionary` given a data source.
|
||||
pub fn from_source(source: ReadOnlySource) -> io::Result<TermDictionary<V>> {
|
||||
let total_len = source.len();
|
||||
let length_offset = total_len - 8;
|
||||
let split_len: usize = {
|
||||
@@ -196,21 +152,75 @@ impl<V: BinarySerializable + Clone + Default> StreamDictionary<V> {
|
||||
let fst_data = source.slice(split_len, length_offset);
|
||||
let fst_index = open_fst_index(fst_data)?;
|
||||
|
||||
Ok(StreamDictionary {
|
||||
Ok(TermDictionary {
|
||||
stream_data: stream_data,
|
||||
fst_index: fst_index,
|
||||
_phantom_: PhantomData
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn stream_data(&self) -> &[u8] {
|
||||
self.stream_data.as_slice()
|
||||
}
|
||||
|
||||
pub(crate) fn strictly_previous_key(&self, key: &[u8]) -> (Vec<u8>, u64) {
|
||||
let fst_map = &self.fst_index;
|
||||
let fst = fst_map.as_fst();
|
||||
let mut node = fst.root();
|
||||
let mut node_stack: Vec<Node> = vec!(node.clone());
|
||||
|
||||
// first check the longest prefix.
|
||||
for &b in &key[..key.len() - 1] {
|
||||
node = match node.find_input(b) {
|
||||
None => {
|
||||
break;
|
||||
},
|
||||
Some(i) => {
|
||||
fst.node(node.transition_addr(i))
|
||||
},
|
||||
};
|
||||
node_stack.push(node);
|
||||
}
|
||||
|
||||
let len_node_stack = node_stack.len();
|
||||
for i in (1..len_node_stack).rev() {
|
||||
let cur_node = &node_stack[i];
|
||||
let b: u8 = key[i];
|
||||
let last_transition_opt = cur_node
|
||||
.transitions()
|
||||
.take_while(|transition| transition.inp < b)
|
||||
.last();
|
||||
|
||||
if let Some(last_transition) = last_transition_opt {
|
||||
let mut result_buffer = Vec::from(&key[..i]);
|
||||
result_buffer.push(last_transition.inp);
|
||||
let mut result = Vec::from(&key[..i]);
|
||||
result.push(last_transition.inp);
|
||||
let fork_node = fst.node(last_transition.addr);
|
||||
fill_last(fst, fork_node, &mut result);
|
||||
let val = fst_map.get(&result).unwrap();
|
||||
return (result, val);
|
||||
}
|
||||
else if cur_node.is_final() {
|
||||
// the previous key is a prefix
|
||||
let result_buffer = Vec::from(&key[..i]);
|
||||
let val = fst_map.get(&result_buffer).unwrap();
|
||||
return (result_buffer, val);
|
||||
}
|
||||
}
|
||||
|
||||
return (vec!(), 0);
|
||||
}
|
||||
|
||||
/// Lookups the value corresponding to the key.
|
||||
pub fn get<K: AsRef<[u8]>>(&self, target_key: K) -> Option<V> {
|
||||
let mut streamer = stream_before(self, target_key.as_ref());
|
||||
while let Some((iter_key, iter_val)) = streamer.next() {
|
||||
match iter_key.cmp(target_key.as_ref()) {
|
||||
while streamer.advance() {
|
||||
let position = streamer.key().cmp(target_key.as_ref());
|
||||
match position {
|
||||
Ordering::Less => {}
|
||||
Ordering::Equal => {
|
||||
let val: V = (*iter_val).clone();
|
||||
return Some(val);
|
||||
return Some(streamer.extract_value())
|
||||
}
|
||||
Ordering::Greater => {
|
||||
return None;
|
||||
@@ -219,136 +229,11 @@ impl<V: BinarySerializable + Clone + Default> StreamDictionary<V> {
|
||||
}
|
||||
return None;
|
||||
}
|
||||
|
||||
pub fn range(&self) -> StreamDictionaryStreamerBuilder<V> {
|
||||
let data: &[u8] = &self.stream_data;
|
||||
StreamDictionaryStreamerBuilder {
|
||||
stream_dictionary: &self,
|
||||
offset_from: 0,
|
||||
offset_to: (data.as_ptr() as usize) + data.len(),
|
||||
current_key: vec!(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn stream(&self) -> StreamDictionaryStreamer<V> {
|
||||
StreamDictionaryStreamer {
|
||||
cursor: &*self.stream_data,
|
||||
current_key: Vec::with_capacity(128),
|
||||
current_value: V::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct StreamDictionaryStreamerBuilder<'a, V: 'a + BinarySerializable + Clone + Default> {
|
||||
stream_dictionary: &'a StreamDictionary<V>,
|
||||
offset_from: usize,
|
||||
offset_to: usize,
|
||||
current_key: Vec<u8>,
|
||||
}
|
||||
|
||||
|
||||
/// Returns offset information for the first
|
||||
/// key in the stream matching a given predicate.
|
||||
///
|
||||
/// returns (start offset, the data required to load the value)
|
||||
fn get_offset<'a, V, P: Fn(&[u8])->bool>(predicate: P, mut streamer: StreamDictionaryStreamer<V>) -> (usize, Vec<u8>)
|
||||
where V: 'a + BinarySerializable + Clone + Default {
|
||||
let mut prev: &[u8] = streamer.cursor;
|
||||
|
||||
let mut prev_data: Vec<u8> = streamer.current_key.clone();
|
||||
|
||||
while let Some((iter_key, _)) = streamer.next() {
|
||||
if !predicate(iter_key) {
|
||||
return (prev.as_ptr() as usize, prev_data);
|
||||
}
|
||||
prev = streamer.cursor;
|
||||
prev_data.clear();
|
||||
prev_data.extend_from_slice(iter_key);
|
||||
}
|
||||
return (prev.as_ptr() as usize, prev_data);
|
||||
}
|
||||
|
||||
impl<'a, V: 'a + BinarySerializable + Clone + Default> StreamDictionaryStreamerBuilder<'a, V> {
|
||||
pub fn ge<T: AsRef<[u8]>>(mut self, bound: T) -> StreamDictionaryStreamerBuilder<'a, V> {
|
||||
let target_key = bound.as_ref();
|
||||
let streamer = stream_before(&self.stream_dictionary, target_key.as_ref());
|
||||
let smaller_than = |k: &[u8]| { k.lt(target_key) };
|
||||
let (offset_before, current_key) = get_offset(smaller_than, streamer);
|
||||
self.current_key = current_key;
|
||||
self.offset_from = offset_before;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn gt<T: AsRef<[u8]>>(mut self, bound: T) -> StreamDictionaryStreamerBuilder<'a, V> {
|
||||
let target_key = bound.as_ref();
|
||||
let streamer = stream_before(self.stream_dictionary, target_key.as_ref());
|
||||
let smaller_than = |k: &[u8]| { k.le(target_key) };
|
||||
let (offset_before, current_key) = get_offset(smaller_than, streamer);
|
||||
self.current_key = current_key;
|
||||
self.offset_from = offset_before;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn lt<T: AsRef<[u8]>>(mut self, bound: T) -> StreamDictionaryStreamerBuilder<'a, V> {
|
||||
let target_key = bound.as_ref();
|
||||
let streamer = stream_before(self.stream_dictionary, target_key.as_ref());
|
||||
let smaller_than = |k: &[u8]| { k.le(target_key) };
|
||||
let (offset_before, _) = get_offset(smaller_than, streamer);
|
||||
self.offset_to = offset_before;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn le<T: AsRef<[u8]>>(mut self, bound: T) -> StreamDictionaryStreamerBuilder<'a, V> {
|
||||
let target_key = bound.as_ref();
|
||||
let streamer = stream_before(self.stream_dictionary, target_key.as_ref());
|
||||
let smaller_than = |k: &[u8]| { k.lt(target_key) };
|
||||
let (offset_before, _) = get_offset(smaller_than, streamer);
|
||||
self.offset_to = offset_before;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn into_stream(self) -> StreamDictionaryStreamer<'a, V> {
|
||||
let data: &[u8] = &self.stream_dictionary.stream_data.as_slice()[..];
|
||||
let origin = data.as_ptr() as usize;
|
||||
let start = self.offset_from - origin;
|
||||
let stop = max(self.offset_to - origin, start);
|
||||
StreamDictionaryStreamer {
|
||||
cursor: &data[start..stop],
|
||||
current_key: self.current_key,
|
||||
current_value: V::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct StreamDictionaryStreamer<'a, V: BinarySerializable> {
|
||||
cursor: &'a [u8],
|
||||
current_key: Vec<u8>,
|
||||
current_value: V,
|
||||
}
|
||||
|
||||
impl<'a, V: BinarySerializable> StreamDictionaryStreamer<'a, V> {
|
||||
|
||||
pub fn next(&mut self) -> Option<(&[u8], &V)> {
|
||||
if self.cursor.len() == 0 {
|
||||
return None;
|
||||
}
|
||||
let common_length: usize = VInt::deserialize(&mut self.cursor).unwrap().0 as usize;
|
||||
let new_length: usize = common_length + VInt::deserialize(&mut self.cursor).unwrap().0 as usize;
|
||||
self.current_key.reserve(new_length);
|
||||
unsafe {
|
||||
self.current_key.set_len(new_length);
|
||||
}
|
||||
self.cursor.read_exact(&mut self.current_key[common_length..new_length]).unwrap();
|
||||
self.current_value = V::deserialize(&mut self.cursor).unwrap();
|
||||
Some((&self.current_key, &self.current_value))
|
||||
}
|
||||
|
||||
|
||||
pub fn key(&self) -> &[u8] {
|
||||
&self.current_key
|
||||
}
|
||||
|
||||
pub fn value(&self) -> &V {
|
||||
&self.current_value
|
||||
|
||||
/// Returns a range builder, to stream all of the terms
|
||||
/// within an interval.
|
||||
pub fn range(&self) -> TermStreamerBuilder<V> {
|
||||
TermStreamerBuilder::new(self)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user