Compare commits


5 Commits

Author         SHA1        Message                    Date
Paul Masurel   dc769b373b  Closes #500                2019-02-22 08:59:11 +09:00
Paul Masurel   5f07dc35d8  32bits platforms           2019-02-14 09:12:25 +09:00
Paul Masurel   176f67a266  Refactoring                2019-01-23 10:06:40 +09:00
Paul Masurel   19babff849  Closes #476                2019-01-23 10:06:39 +09:00
Paul Masurel   bf2576adf9  Added a broken unit test   2019-01-23 10:04:27 +09:00
32 changed files with 681 additions and 894 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "tantivy"
version = "0.8.0"
version = "0.8.3"
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
license = "MIT"
categories = ["database-implementations", "data-structures"]
@@ -16,8 +16,8 @@ base64 = "0.10.0"
byteorder = "1.0"
lazy_static = "1"
regex = "1.0"
fst = {version="0.3", default-features=false}
fst-regex = { version="0.2" }
tantivy-fst = {path="../tantivy-search/fst", version="0.1"}
memmap = "0.7"
lz4 = {version="1.20", optional=true}
snap = {version="0.2"}
atomicwrites = {version="0.2.2", optional=true}
@@ -30,7 +30,7 @@ serde_derive = "1.0"
serde_json = "1.0"
num_cpus = "1.2"
itertools = "0.8"
levenshtein_automata = {version="0.1", features=["fst_automaton"]}
levenshtein_automata = {version="0.1"}
bit-set = "0.5"
uuid = { version = "0.7", features = ["v4", "serde"] }
crossbeam = "0.5"
@@ -70,7 +70,7 @@ overflow-checks = true
[features]
# by default no-fail is disabled. We manually enable it when running tests.
default = ["mmap", "no_fail"]
mmap = ["fst/mmap", "atomicwrites"]
mmap = ["atomicwrites"]
lz4-compression = ["lz4"]
no_fail = ["fail/no_fail"]
unstable = [] # useful for benches.

View File

@@ -21,7 +21,7 @@
**Tantivy** is a **full text search engine library** written in Rust.
It is closer to [Apache Lucene](https://lucene.apache.org/) than to [Elasticsearch](https://www.elastic.co/products/elasticsearch) and [Apache Solr](https://lucene.apache.org/solr/) in the sense it is not
It is closer to [Apache Lucene](https://lucene.apache.org/) than to [Elastic Search](https://www.elastic.co/products/elasticsearch) and [Apache Solr](https://lucene.apache.org/solr/) in the sense it is not
an off-the-shelf search engine server, but rather a crate that can be used
to build such a search engine.
@@ -76,7 +76,7 @@ It will walk you through getting a wikipedia search engine up and running in a f
Tantivy compiles on stable Rust but requires `Rust >= 1.27`.
To check out and run tests, you can simply run:
git clone https://github.com/tantivy-search/tantivy.git
git clone git@github.com:tantivy-search/tantivy.git
cd tantivy
cargo build

View File

@@ -96,10 +96,7 @@ impl<TFruit: Fruit> FruitHandle<TFruit> {
/// Multicollector makes it possible to collect on more than one collector.
/// It should only be used for use cases where the Collector type is unknown
/// at compile time.
///
/// If the type of the collectors is known, you can just group your collectors
/// in a tuple. See the
/// [Combining several collectors section of the collector documentation](./index.html#combining-several-collectors).
/// If the type of the collectors is known, you should prefer to use `ChainedCollector`.
///
/// ```rust
/// #[macro_use]

View File

@@ -104,6 +104,31 @@ where
let val_shifted = (val_unshifted_unmasked >> bit_shift) as u64;
val_shifted & mask
}
/// Reads a range of values from the fast field.
///
/// The values read are those in the half-open
/// range `[start, start + output.len())`.
pub fn get_range(&self, start: u32, output: &mut [u64]) {
if self.num_bits == 0 {
for val in output.iter_mut() {
*val = 0u64;
}
} else {
let data: &[u8] = &*self.data;
let num_bits = self.num_bits;
let mask = self.mask;
let mut addr_in_bits = (start as usize) * num_bits;
for output_val in output.iter_mut() {
let addr = addr_in_bits >> 3;
let bit_shift = addr_in_bits & 7;
let val_unshifted_unmasked: u64 = LittleEndian::read_u64(&data[addr..]);
let val_shifted = (val_unshifted_unmasked >> bit_shift) as u64;
*output_val = val_shifted & mask;
addr_in_bits += num_bits;
}
}
}
}
#[cfg(test)]
@@ -141,4 +166,17 @@ mod test {
test_bitpacker_util(6, 14);
test_bitpacker_util(1000, 14);
}
#[test]
fn test_bitpacker_range() {
let (bitunpacker, vals) = create_fastfield_bitpacker(100_000, 12);
let buffer_len = 100;
let mut buffer = vec![0u64; buffer_len];
for start in vec![0, 10, 20, 100, 1_000] {
bitunpacker.get_range(start as u32, &mut buffer[..]);
for i in 0..buffer_len {
assert_eq!(buffer[i], vals[start + i]);
}
}
}
}
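
The new `get_range` reads each packed value with a single unaligned little-endian `u64` load, a shift by the bit offset, and a mask. Below is a minimal standalone sketch of that shift-and-mask technique, independent of tantivy's `BitUnpacker`, assuming `num_bits < 64` and a buffer padded so the 8-byte read never runs out of bounds:

```rust
/// Shift-and-mask unpacking of the i-th `num_bits`-wide value from a
/// little-endian bit-packed buffer. Assumes `num_bits < 64` and that the
/// buffer is padded so the unaligned 8-byte read never goes out of bounds.
fn unpack(data: &[u8], num_bits: usize, index: usize) -> u64 {
    let mask = (1u64 << num_bits) - 1;
    let addr_in_bits = index * num_bits;
    let addr = addr_in_bits >> 3;     // byte address of the value
    let bit_shift = addr_in_bits & 7; // bit offset within that byte
    let mut word = [0u8; 8];
    word.copy_from_slice(&data[addr..addr + 8]);
    (u64::from_le_bytes(word) >> bit_shift) & mask
}

fn main() {
    // Pack the values 0..16 on 5 bits each, leaving 8 bytes of padding so
    // the final unaligned read cannot run past the end of the buffer.
    let num_bits = 5usize;
    let vals: Vec<u64> = (0..16).collect();
    let mut data = vec![0u8; (vals.len() * num_bits + 7) / 8 + 8];
    for (i, &val) in vals.iter().enumerate() {
        let addr_in_bits = i * num_bits;
        let addr = addr_in_bits >> 3;
        let bit_shift = addr_in_bits & 7;
        let mut word = [0u8; 8];
        word.copy_from_slice(&data[addr..addr + 8]);
        let packed = u64::from_le_bytes(word) | (val << bit_shift);
        data[addr..addr + 8].copy_from_slice(&packed.to_le_bytes());
    }
    for (i, &val) in vals.iter().enumerate() {
        assert_eq!(unpack(&data, num_bits, i), val);
    }
}
```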

View File

@@ -10,7 +10,7 @@ pub(crate) use self::bitset::TinySet;
pub(crate) use self::composite_file::{CompositeFile, CompositeWrite};
pub use self::counting_writer::CountingWriter;
pub use self::serialize::{BinarySerializable, FixedSize};
pub use self::vint::{read_u32_vint, serialize_vint_u32, write_u32_vint, VInt};
pub use self::vint::VInt;
pub use byteorder::LittleEndian as Endianness;
use std::io;

View File

@@ -1,5 +1,4 @@
use super::BinarySerializable;
use byteorder::{ByteOrder, LittleEndian};
use std::io;
use std::io::Read;
use std::io::Write;
@@ -10,100 +9,6 @@ pub struct VInt(pub u64);
const STOP_BIT: u8 = 128;
pub fn serialize_vint_u32(val: u32) -> (u64, usize) {
const START_2: u64 = 1 << 7;
const START_3: u64 = 1 << 14;
const START_4: u64 = 1 << 21;
const START_5: u64 = 1 << 28;
const STOP_1: u64 = START_2 - 1;
const STOP_2: u64 = START_3 - 1;
const STOP_3: u64 = START_4 - 1;
const STOP_4: u64 = START_5 - 1;
const MASK_1: u64 = 127;
const MASK_2: u64 = MASK_1 << 7;
const MASK_3: u64 = MASK_2 << 7;
const MASK_4: u64 = MASK_3 << 7;
const MASK_5: u64 = MASK_4 << 7;
let val = u64::from(val);
const STOP_BIT: u64 = 128u64;
match val {
0...STOP_1 => (val | STOP_BIT, 1),
START_2...STOP_2 => (
(val & MASK_1) | ((val & MASK_2) << 1) | (STOP_BIT << (8)),
2,
),
START_3...STOP_3 => (
(val & MASK_1) | ((val & MASK_2) << 1) | ((val & MASK_3) << 2) | (STOP_BIT << (8 * 2)),
3,
),
START_4...STOP_4 => (
(val & MASK_1)
| ((val & MASK_2) << 1)
| ((val & MASK_3) << 2)
| ((val & MASK_4) << 3)
| (STOP_BIT << (8 * 3)),
4,
),
_ => (
(val & MASK_1)
| ((val & MASK_2) << 1)
| ((val & MASK_3) << 2)
| ((val & MASK_4) << 3)
| ((val & MASK_5) << 4)
| (STOP_BIT << (8 * 4)),
5,
),
}
}
/// Returns the number of bytes covered by a
/// serialized vint `u32`.
///
/// Expects a buffer that starts with the
/// serialized `vint`; scans at most 5 bytes ahead until
/// it finds the vint's final byte.
///
/// # May Panic
/// If the payload does not start with a valid `vint`.
fn vint_len(data: &[u8]) -> usize {
for i in 0..5.min(data.len()) {
if data[i] >= STOP_BIT {
return i + 1;
}
}
panic!("Corrupted data. Invalid VInt 32");
}
/// Reads a vint `u32` from a buffer, and
/// consumes its payload data.
///
/// # Panics
///
/// If the buffer does not start with a valid
/// vint payload.
pub fn read_u32_vint(data: &mut &[u8]) -> u32 {
let vlen = vint_len(*data);
let mut result = 0u32;
let mut shift = 0u64;
for &b in &data[..vlen] {
result |= u32::from(b & 127u8) << shift;
shift += 7;
}
*data = &data[vlen..];
result
}
/// Write a `u32` as a vint payload.
pub fn write_u32_vint<W: io::Write>(val: u32, writer: &mut W) -> io::Result<()> {
let (val, num_bytes) = serialize_vint_u32(val);
let mut buffer = [0u8; 8];
LittleEndian::write_u64(&mut buffer, val);
writer.write_all(&buffer[..num_bytes])
}
impl VInt {
pub fn val(&self) -> u64 {
self.0
@@ -119,7 +24,7 @@ impl VInt {
output.extend(&buffer[0..num_bytes]);
}
pub fn serialize_into(&self, buffer: &mut [u8; 10]) -> usize {
fn serialize_into(&self, buffer: &mut [u8; 10]) -> usize {
let mut remaining = self.0;
for (i, b) in buffer.iter_mut().enumerate() {
let next_byte: u8 = (remaining % 128u64) as u8;
@@ -159,7 +64,7 @@ impl BinarySerializable for VInt {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Reach end of buffer while reading VInt",
));
))
}
}
}
@@ -169,9 +74,7 @@ impl BinarySerializable for VInt {
#[cfg(test)]
mod tests {
use super::serialize_vint_u32;
use super::VInt;
use byteorder::{ByteOrder, LittleEndian};
use common::BinarySerializable;
fn aux_test_vint(val: u64) {
@@ -205,28 +108,4 @@ mod tests {
}
aux_test_vint(10);
}
fn aux_test_serialize_vint_u32(val: u32) {
let mut buffer = [0u8; 10];
let mut buffer2 = [0u8; 10];
let len_vint = VInt(val as u64).serialize_into(&mut buffer);
let (vint, len) = serialize_vint_u32(val);
assert_eq!(len, len_vint, "len wrong for val {}", val);
LittleEndian::write_u64(&mut buffer2, vint);
assert_eq!(&buffer[..len], &buffer2[..len], "array wrong for {}", val);
}
#[test]
fn test_vint_u32() {
aux_test_serialize_vint_u32(0);
aux_test_serialize_vint_u32(1);
aux_test_serialize_vint_u32(5);
for i in 1..3 {
let power_of_128 = 1u32 << (7 * i);
aux_test_serialize_vint_u32(power_of_128 - 1u32);
aux_test_serialize_vint_u32(power_of_128);
aux_test_serialize_vint_u32(power_of_128 + 1u32);
}
aux_test_serialize_vint_u32(u32::max_value());
}
}
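
The vint helpers removed from this module use a stop-bit convention: 7 payload bits per byte, least significant group first, with the high bit (0x80) set on the final byte (the reverse of LEB128, where the high bit means "continue"). A compact sketch of an encoder/decoder for that convention, written as a loop instead of the unrolled `match` above:

```rust
// Minimal sketch of the stop-bit vint scheme: 7 payload bits per byte,
// least-significant group first, high bit set on the *last* byte.
fn write_vint_u32(mut val: u32, out: &mut Vec<u8>) {
    loop {
        let byte = (val & 127) as u8;
        val >>= 7;
        if val == 0 {
            out.push(byte | 128); // stop bit on the final byte
            return;
        }
        out.push(byte);
    }
}

fn read_vint_u32(data: &mut &[u8]) -> u32 {
    let mut result = 0u32;
    for i in 0..data.len().min(5) {
        let b = data[i];
        result |= u32::from(b & 127) << (7 * i);
        if b >= 128 {
            *data = &data[i + 1..]; // consume the vint payload
            return result;
        }
    }
    panic!("corrupted vint: no stop bit found within 5 bytes");
}

fn main() {
    let vals = [0u32, 1, 127, 128, 300_000, u32::max_value()];
    let mut buf = Vec::new();
    for &v in &vals {
        write_vint_u32(v, &mut buf);
    }
    let mut slice = &buf[..];
    for &v in &vals {
        assert_eq!(read_vint_u32(&mut slice), v);
    }
    assert!(slice.is_empty());
}
```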

View File

@@ -64,7 +64,7 @@ impl Executor {
// This is important as it makes it possible for the fruit_receiver iteration to
// terminate.
};
// This is lame, but safe.
// This is lame, but it does not use unsafe code.
let mut results_with_position = Vec::with_capacity(num_fruits);
for (pos, fruit_res) in fruit_receiver {
let fruit = fruit_res?;
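
The updated comment refers to the pattern used here: each worker result is tagged with its position and the original order is restored afterwards, instead of writing out of order into an uninitialized buffer with unsafe code. A small sketch of that pattern with a plain std channel (the names are illustrative, not tantivy's executor API):

```rust
use std::sync::mpsc;
use std::thread;

// Worker results arrive in arbitrary order over a channel; we restore the
// original ordering by sorting on the attached position index.
fn main() {
    let (tx, rx) = mpsc::channel();
    for pos in 0..4usize {
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send((pos, pos * 10)).unwrap(); // (position, fruit)
        });
    }
    drop(tx); // close the channel so the iteration below terminates

    let mut results_with_position: Vec<(usize, usize)> = rx.iter().collect();
    results_with_position.sort_by_key(|&(pos, _)| pos);
    let fruits: Vec<usize> = results_with_position.into_iter().map(|(_, f)| f).collect();
    assert_eq!(fruits, vec![0, 10, 20, 30]);
}
```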

View File

@@ -150,7 +150,7 @@ impl Index {
///
/// This will overwrite existing meta.json
fn from_directory(mut directory: ManagedDirectory, schema: Schema) -> Result<Index> {
save_new_metas(schema.clone(), 0, directory.borrow_mut())?;
save_new_metas(schema.clone(), directory.borrow_mut())?;
let metas = IndexMeta::with_schema(schema);
Index::create_from_metas(directory, &metas)
}

View File

@@ -1,12 +1,9 @@
use atomicwrites;
use common::make_io_err;
use directory::error::{DeleteError, IOError, OpenDirectoryError, OpenReadError, OpenWriteError};
use directory::shared_vec_slice::SharedVecSlice;
use directory::Directory;
use directory::ReadOnlySource;
use directory::WritePtr;
use fst::raw::MmapReadOnly;
use std::collections::hash_map::Entry as HashMapEntry;
use std::collections::HashMap;
use std::convert::From;
use std::fmt;
@@ -19,11 +16,14 @@ use std::result;
use std::sync::Arc;
use std::sync::RwLock;
use tempdir::TempDir;
use memmap::Mmap;
use std::sync::Weak;
use std::ops::Deref;
/// Returns None iff the file exists, can be read, but is empty (and hence
/// cannot be mmapped).
///
fn open_mmap(full_path: &Path) -> result::Result<Option<MmapReadOnly>, OpenReadError> {
fn open_mmap(full_path: &Path) -> result::Result<Option<Mmap>, OpenReadError> {
let file = File::open(full_path).map_err(|e| {
if e.kind() == io::ErrorKind::NotFound {
OpenReadError::FileDoesNotExist(full_path.to_owned())
@@ -42,7 +42,7 @@ fn open_mmap(full_path: &Path) -> result::Result<Option<MmapReadOnly>, OpenReadE
return Ok(None);
}
unsafe {
MmapReadOnly::open(&file)
memmap::Mmap::map(&file)
.map(Some)
.map_err(|e| From::from(IOError::with_path(full_path.to_owned(), e)))
}
@@ -65,7 +65,7 @@ pub struct CacheInfo {
struct MmapCache {
counters: CacheCounters,
cache: HashMap<PathBuf, MmapReadOnly>,
cache: HashMap<PathBuf, Weak<Box<Deref<Target=[u8]> + Send + Sync>>>,
}
impl Default for MmapCache {
@@ -78,10 +78,6 @@ impl Default for MmapCache {
}
impl MmapCache {
/// Removes a `MmapReadOnly` entry from the mmap cache.
fn discard_from_cache(&mut self, full_path: &Path) -> bool {
self.cache.remove(full_path).is_some()
}
fn get_info(&mut self) -> CacheInfo {
let paths: Vec<PathBuf> = self.cache.keys().cloned().collect();
@@ -91,23 +87,27 @@ impl MmapCache {
}
}
fn get_mmap(&mut self, full_path: &Path) -> Result<Option<MmapReadOnly>, OpenReadError> {
Ok(match self.cache.entry(full_path.to_owned()) {
HashMapEntry::Occupied(occupied_entry) => {
let mmap = occupied_entry.get();
self.counters.hit += 1;
Some(mmap.clone())
}
HashMapEntry::Vacant(vacant_entry) => {
self.counters.miss += 1;
if let Some(mmap) = open_mmap(full_path)? {
vacant_entry.insert(mmap.clone());
Some(mmap)
} else {
None
// Returns None if the file exists but has a len of 0 (and hence is not mmappable).
fn get_mmap(&mut self, full_path: &Path) -> Result<Option<Arc<Box<Deref<Target=[u8]> + Send + Sync>>>, OpenReadError> {
let path_in_cache = self.cache.contains_key(full_path);
if path_in_cache {
{
let mmap_weak_opt = self.cache.get(full_path);
if let Some(mmap_arc) = mmap_weak_opt.and_then(|mmap_weak| mmap_weak.upgrade()) {
self.counters.hit += 1;
return Ok(Some(mmap_arc));
}
}
})
self.cache.remove(full_path);
}
self.counters.miss += 1;
if let Some(mmap) = open_mmap(full_path)? {
let res: Arc<Box<Deref<Target=[u8]> + Send + Sync>> = Arc::new(Box::new(mmap));
self.cache.insert(full_path.to_owned(), Arc::downgrade(&res));
Ok(Some(res))
} else {
Ok(None)
}
}
}
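
The cache now holds only `Weak` handles, so a mapping stays alive exactly as long as some `ReadOnlySource` still holds the corresponding `Arc`; a later lookup either upgrades the weak handle (a hit) or reloads and re-inserts (a miss). A stripped-down sketch of that weak-reference cache pattern (`WeakCache` and `get_or_load` are illustrative names, not tantivy's API):

```rust
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Weak};

struct WeakCache {
    cache: HashMap<PathBuf, Weak<Vec<u8>>>,
    hits: usize,
    misses: usize,
}

impl WeakCache {
    fn new() -> WeakCache {
        WeakCache { cache: HashMap::new(), hits: 0, misses: 0 }
    }

    fn get_or_load<F>(&mut self, path: &Path, load: F) -> Arc<Vec<u8>>
    where F: FnOnce() -> Vec<u8> {
        if let Some(arc) = self.cache.get(path).and_then(Weak::upgrade) {
            self.hits += 1;
            return arc;
        }
        // Either the path was never cached or the last strong reference is
        // gone: (re)load and insert a fresh weak handle over any stale one.
        self.misses += 1;
        let arc = Arc::new(load());
        self.cache.insert(path.to_owned(), Arc::downgrade(&arc));
        arc
    }
}

fn main() {
    let mut cache = WeakCache::new();
    let path = Path::new("segment.idx"); // hypothetical key
    let a = cache.get_or_load(path, || vec![1, 2, 3]);
    let b = cache.get_or_load(path, || unreachable!("should be a cache hit"));
    assert_eq!((cache.hits, cache.misses), (1, 1));
    drop((a, b));
    let _c = cache.get_or_load(path, || vec![1, 2, 3]); // reloaded after drop
    assert_eq!((cache.hits, cache.misses), (1, 2));
}
```

This is also why the `delete` path below no longer purges the cache: once the last strong reference is dropped, the next lookup simply misses and the stale entry is overwritten.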
@@ -253,11 +253,10 @@ impl Directory for MmapDirectory {
);
IOError::with_path(path.to_owned(), make_io_err(msg))
})?;
Ok(mmap_cache
.get_mmap(&full_path)?
.map(ReadOnlySource::Mmap)
.unwrap_or_else(|| ReadOnlySource::Anonymous(SharedVecSlice::empty())))
.map(ReadOnlySource::from)
.unwrap_or_else(|| ReadOnlySource::empty()))
}
fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
@@ -295,20 +294,6 @@ impl Directory for MmapDirectory {
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
debug!("Deleting file {:?}", path);
let full_path = self.resolve_path(path);
let mut mmap_cache = self.mmap_cache.write().map_err(|_| {
let msg = format!(
"Failed to acquired write lock \
on mmap cache while deleting {:?}",
path
);
IOError::with_path(path.to_owned(), make_io_err(msg))
})?;
mmap_cache.discard_from_cache(path);
// Removing the entry in the MMap cache.
// The munmap will appear on Drop,
// when the last reference is gone.
mmap_cache.cache.remove(&full_path);
match fs::remove_file(&full_path) {
Ok(_) => self
.sync_directory()
@@ -403,25 +388,50 @@ mod tests {
w.flush().unwrap();
}
}
{
for (i, path) in paths.iter().enumerate() {
let _r = mmap_directory.open_read(path).unwrap();
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), i + 1);
}
for path in paths.iter() {
let _r = mmap_directory.open_read(path).unwrap();
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), num_paths);
}
for (i, path) in paths.iter().enumerate() {
mmap_directory.delete(path).unwrap();
assert_eq!(
mmap_directory.get_cache_info().mmapped.len(),
num_paths - i - 1
);
}
let mut keep = vec![];
for (i, path) in paths.iter().enumerate() {
keep.push(mmap_directory.open_read(path).unwrap());
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), i + 1);
}
assert_eq!(mmap_directory.get_cache_info().counters.hit, 0);
assert_eq!(mmap_directory.get_cache_info().counters.miss, 10);
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 10);
for path in paths.iter() {
let _r = mmap_directory.open_read(path).unwrap();
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), num_paths);
}
assert_eq!(mmap_directory.get_cache_info().counters.hit, 10);
assert_eq!(mmap_directory.get_cache_info().counters.miss, 10);
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 10);
for path in paths.iter() {
let _r = mmap_directory.open_read(path).unwrap();
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), num_paths);
}
assert_eq!(mmap_directory.get_cache_info().counters.hit, 20);
assert_eq!(mmap_directory.get_cache_info().counters.miss, 10);
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 10);
drop(keep);
for path in paths.iter() {
let _r = mmap_directory.open_read(path).unwrap();
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), num_paths);
}
assert_eq!(mmap_directory.get_cache_info().counters.hit, 20);
assert_eq!(mmap_directory.get_cache_info().counters.miss, 20);
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 10);
for path in &paths {
mmap_directory.delete(path).unwrap();
}
assert_eq!(mmap_directory.get_cache_info().counters.hit, 20);
assert_eq!(mmap_directory.get_cache_info().counters.miss, 20);
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 10);
for path in paths.iter() {
assert!(mmap_directory.open_read(path).is_err());
}
assert_eq!(mmap_directory.get_cache_info().counters.hit, 20);
assert_eq!(mmap_directory.get_cache_info().counters.miss, 30);
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0);
}

View File

@@ -11,7 +11,6 @@ mod directory;
mod managed_directory;
mod ram_directory;
mod read_only_source;
mod shared_vec_slice;
/// Errors specific to the directory module.
pub mod error;

View File

@@ -1,4 +1,3 @@
use super::shared_vec_slice::SharedVecSlice;
use common::make_io_err;
use directory::error::{DeleteError, IOError, OpenReadError, OpenWriteError};
use directory::WritePtr;
@@ -71,7 +70,7 @@ impl Write for VecWriter {
}
#[derive(Clone)]
struct InnerDirectory(Arc<RwLock<HashMap<PathBuf, Arc<Vec<u8>>>>>);
struct InnerDirectory(Arc<RwLock<HashMap<PathBuf, ReadOnlySource>>>);
impl InnerDirectory {
fn new() -> InnerDirectory {
@@ -85,7 +84,7 @@ impl InnerDirectory {
path
))
})?;
let prev_value = map.insert(path, Arc::new(Vec::from(data)));
let prev_value = map.insert(path, ReadOnlySource::new(Vec::from(data)));
Ok(prev_value.is_some())
}
@@ -105,8 +104,7 @@ impl InnerDirectory {
readable_map
.get(path)
.ok_or_else(|| OpenReadError::FileDoesNotExist(PathBuf::from(path)))
.map(Arc::clone)
.map(|data| ReadOnlySource::Anonymous(SharedVecSlice::new(data)))
.map(|el| el.clone())
})
}

View File

@@ -1,9 +1,8 @@
use super::shared_vec_slice::SharedVecSlice;
use common::HasLen;
#[cfg(feature = "mmap")]
use fst::raw::MmapReadOnly;
use stable_deref_trait::{CloneStableDeref, StableDeref};
use std::ops::Deref;
use std::sync::Arc;
/// Read object that represents files in tantivy.
///
@@ -11,12 +10,10 @@ use std::ops::Deref;
/// the data in the form of a constant read-only `&[u8]`.
/// Whatever happens to the directory file, the data
/// hold by this object should never be altered or destroyed.
pub enum ReadOnlySource {
/// Mmap source of data
#[cfg(feature = "mmap")]
Mmap(MmapReadOnly),
/// Wrapping a `Vec<u8>`
Anonymous(SharedVecSlice),
pub struct ReadOnlySource {
data: Arc<Box<Deref<Target=[u8]> + Send + Sync + 'static>>,
start: usize,
stop: usize
}
unsafe impl StableDeref for ReadOnlySource {}
@@ -30,19 +27,41 @@ impl Deref for ReadOnlySource {
}
}
impl From<Arc<Box<Deref<Target=[u8]> + Send + Sync>>> for ReadOnlySource {
fn from(data: Arc<Box<Deref<Target=[u8]> + Send + Sync>>) -> Self {
let len = data.len();
ReadOnlySource {
data,
start: 0,
stop: len
}
}
}
const EMPTY_ARRAY: [u8; 0] = [0u8; 0];
impl ReadOnlySource {
/// Creates a new `ReadOnlySource`.
pub fn new<D>(data: D) -> ReadOnlySource
where D: Deref<Target=[u8]> + Send + Sync + 'static {
let len = data.len();
ReadOnlySource {
data: Arc::new(Box::new(data)),
start: 0,
stop: len
}
}
/// Creates an empty ReadOnlySource
pub fn empty() -> ReadOnlySource {
ReadOnlySource::Anonymous(SharedVecSlice::empty())
ReadOnlySource::new(&EMPTY_ARRAY[..])
}
/// Returns the data underlying the ReadOnlySource object.
pub fn as_slice(&self) -> &[u8] {
match *self {
#[cfg(feature = "mmap")]
ReadOnlySource::Mmap(ref mmap_read_only) => mmap_read_only.as_slice(),
ReadOnlySource::Anonymous(ref shared_vec) => shared_vec.as_slice(),
}
&self.data[self.start..self.stop]
}
/// Splits into 2 `ReadOnlySource`, at the offset given
@@ -63,22 +82,18 @@ impl ReadOnlySource {
/// worth of data in anonymous memory, and only a
/// 1KB slice is remaining, the whole `500MBs`
/// are retained in memory.
pub fn slice(&self, from_offset: usize, to_offset: usize) -> ReadOnlySource {
pub fn slice(&self, start: usize, stop: usize) -> ReadOnlySource {
assert!(
from_offset <= to_offset,
start <= stop,
"Requested negative slice [{}..{}]",
from_offset,
to_offset
start,
stop
);
match *self {
#[cfg(feature = "mmap")]
ReadOnlySource::Mmap(ref mmap_read_only) => {
let sliced_mmap = mmap_read_only.range(from_offset, to_offset - from_offset);
ReadOnlySource::Mmap(sliced_mmap)
}
ReadOnlySource::Anonymous(ref shared_vec) => {
ReadOnlySource::Anonymous(shared_vec.slice(from_offset, to_offset))
}
assert!(stop <= self.len());
ReadOnlySource {
data: self.data.clone(),
start: self.start + start,
stop: self.start + stop
}
}
@@ -87,8 +102,7 @@ impl ReadOnlySource {
///
/// Equivalent to `.slice(from_offset, self.len())`
pub fn slice_from(&self, from_offset: usize) -> ReadOnlySource {
let len = self.len();
self.slice(from_offset, len)
self.slice(from_offset, self.len())
}
/// Like `.slice(...)` but enforcing only the `to`
@@ -102,19 +116,18 @@ impl ReadOnlySource {
impl HasLen for ReadOnlySource {
fn len(&self) -> usize {
self.as_slice().len()
self.stop - self.start
}
}
impl Clone for ReadOnlySource {
fn clone(&self) -> Self {
self.slice(0, self.len())
self.slice_from(0)
}
}
impl From<Vec<u8>> for ReadOnlySource {
fn from(data: Vec<u8>) -> ReadOnlySource {
let shared_data = SharedVecSlice::from(data);
ReadOnlySource::Anonymous(shared_data)
ReadOnlySource::new(data)
}
}
}
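
The new `ReadOnlySource` replaces the `Mmap`/`Anonymous` enum with a single owning-slice representation: one type-erased, reference-counted buffer plus `start`/`stop` offsets, so cloning and slicing never copy bytes (at the cost, noted in the doc comment, that any live slice keeps the whole buffer alive). A stripped-down sketch of that pattern (`OwnedSlice` is an illustrative name, not the real type):

```rust
use std::ops::Deref;
use std::sync::Arc;

// One shared, type-erased byte buffer plus offsets into it.
struct OwnedSlice {
    data: Arc<Box<dyn Deref<Target = [u8]> + Send + Sync>>,
    start: usize,
    stop: usize,
}

impl OwnedSlice {
    fn new<D>(data: D) -> OwnedSlice
    where D: Deref<Target = [u8]> + Send + Sync + 'static {
        let len = data.len();
        OwnedSlice { data: Arc::new(Box::new(data)), start: 0, stop: len }
    }

    fn as_slice(&self) -> &[u8] {
        &self.data[self.start..self.stop]
    }

    // Re-slicing only moves the offsets; the underlying buffer is shared.
    fn slice(&self, start: usize, stop: usize) -> OwnedSlice {
        assert!(start <= stop && self.start + stop <= self.stop);
        OwnedSlice {
            data: Arc::clone(&self.data),
            start: self.start + start,
            stop: self.start + stop,
        }
    }
}

fn main() {
    let source = OwnedSlice::new(vec![0u8, 1, 2, 3, 4, 5, 6, 7]);
    let sub = source.slice(2, 6);
    assert_eq!(sub.as_slice(), &[2, 3, 4, 5]);
    let sub_sub = sub.slice(1, 3);
    assert_eq!(sub_sub.as_slice(), &[3, 4]);
    // all three handles share the same 8-byte buffer
    assert_eq!(Arc::strong_count(&source.data), 3);
}
```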

View File

@@ -1,41 +0,0 @@
use std::sync::Arc;
#[derive(Clone)]
pub struct SharedVecSlice {
pub data: Arc<Vec<u8>>,
pub start: usize,
pub len: usize,
}
impl SharedVecSlice {
pub fn empty() -> SharedVecSlice {
SharedVecSlice::new(Arc::new(Vec::new()))
}
pub fn new(data: Arc<Vec<u8>>) -> SharedVecSlice {
let data_len = data.len();
SharedVecSlice {
data,
start: 0,
len: data_len,
}
}
pub fn as_slice(&self) -> &[u8] {
&self.data[self.start..self.start + self.len]
}
pub fn slice(&self, from_offset: usize, to_offset: usize) -> SharedVecSlice {
SharedVecSlice {
data: Arc::clone(&self.data),
start: self.start + from_offset,
len: to_offset - from_offset,
}
}
}
impl From<Vec<u8>> for SharedVecSlice {
fn from(data: Vec<u8>) -> SharedVecSlice {
SharedVecSlice::new(Arc::new(data))
}
}

View File

@@ -79,8 +79,11 @@ impl<Item: FastValue> FastFieldReader<Item> {
// TODO change start to `u64`.
// For multifastfield, start is an index in a second fastfield, not a `DocId`
pub fn get_range(&self, start: u32, output: &mut [Item]) {
for (i, out) in output.iter_mut().enumerate() {
*out = self.get(start + i as u32);
// ok: Item is either `u64` or `i64`
let output_u64: &mut [u64] = unsafe { &mut *(output as *mut [Item] as *mut [u64]) };
self.bit_unpacker.get_range(start, output_u64);
for out in output_u64.iter_mut() {
*out = Item::from_u64(*out + self.min_value_u64).as_u64();
}
}

View File

@@ -26,6 +26,7 @@ use schema::Document;
use schema::IndexRecordOption;
use schema::Term;
use std::mem;
use std::mem::swap;
use std::thread;
use std::thread::JoinHandle;
use Result;
@@ -51,19 +52,17 @@ type DocumentReceiver = channel::Receiver<AddOperation>;
///
/// Returns (the heap size in bytes, the hash table size in number of bits)
fn initial_table_size(per_thread_memory_budget: usize) -> usize {
assert!(per_thread_memory_budget > 1_000);
let table_size_limit: usize = per_thread_memory_budget / 3;
if let Some(limit) = (1..)
(1..)
.take_while(|num_bits: &usize| compute_table_size(*num_bits) < table_size_limit)
.last()
{
limit.min(19) // we cap it at 2^19 = 512K.
} else {
unreachable!(
"Per thread memory is too small: {}",
per_thread_memory_budget
);
}
.unwrap_or_else(|| {
panic!(
"Per thread memory is too small: {}",
per_thread_memory_budget
)
})
.min(19) // we cap it at 512K
}
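
The rewritten `initial_table_size` picks the largest power-of-two hash table whose size stays under a third of the per-thread budget, capped at 2^19 buckets. A self-contained sketch of the same sizing rule, with a hypothetical `BYTES_PER_BUCKET` cost standing in for tantivy's real `compute_table_size`:

```rust
const BYTES_PER_BUCKET: usize = 8; // hypothetical per-bucket cost

fn compute_table_size(num_bits: usize) -> usize {
    (1 << num_bits) * BYTES_PER_BUCKET
}

fn initial_table_size(per_thread_memory_budget: usize) -> usize {
    let table_size_limit = per_thread_memory_budget / 3;
    (1..)
        .take_while(|&num_bits| compute_table_size(num_bits) < table_size_limit)
        .last()
        .unwrap_or_else(|| panic!("Per thread memory is too small: {}", per_thread_memory_budget))
        .min(19) // cap the table at 2^19 buckets
}

fn main() {
    assert_eq!(initial_table_size(10 << 20), 18); // 10 MB budget -> 2^18 buckets
    assert_eq!(initial_table_size(1 << 30), 19);  // large budgets hit the cap
}
```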
/// `IndexWriter` is the user entry-point to add documents to an index.
@@ -303,7 +302,7 @@ fn index_documents(
let last_docstamp: u64 = *(doc_opstamps.last().unwrap());
let delete_bitset_opt = if delete_cursor.get().is_some() {
let segment_entry: SegmentEntry = if delete_cursor.get().is_some() {
let doc_to_opstamps = DocToOpstampMapping::from(doc_opstamps);
let segment_reader = SegmentReader::open(segment)?;
let mut deleted_bitset = BitSet::with_capacity(num_docs as usize);
@@ -314,17 +313,18 @@ fn index_documents(
&doc_to_opstamps,
last_docstamp,
)?;
if may_have_deletes {
Some(deleted_bitset)
} else {
None
}
SegmentEntry::new(segment_meta, delete_cursor, {
if may_have_deletes {
Some(deleted_bitset)
} else {
None
}
})
} else {
// if there are no delete operations in the queue, there is no need
// to even open the segment.
None
SegmentEntry::new(segment_meta, delete_cursor, None)
};
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, delete_bitset_opt);
Ok(segment_updater.add_segment(generation, segment_entry))
}
@@ -467,10 +467,11 @@ impl IndexWriter {
///
/// Returns the former segment_ready channel.
fn recreate_document_channel(&mut self) -> DocumentReceiver {
let (document_sender, document_receiver): (DocumentSender, DocumentReceiver) =
let (mut document_sender, mut document_receiver): (DocumentSender, DocumentReceiver) =
channel::bounded(PIPELINE_MAX_SIZE_IN_DOCS);
mem::replace(&mut self.document_sender, document_sender);
mem::replace(&mut self.document_receiver, document_receiver)
swap(&mut self.document_sender, &mut document_sender);
swap(&mut self.document_receiver, &mut document_receiver);
document_receiver
}
/// Rollback to the last commit
@@ -564,6 +565,7 @@ impl IndexWriter {
let indexing_worker_result = worker_handle
.join()
.map_err(|e| TantivyError::ErrorInThread(format!("{:?}", e)))?;
indexing_worker_result?;
// add a new worker for the next generation.
self.add_indexing_worker()?;
@@ -734,7 +736,7 @@ mod tests {
index_writer.add_document(doc!(text_field=>"b"));
index_writer.add_document(doc!(text_field=>"c"));
}
assert_eq!(index_writer.commit().unwrap(), 2u64);
assert_eq!(index_writer.commit().unwrap(), 3u64);
index.load_searchers().unwrap();
assert_eq!(num_docs_containing("a"), 0);
assert_eq!(num_docs_containing("b"), 1);
@@ -797,7 +799,6 @@ mod tests {
{
let mut prepared_commit = index_writer.prepare_commit().expect("commit failed");
prepared_commit.set_payload("first commit");
assert_eq!(prepared_commit.opstamp(), 100);
prepared_commit.commit().expect("commit failed");
}
{
@@ -831,7 +832,6 @@ mod tests {
{
let mut prepared_commit = index_writer.prepare_commit().expect("commit failed");
prepared_commit.set_payload("first commit");
assert_eq!(prepared_commit.opstamp(), 100);
prepared_commit.abort().expect("commit failed");
}
{

View File

@@ -654,6 +654,7 @@ mod tests {
use schema::IntOptions;
use schema::Term;
use schema::TextFieldIndexing;
use schema::INT_INDEXED;
use std::io::Cursor;
use DocAddress;
use IndexWriter;
@@ -983,7 +984,7 @@ mod tests {
.wait()
.expect("Merging failed");
index.load_searchers().unwrap();
let ref searcher = *index.searcher();
let searcher = index.searcher();
assert_eq!(searcher.segment_readers().len(), 1);
assert_eq!(searcher.num_docs(), 3);
assert_eq!(searcher.segment_readers()[0].num_docs(), 3);
@@ -1029,7 +1030,7 @@ mod tests {
index_writer.commit().unwrap();
index.load_searchers().unwrap();
let ref searcher = *index.searcher();
let searcher = index.searcher();
assert_eq!(searcher.segment_readers().len(), 1);
assert_eq!(searcher.num_docs(), 2);
assert_eq!(searcher.segment_readers()[0].num_docs(), 2);
@@ -1125,6 +1126,7 @@ mod tests {
{
// Test removing all docs
index_writer.delete_term(Term::from_field_text(text_field, "g"));
index_writer.commit().unwrap();
let segment_ids = index
.searchable_segment_ids()
.expect("Searchable segments failed.");
@@ -1255,8 +1257,6 @@ mod tests {
}
}
use schema::INT_INDEXED;
#[test]
fn test_bug_merge() {
let mut schema_builder = schema::Schema::builder();

View File

@@ -18,7 +18,6 @@ use indexer::delete_queue::DeleteCursor;
use indexer::index_writer::advance_deletes;
use indexer::merger::IndexMerger;
use indexer::stamper::Stamper;
use indexer::MergeCandidate;
use indexer::SegmentEntry;
use indexer::SegmentSerializer;
use indexer::{DefaultMergePolicy, MergePolicy};
@@ -45,8 +44,15 @@ use Result;
/// and flushed.
///
/// This method is not part of tantivy's public API
pub fn save_new_metas(schema: Schema, opstamp: u64, directory: &mut Directory) -> Result<()> {
save_metas(vec![], schema, opstamp, None, directory)
pub fn save_new_metas(schema: Schema, directory: &mut Directory) -> Result<()> {
save_metas(
&IndexMeta {
segments: Vec::new(),
schema,
opstamp: 0u64,
payload: None
},
directory)
}
/// Save the index meta file.
@@ -58,20 +64,17 @@ pub fn save_new_metas(schema: Schema, opstamp: u64, directory: &mut Directory) -
/// and flushed.
///
/// This method is not part of tantivy's public API
pub fn save_metas(
segment_metas: Vec<SegmentMeta>,
schema: Schema,
opstamp: u64,
payload: Option<String>,
fn save_metas(
metas: &IndexMeta,
directory: &mut Directory,
) -> Result<()> {
let metas = IndexMeta {
segments: segment_metas,
schema,
opstamp,
payload,
};
let mut buffer = serde_json::to_vec_pretty(&metas)?;
// let metas = IndexMeta {
// segments: segment_metas,
// schema,
// opstamp,
// payload,
// };
let mut buffer = serde_json::to_vec_pretty(metas)?;
writeln!(&mut buffer)?;
directory.atomic_write(&META_FILEPATH, &buffer[..])?;
debug!("Saved metas {:?}", serde_json::to_string_pretty(&metas));
@@ -86,6 +89,11 @@ pub fn save_metas(
#[derive(Clone)]
pub struct SegmentUpdater(Arc<InnerSegmentUpdater>);
struct MergeOperation {
pub target_opstamp: u64,
pub segment_ids: Vec<SegmentId>,
}
fn perform_merge(
index: &Index,
mut segment_entries: Vec<SegmentEntry>,
@@ -126,6 +134,13 @@ fn perform_merge(
}
struct InnerSegmentUpdater {
// we keep a copy of the current active IndexMeta to
// avoid loading the file every time we need it in the
// `SegmentUpdater`.
//
// This should be up to date, as all updates happen through
// the unique active `SegmentUpdater`.
active_metas: RwLock<Arc<IndexMeta>>,
pool: CpuPool,
index: Index,
segment_manager: SegmentManager,
@@ -149,7 +164,9 @@ impl SegmentUpdater {
.name_prefix("segment_updater")
.pool_size(1)
.create();
let index_meta = index.load_metas()?;
Ok(SegmentUpdater(Arc::new(InnerSegmentUpdater {
active_metas: RwLock::new(Arc::new(index_meta)),
pool,
index,
segment_manager,
@@ -244,14 +261,18 @@ impl SegmentUpdater {
//
// Segment 1 from disk 1, Segment 1 from disk 2, etc.
commited_segment_metas.sort_by_key(|segment_meta| -(segment_meta.max_doc() as i32));
save_metas(
commited_segment_metas,
index.schema(),
let index_meta = IndexMeta {
segments: commited_segment_metas,
schema: index.schema(),
opstamp,
commit_message,
payload: commit_message
};
save_metas(
&index_meta,
directory.box_clone().borrow_mut(),
)
.expect("Could not save metas.");
self.store_meta(&index_meta);
}
}
@@ -286,16 +307,27 @@ impl SegmentUpdater {
}
pub fn start_merge(&self, segment_ids: &[SegmentId]) -> Result<Receiver<SegmentMeta>> {
//let future_merged_segment = */
let segment_ids_vec = segment_ids.to_vec();
let commit_opstamp = self.load_metas().opstamp;
self.run_async(move |segment_updater| {
segment_updater.start_merge_impl(&segment_ids_vec[..])
segment_updater.start_merge_impl(&segment_ids_vec[..], commit_opstamp)
})
.wait()?
}
fn store_meta(&self, index_meta: &IndexMeta) {
*self.0.active_metas.write().unwrap() = Arc::new(index_meta.clone());
}
fn load_metas(&self) -> Arc<IndexMeta> {
self.0.active_metas.read().unwrap().clone()
}
// `segment_ids` is required to be non-empty.
fn start_merge_impl(&self, segment_ids: &[SegmentId]) -> Result<Receiver<SegmentMeta>> {
fn start_merge_impl(
&self,
segment_ids: &[SegmentId],
target_opstamp: u64,
) -> Result<Receiver<SegmentMeta>> {
assert!(!segment_ids.is_empty(), "Segment_ids cannot be empty.");
let segment_updater_clone = self.clone();
@@ -310,8 +342,6 @@ impl SegmentUpdater {
);
let (merging_future_send, merging_future_recv) = oneshot();
let target_opstamp = self.0.stamper.stamp();
// first we need to apply deletes to our segment.
let merging_join_handle = thread::Builder::new()
.name(format!("mergingthread-{}", merging_thread_id))
@@ -373,11 +403,32 @@ impl SegmentUpdater {
// Committed segments cannot be merged with uncommitted_segments.
// We therefore consider merges using these two sets of segments independently.
let merge_policy = self.get_merge_policy();
let mut merge_candidates = merge_policy.compute_merge_candidates(&uncommitted_segments);
let committed_merge_candidates = merge_policy.compute_merge_candidates(&committed_segments);
merge_candidates.extend_from_slice(&committed_merge_candidates[..]);
for MergeCandidate(segment_metas) in merge_candidates {
match self.start_merge_impl(&segment_metas) {
let current_opstamp = self.0.stamper.stamp();
let mut merge_candidates = merge_policy
.compute_merge_candidates(&uncommitted_segments)
.into_iter()
.map(|merge_candidate| MergeOperation {
target_opstamp: current_opstamp,
segment_ids: merge_candidate.0,
})
.collect::<Vec<_>>();
let commit_opstamp = self.load_metas().opstamp;
let committed_merge_candidates = merge_policy
.compute_merge_candidates(&committed_segments)
.into_iter()
.map(|merge_candidate| MergeOperation {
target_opstamp: commit_opstamp,
segment_ids: merge_candidate.0,
})
.collect::<Vec<_>>();
merge_candidates.extend(committed_merge_candidates.into_iter());
for MergeOperation {
target_opstamp,
segment_ids,
} in merge_candidates
{
match self.start_merge_impl(&segment_ids, target_opstamp) {
Ok(merge_future) => {
if let Err(e) = merge_future.fuse().poll() {
error!("The merge task failed quickly after starting: {:?}", e);
@@ -412,12 +463,7 @@ impl SegmentUpdater {
info!("End merge {:?}", after_merge_segment_entry.meta());
let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
if let Some(delete_operation) = delete_cursor.get() {
let committed_opstamp = segment_updater
.0
.index
.load_metas()
.expect("Failed to read opstamp")
.opstamp;
let committed_opstamp = segment_updater.load_metas().opstamp;
if delete_operation.opstamp < committed_opstamp {
let index = &segment_updater.0.index;
let segment = index.segment(after_merge_segment_entry.meta().clone());
@@ -446,8 +492,8 @@ impl SegmentUpdater {
.end_merge(&before_merge_segment_ids, after_merge_segment_entry);
segment_updater.consider_merge_options();
info!("save metas");
let previous_metas = segment_updater.0.index.load_metas().unwrap();
segment_updater.save_metas(previous_metas.opstamp, previous_metas.payload);
let previous_metas = segment_updater.load_metas();
segment_updater.save_metas(previous_metas.opstamp, previous_metas.payload.clone());
segment_updater.garbage_collect_files_exec();
})
.wait()

View File

@@ -1,50 +1,68 @@
use std::sync::Arc;
use std::sync::atomic::Ordering;
// AtomicU64 has not landed in stable Rust yet.
// For the moment, let's just use AtomicUsize on
// 64-bit x86 platforms, and a lock on other platforms.
#[cfg(target = "x86_64")]
#[cfg(target_arch = "x86_64")]
mod archicture_impl {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
#[derive(Clone, Default)]
pub struct Stamper(Arc<AtomicU64>);
#[derive(Default)]
pub struct AtomicU64Ersatz(AtomicUsize);
impl Stamper {
pub fn new(first_opstamp: u64) -> Stamper {
Stamper(Arc::new(AtomicU64::new(first_opstamp)))
impl AtomicU64Ersatz {
pub fn new(first_opstamp: u64) -> AtomicU64Ersatz {
AtomicU64Ersatz(AtomicUsize::new(first_opstamp as usize))
}
pub fn stamp(&self) -> u64 {
self.0.fetch_add(1u64, Ordering::SeqCst) as u64
pub fn fetch_add(&self, val: u64, order: Ordering) -> u64 {
self.0.fetch_add(val as usize, order) as u64
}
}
}
#[cfg(not(target = "x86_64"))]
#[cfg(not(target_arch = "x86_64"))]
mod archicture_impl {
use std::sync::{Arc, Mutex};
use std::sync::atomic::Ordering;
/// On other architectures, we rely on a lock.
use std::sync::RwLock;
#[derive(Clone, Default)]
pub struct Stamper(Arc<Mutex<u64>>);
#[derive(Default)]
pub struct AtomicU64Ersatz(RwLock<u64>);
impl Stamper {
pub fn new(first_opstamp: u64) -> Stamper {
Stamper(Arc::new(Mutex::new(first_opstamp)))
impl AtomicU64Ersatz {
pub fn new(first_opstamp: u64) -> AtomicU64Ersatz {
AtomicU64Ersatz(RwLock::new(first_opstamp))
}
pub fn stamp(&self) -> u64 {
let mut guard = self.0.lock().expect("Failed to lock the stamper");
let previous_val = *guard;
*guard = previous_val + 1;
pub fn fetch_add(&self, incr: u64, _order: Ordering) -> u64 {
let mut lock = self.0.write().unwrap();
let previous_val = *lock;
*lock = previous_val + incr;
previous_val
}
}
}
pub use self::archicture_impl::Stamper;
use self::archicture_impl::AtomicU64Ersatz;
#[derive(Clone, Default)]
pub struct Stamper(Arc<AtomicU64Ersatz>);
impl Stamper {
pub fn new(first_opstamp: u64) -> Stamper {
Stamper(Arc::new(AtomicU64Ersatz::new(first_opstamp)))
}
pub fn stamp(&self) -> u64 {
self.0.fetch_add(1u64, Ordering::SeqCst) as u64
}
}
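
The refactor keeps the `Stamper` contract intact: a cheaply clonable handle that hands out unique opstamps from any thread via a single `fetch_add` counter. A quick usage sketch, with a plain `AtomicUsize` standing in for `AtomicU64Ersatz`:

```rust
use std::collections::HashSet;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

#[derive(Clone, Default)]
struct Stamper(Arc<AtomicUsize>);

impl Stamper {
    fn stamp(&self) -> u64 {
        self.0.fetch_add(1, Ordering::SeqCst) as u64
    }
}

fn main() {
    let stamper = Stamper::default();
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let stamper = stamper.clone();
            thread::spawn(move || (0..1000).map(|_| stamper.stamp()).collect::<Vec<u64>>())
        })
        .collect();
    let mut all: HashSet<u64> = HashSet::new();
    for handle in handles {
        for stamp in handle.join().unwrap() {
            assert!(all.insert(stamp), "opstamps must be unique");
        }
    }
    assert_eq!(all.len(), 4000);
}
```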
#[cfg(test)]
mod test {

View File

@@ -123,6 +123,8 @@ extern crate log;
#[macro_use]
extern crate failure;
#[cfg(feature = "mmap")]
extern crate memmap;
#[cfg(feature = "mmap")]
extern crate atomicwrites;
extern crate base64;
@@ -135,8 +137,7 @@ extern crate combine;
extern crate crossbeam;
extern crate fnv;
extern crate fst;
extern crate fst_regex;
extern crate tantivy_fst;
extern crate futures;
extern crate futures_cpupool;
extern crate htmlescape;

View File

@@ -1,8 +1,6 @@
use super::stacker::{Addr, MemoryArena, TermHashMap};
use postings::recorder::{
BufferLender, NothingRecorder, Recorder, TFAndPositionRecorder, TermFrequencyRecorder,
};
use postings::recorder::{NothingRecorder, Recorder, TFAndPositionRecorder, TermFrequencyRecorder};
use postings::UnorderedTermId;
use postings::{FieldSerializer, InvertedIndexSerializer};
use schema::IndexRecordOption;
@@ -215,7 +213,7 @@ pub trait PostingsWriter {
/// The `SpecializedPostingsWriter` is just here to remove dynamic
/// dispatch to the recorder information.
pub(crate) struct SpecializedPostingsWriter<Rec: Recorder + 'static> {
pub struct SpecializedPostingsWriter<Rec: Recorder + 'static> {
total_num_tokens: u64,
_recorder_type: PhantomData<Rec>,
}
@@ -247,7 +245,8 @@ impl<Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<Rec>
debug_assert!(term.as_slice().len() >= 4);
self.total_num_tokens += 1;
term_index.mutate_or_create(term, |opt_recorder: Option<Rec>| {
if let Some(mut recorder) = opt_recorder {
if opt_recorder.is_some() {
let mut recorder = opt_recorder.unwrap();
let current_doc = recorder.current_doc();
if current_doc != doc {
recorder.close_doc(heap);
@@ -256,7 +255,7 @@ impl<Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<Rec>
recorder.record_position(position, heap);
recorder
} else {
let mut recorder = Rec::new();
let mut recorder = Rec::new(heap);
recorder.new_doc(doc, heap);
recorder.record_position(position, heap);
recorder
@@ -271,11 +270,10 @@ impl<Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<Rec>
termdict_heap: &MemoryArena,
heap: &MemoryArena,
) -> io::Result<()> {
let mut buffer_lender = BufferLender::default();
for &(term_bytes, addr, _) in term_addrs {
let recorder: Rec = termdict_heap.read(addr);
let recorder: Rec = unsafe { termdict_heap.read(addr) };
serializer.new_term(&term_bytes[4..])?;
recorder.serialize(&mut buffer_lender, serializer, heap)?;
recorder.serialize(serializer, heap)?;
serializer.close_term()?;
}
Ok(())

View File

@@ -1,51 +1,10 @@
use super::stacker::{ExpUnrolledLinkedList, MemoryArena};
use common::{read_u32_vint, write_u32_vint};
use postings::FieldSerializer;
use std::io;
use std::{self, io};
use DocId;
const EMPTY_ARRAY: [u32; 0] = [0u32; 0];
const POSITION_END: u32 = 0;
#[derive(Default)]
pub(crate) struct BufferLender {
buffer_u8: Vec<u8>,
buffer_u32: Vec<u32>,
}
impl BufferLender {
pub fn lend_u8(&mut self) -> &mut Vec<u8> {
self.buffer_u8.clear();
&mut self.buffer_u8
}
pub fn lend_all(&mut self) -> (&mut Vec<u8>, &mut Vec<u32>) {
self.buffer_u8.clear();
self.buffer_u32.clear();
(&mut self.buffer_u8, &mut self.buffer_u32)
}
}
pub struct VInt32Reader<'a> {
data: &'a [u8],
}
impl<'a> VInt32Reader<'a> {
fn new(data: &'a [u8]) -> VInt32Reader<'a> {
VInt32Reader { data }
}
}
impl<'a> Iterator for VInt32Reader<'a> {
type Item = u32;
fn next(&mut self) -> Option<u32> {
if self.data.is_empty() {
None
} else {
Some(read_u32_vint(&mut self.data))
}
}
}
const POSITION_END: u32 = std::u32::MAX;
/// Recorder is in charge of recording relevant information about
/// the presence of a term in a document.
@@ -56,9 +15,9 @@ impl<'a> Iterator for VInt32Reader<'a> {
/// * the document id
/// * the term frequency
/// * the term positions
pub(crate) trait Recorder: Copy + 'static {
pub trait Recorder: Copy {
///
fn new() -> Self;
fn new(heap: &mut MemoryArena) -> Self;
/// Returns the current document
fn current_doc(&self) -> u32;
/// Starts recording information about a new document
@@ -70,12 +29,7 @@ pub(crate) trait Recorder: Copy + 'static {
/// Close the document. It will help record the term frequency.
fn close_doc(&mut self, heap: &mut MemoryArena);
/// Pushes the postings information to the serializer.
fn serialize(
&self,
buffer_lender: &mut BufferLender,
serializer: &mut FieldSerializer,
heap: &MemoryArena,
) -> io::Result<()>;
fn serialize(&self, serializer: &mut FieldSerializer, heap: &MemoryArena) -> io::Result<()>;
}
/// Only records the doc ids
@@ -86,9 +40,9 @@ pub struct NothingRecorder {
}
impl Recorder for NothingRecorder {
fn new() -> Self {
fn new(heap: &mut MemoryArena) -> Self {
NothingRecorder {
stack: ExpUnrolledLinkedList::new(),
stack: ExpUnrolledLinkedList::new(heap),
current_doc: u32::max_value(),
}
}
@@ -99,23 +53,16 @@ impl Recorder for NothingRecorder {
fn new_doc(&mut self, doc: DocId, heap: &mut MemoryArena) {
self.current_doc = doc;
let _ = write_u32_vint(doc, &mut self.stack.writer(heap));
self.stack.push(doc, heap);
}
fn record_position(&mut self, _position: u32, _heap: &mut MemoryArena) {}
fn close_doc(&mut self, _heap: &mut MemoryArena) {}
fn serialize(
&self,
buffer_lender: &mut BufferLender,
serializer: &mut FieldSerializer,
heap: &MemoryArena,
) -> io::Result<()> {
let buffer = buffer_lender.lend_u8();
self.stack.read_to_end(heap, buffer);
for doc in VInt32Reader::new(&buffer[..]) {
serializer.write_doc(doc as u32, 0u32, &EMPTY_ARRAY)?;
fn serialize(&self, serializer: &mut FieldSerializer, heap: &MemoryArena) -> io::Result<()> {
for doc in self.stack.iter(heap) {
serializer.write_doc(doc, 0u32, &EMPTY_ARRAY)?;
}
Ok(())
}
@@ -130,9 +77,9 @@ pub struct TermFrequencyRecorder {
}
impl Recorder for TermFrequencyRecorder {
fn new() -> Self {
fn new(heap: &mut MemoryArena) -> Self {
TermFrequencyRecorder {
stack: ExpUnrolledLinkedList::new(),
stack: ExpUnrolledLinkedList::new(heap),
current_doc: u32::max_value(),
current_tf: 0u32,
}
@@ -144,7 +91,7 @@ impl Recorder for TermFrequencyRecorder {
fn new_doc(&mut self, doc: DocId, heap: &mut MemoryArena) {
self.current_doc = doc;
let _ = write_u32_vint(doc, &mut self.stack.writer(heap));
self.stack.push(doc, heap);
}
fn record_position(&mut self, _position: u32, _heap: &mut MemoryArena) {
@@ -153,24 +100,24 @@ impl Recorder for TermFrequencyRecorder {
fn close_doc(&mut self, heap: &mut MemoryArena) {
debug_assert!(self.current_tf > 0);
let _ = write_u32_vint(self.current_tf, &mut self.stack.writer(heap));
self.stack.push(self.current_tf, heap);
self.current_tf = 0;
}
fn serialize(
&self,
buffer_lender: &mut BufferLender,
serializer: &mut FieldSerializer,
heap: &MemoryArena,
) -> io::Result<()> {
let buffer = buffer_lender.lend_u8();
self.stack.read_to_end(heap, buffer);
let mut u32_it = VInt32Reader::new(&buffer[..]);
while let Some(doc) = u32_it.next() {
let term_freq = u32_it.next().unwrap_or(self.current_tf);
serializer.write_doc(doc as u32, term_freq, &EMPTY_ARRAY)?;
}
fn serialize(&self, serializer: &mut FieldSerializer, heap: &MemoryArena) -> io::Result<()> {
// the last document has not been closed...
// its term freq is self.current_tf.
let mut doc_iter = self
.stack
.iter(heap)
.chain(Some(self.current_tf).into_iter());
while let Some(doc) = doc_iter.next() {
let term_freq = doc_iter
.next()
.expect("The IndexWriter recorded a doc without a term freq.");
serializer.write_doc(doc, term_freq, &EMPTY_ARRAY)?;
}
Ok(())
}
}
@@ -181,10 +128,11 @@ pub struct TFAndPositionRecorder {
stack: ExpUnrolledLinkedList,
current_doc: DocId,
}
impl Recorder for TFAndPositionRecorder {
fn new() -> Self {
fn new(heap: &mut MemoryArena) -> Self {
TFAndPositionRecorder {
stack: ExpUnrolledLinkedList::new(),
stack: ExpUnrolledLinkedList::new(heap),
current_doc: u32::max_value(),
}
}
@@ -195,88 +143,33 @@ impl Recorder for TFAndPositionRecorder {
fn new_doc(&mut self, doc: DocId, heap: &mut MemoryArena) {
self.current_doc = doc;
let _ = write_u32_vint(doc, &mut self.stack.writer(heap));
self.stack.push(doc, heap);
}
fn record_position(&mut self, position: u32, heap: &mut MemoryArena) {
let _ = write_u32_vint(position + 1u32, &mut self.stack.writer(heap));
self.stack.push(position, heap);
}
fn close_doc(&mut self, heap: &mut MemoryArena) {
let _ = write_u32_vint(POSITION_END, &mut self.stack.writer(heap));
self.stack.push(POSITION_END, heap);
}
fn serialize(
&self,
buffer_lender: &mut BufferLender,
serializer: &mut FieldSerializer,
heap: &MemoryArena,
) -> io::Result<()> {
let (buffer_u8, buffer_positions) = buffer_lender.lend_all();
self.stack.read_to_end(heap, buffer_u8);
let mut u32_it = VInt32Reader::new(&buffer_u8[..]);
while let Some(doc) = u32_it.next() {
let mut prev_position_plus_one = 1u32;
buffer_positions.clear();
loop {
match u32_it.next() {
Some(POSITION_END) | None => {
break;
}
Some(position_plus_one) => {
let delta_position = position_plus_one - prev_position_plus_one;
buffer_positions.push(delta_position);
prev_position_plus_one = position_plus_one;
}
fn serialize(&self, serializer: &mut FieldSerializer, heap: &MemoryArena) -> io::Result<()> {
let mut doc_positions = Vec::with_capacity(100);
let mut positions_iter = self.stack.iter(heap);
while let Some(doc) = positions_iter.next() {
let mut prev_position = 0;
doc_positions.clear();
for position in &mut positions_iter {
if position == POSITION_END {
break;
} else {
doc_positions.push(position - prev_position);
prev_position = position;
}
}
serializer.write_doc(doc, buffer_positions.len() as u32, &buffer_positions)?;
serializer.write_doc(doc, doc_positions.len() as u32, &doc_positions)?;
}
Ok(())
}
}
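
With the recorders now pushing raw `u32` values, the `TFAndPositionRecorder` stream is, per document: the doc id, the document's positions, then a `POSITION_END` sentinel (now `u32::MAX`); serialization converts positions to deltas. A small sketch of decoding such a stream (`decode` is illustrative, not the real serializer):

```rust
const POSITION_END: u32 = u32::max_value();

// Splits a [doc, pos.., END, doc, pos.., END, ..] stream into
// (doc, delta-encoded positions) pairs.
fn decode(stream: &[u32]) -> Vec<(u32, Vec<u32>)> {
    let mut out = Vec::new();
    let mut it = stream.iter().cloned();
    while let Some(doc) = it.next() {
        let mut prev_position = 0;
        let mut deltas = Vec::new();
        for position in &mut it {
            if position == POSITION_END {
                break;
            }
            deltas.push(position - prev_position);
            prev_position = position;
        }
        out.push((doc, deltas));
    }
    out
}

fn main() {
    // doc 2 has positions 3 and 7, doc 5 has position 1
    let stream = vec![2, 3, 7, POSITION_END, 5, 1, POSITION_END];
    assert_eq!(decode(&stream), vec![(2, vec![3, 4]), (5, vec![1])]);
}
```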
#[cfg(test)]
mod tests {
use super::write_u32_vint;
use super::BufferLender;
use super::VInt32Reader;
#[test]
fn test_buffer_lender() {
let mut buffer_lender = BufferLender::default();
{
let buf = buffer_lender.lend_u8();
assert!(buf.is_empty());
buf.push(1u8);
}
{
let buf = buffer_lender.lend_u8();
assert!(buf.is_empty());
buf.push(1u8);
}
{
let (_, buf) = buffer_lender.lend_all();
assert!(buf.is_empty());
buf.push(1u32);
}
{
let (_, buf) = buffer_lender.lend_all();
assert!(buf.is_empty());
buf.push(1u32);
}
}
#[test]
fn test_vint_u32() {
let mut buffer = vec![];
let vals = [0, 1, 324_234_234, u32::max_value()];
for &i in &vals {
assert!(write_u32_vint(i, &mut buffer).is_ok());
}
assert_eq!(buffer.len(), 1 + 1 + 5 + 5);
let res: Vec<u32> = VInt32Reader::new(&buffer[..]).collect();
assert_eq!(&res[..], &vals[..]);
}
}

View File

@@ -2,7 +2,7 @@ use common::BitSet;
use common::HasLen;
use common::{BinarySerializable, VInt};
use docset::{DocSet, SkipResult};
use fst::Streamer;
use tantivy_fst::Streamer;
use owned_read::OwnedRead;
use positions::PositionReader;
use postings::compression::compressed_block_size;
@@ -628,7 +628,7 @@ mod tests {
use common::HasLen;
use core::Index;
use docset::DocSet;
use fst::Streamer;
use tantivy_fst::Streamer;
use schema::IndexRecordOption;
use schema::Schema;
use schema::Term;

View File

@@ -1,37 +1,28 @@
use super::{Addr, MemoryArena};
use postings::stacker::memory_arena::load;
use postings::stacker::memory_arena::store;
use std::io;
use common::is_power_of_2;
use std::mem;
const MAX_BLOCK_LEN: u32 = 1u32 << 15;
const FIRST_BLOCK: usize = 16;
const INLINED_BLOCK_LEN: usize = FIRST_BLOCK + mem::size_of::<Addr>();
enum CapacityResult {
Available(u32),
NeedAlloc(u32),
}
const FIRST_BLOCK: u32 = 4u32;
fn len_to_capacity(len: u32) -> CapacityResult {
#[inline]
pub fn jump_needed(len: u32) -> Option<usize> {
match len {
0...15 => CapacityResult::Available(FIRST_BLOCK as u32 - len),
16...MAX_BLOCK_LEN => {
let cap = 1 << (32u32 - (len - 1u32).leading_zeros());
let available = cap - len;
if available == 0 {
CapacityResult::NeedAlloc(len)
0...3 => None,
4...MAX_BLOCK_LEN => {
if is_power_of_2(len as usize) {
Some(len as usize)
} else {
CapacityResult::Available(available)
None
}
}
n => {
let available = n % MAX_BLOCK_LEN;
if available == 0 {
CapacityResult::NeedAlloc(MAX_BLOCK_LEN)
if n % MAX_BLOCK_LEN == 0 {
Some(MAX_BLOCK_LEN as usize)
} else {
CapacityResult::Available(MAX_BLOCK_LEN - available)
None
}
}
}
@@ -61,119 +52,82 @@ fn len_to_capacity(len: u32) -> CapacityResult {
#[derive(Debug, Clone, Copy)]
pub struct ExpUnrolledLinkedList {
len: u32,
head: Addr,
tail: Addr,
inlined_data: [u8; INLINED_BLOCK_LEN as usize],
}
pub struct ExpUnrolledLinkedListWriter<'a> {
eull: &'a mut ExpUnrolledLinkedList,
heap: &'a mut MemoryArena,
}
fn ensure_capacity<'a>(
eull: &'a mut ExpUnrolledLinkedList,
heap: &'a mut MemoryArena,
) -> &'a mut [u8] {
if eull.len <= FIRST_BLOCK as u32 {
// We are still hitting the inline block.
if eull.len < FIRST_BLOCK as u32 {
return &mut eull.inlined_data[eull.len as usize..FIRST_BLOCK];
}
// We need to allocate a new block!
let new_block_addr: Addr = heap.allocate_space(FIRST_BLOCK + mem::size_of::<Addr>());
store(&mut eull.inlined_data[FIRST_BLOCK..], new_block_addr);
eull.tail = new_block_addr;
return heap.slice_mut(eull.tail, FIRST_BLOCK);
}
let len = match len_to_capacity(eull.len) {
CapacityResult::NeedAlloc(new_block_len) => {
let new_block_addr: Addr =
heap.allocate_space(new_block_len as usize + mem::size_of::<Addr>());
heap.write_at(eull.tail, new_block_addr);
eull.tail = new_block_addr;
new_block_len
}
CapacityResult::Available(available) => available,
};
heap.slice_mut(eull.tail, len as usize)
}
impl<'a> ExpUnrolledLinkedListWriter<'a> {
pub fn extend_from_slice(&mut self, mut buf: &[u8]) {
if buf.is_empty() {
// we need to cut early, because `ensure_capacity`
// allocates if there is no capacity at all right now.
return;
}
while !buf.is_empty() {
let add_len: usize;
{
let output_buf = ensure_capacity(self.eull, self.heap);
add_len = buf.len().min(output_buf.len());
output_buf[..add_len].copy_from_slice(&buf[..add_len]);
}
self.eull.len += add_len as u32;
self.eull.tail = self.eull.tail.offset(add_len as u32);
buf = &buf[add_len..];
}
}
}
impl<'a> io::Write for ExpUnrolledLinkedListWriter<'a> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
// There is no use case to only write the capacity.
// This is not IO after all, so we write the whole
// buffer even if the contract of `.write` is looser.
self.extend_from_slice(buf);
Ok(buf.len())
}
fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
self.extend_from_slice(buf);
Ok(())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl ExpUnrolledLinkedList {
pub fn new() -> ExpUnrolledLinkedList {
pub fn new(heap: &mut MemoryArena) -> ExpUnrolledLinkedList {
let addr = heap.allocate_space((FIRST_BLOCK as usize) * mem::size_of::<u32>());
ExpUnrolledLinkedList {
len: 0u32,
tail: Addr::null_pointer(),
inlined_data: [0u8; INLINED_BLOCK_LEN as usize],
head: addr,
tail: addr,
}
}
#[inline(always)]
pub fn writer<'a>(&'a mut self, heap: &'a mut MemoryArena) -> ExpUnrolledLinkedListWriter<'a> {
ExpUnrolledLinkedListWriter { eull: self, heap }
pub fn iter<'a>(&self, heap: &'a MemoryArena) -> ExpUnrolledLinkedListIterator<'a> {
ExpUnrolledLinkedListIterator {
heap,
addr: self.head,
len: self.len,
consumed: 0,
}
}
pub fn read_to_end(&self, heap: &MemoryArena, output: &mut Vec<u8>) {
let len = self.len as usize;
if len <= FIRST_BLOCK {
output.extend_from_slice(&self.inlined_data[..len]);
return;
/// Appends a new element to the current stack.
///
/// If the current block end is reached, a new block is allocated.
pub fn push(&mut self, val: u32, heap: &mut MemoryArena) {
self.len += 1;
if let Some(new_block_len) = jump_needed(self.len) {
// We need to allocate another block.
// We also allocate an extra `u32` to store the pointer
// to the future next block.
let new_block_size: usize = (new_block_len + 1) * mem::size_of::<u32>();
let new_block_addr: Addr = heap.allocate_space(new_block_size);
unsafe {
// logic
heap.write(self.tail, new_block_addr)
};
self.tail = new_block_addr;
}
output.extend_from_slice(&self.inlined_data[..FIRST_BLOCK]);
let mut cur = FIRST_BLOCK;
let mut addr = load(&self.inlined_data[FIRST_BLOCK..]);
loop {
let cap = match len_to_capacity(cur as u32) {
CapacityResult::Available(capacity) => capacity,
CapacityResult::NeedAlloc(capacity) => capacity,
} as usize;
let data = heap.slice(addr, cap);
if cur + cap >= len {
output.extend_from_slice(&data[..(len - cur)]);
return;
}
output.extend_from_slice(data);
cur += cap;
addr = heap.read(addr.offset(cap as u32));
unsafe {
// logic
heap.write(self.tail, val);
self.tail = self.tail.offset(mem::size_of::<u32>() as u32);
}
}
}
pub struct ExpUnrolledLinkedListIterator<'a> {
heap: &'a MemoryArena,
addr: Addr,
len: u32,
consumed: u32,
}
impl<'a> Iterator for ExpUnrolledLinkedListIterator<'a> {
type Item = u32;
fn next(&mut self) -> Option<u32> {
if self.consumed == self.len {
None
} else {
self.consumed += 1;
let addr: Addr = if jump_needed(self.consumed).is_some() {
unsafe {
// logic
self.heap.read(self.addr)
}
} else {
self.addr
};
self.addr = addr.offset(mem::size_of::<u32>() as u32);
Some(unsafe {
// logic
self.heap.read(addr)
})
}
}
}
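
The reworked list stores fixed-size `u32` entries in arena blocks that double in length (starting at 4 slots, capped at 2^15), and `jump_needed` just reports when the running length hits a block boundary. A sketch of the same growth rule using plain `Vec` blocks instead of the `MemoryArena`:

```rust
const FIRST_BLOCK: usize = 4;
const MAX_BLOCK_LEN: usize = 1 << 15;

// Exponential unrolled list: appending never moves previously written data,
// and n values need only about log2(n) blocks (until the cap is reached).
struct ExpUnrolledList {
    blocks: Vec<Vec<u32>>,
    current_block_limit: usize,
}

impl ExpUnrolledList {
    fn new() -> ExpUnrolledList {
        ExpUnrolledList {
            blocks: vec![Vec::with_capacity(FIRST_BLOCK)],
            current_block_limit: FIRST_BLOCK,
        }
    }

    fn push(&mut self, val: u32) {
        if self.blocks.last().unwrap().len() == self.current_block_limit {
            // the current block is full: open the next one, twice as large,
            // capped at MAX_BLOCK_LEN (this is what `jump_needed` detects).
            self.current_block_limit = (self.current_block_limit * 2).min(MAX_BLOCK_LEN);
            self.blocks.push(Vec::with_capacity(self.current_block_limit));
        }
        self.blocks.last_mut().unwrap().push(val);
    }

    fn iter(&self) -> impl Iterator<Item = u32> + '_ {
        self.blocks.iter().flat_map(|block| block.iter().cloned())
    }
}

fn main() {
    let mut list = ExpUnrolledList::new();
    for i in 0..100u32 {
        list.push(i);
    }
    assert!(list.iter().eq(0..100u32));
    // 100 values land in blocks of 4 + 8 + 16 + 32 + 64 slots
    assert_eq!(list.blocks.len(), 5);
}
```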
@@ -182,134 +136,46 @@ impl ExpUnrolledLinkedList {
mod tests {
use super::super::MemoryArena;
use super::len_to_capacity;
use super::jump_needed;
use super::*;
use byteorder::{ByteOrder, LittleEndian, WriteBytesExt};
#[test]
#[test]
fn test_stack() {
let mut heap = MemoryArena::new();
let mut stack = ExpUnrolledLinkedList::new();
stack.writer(&mut heap).extend_from_slice(&[1u8]);
stack.writer(&mut heap).extend_from_slice(&[2u8]);
stack.writer(&mut heap).extend_from_slice(&[3u8, 4u8]);
stack.writer(&mut heap).extend_from_slice(&[5u8]);
let mut stack = ExpUnrolledLinkedList::new(&mut heap);
stack.push(1u32, &mut heap);
stack.push(2u32, &mut heap);
stack.push(4u32, &mut heap);
stack.push(8u32, &mut heap);
{
let mut buffer = Vec::new();
stack.read_to_end(&heap, &mut buffer);
assert_eq!(&buffer[..], &[1u8, 2u8, 3u8, 4u8, 5u8]);
let mut it = stack.iter(&heap);
assert_eq!(it.next().unwrap(), 1u32);
assert_eq!(it.next().unwrap(), 2u32);
assert_eq!(it.next().unwrap(), 4u32);
assert_eq!(it.next().unwrap(), 8u32);
assert!(it.next().is_none());
}
}
#[test]
fn test_stack_long() {
let mut heap = MemoryArena::new();
let mut stack = ExpUnrolledLinkedList::new();
let source: Vec<u32> = (0..100).collect();
for &el in &source {
assert!(stack
.writer(&mut heap)
.write_u32::<LittleEndian>(el)
.is_ok());
}
let mut buffer = Vec::new();
stack.read_to_end(&heap, &mut buffer);
let mut result = vec![];
let mut remaining = &buffer[..];
while !remaining.is_empty() {
result.push(LittleEndian::read_u32(&remaining[..4]));
remaining = &remaining[4..];
}
assert_eq!(&result[..], &source[..]);
}
#[test]
fn test_stack_interlaced() {
let mut heap = MemoryArena::new();
let mut stack = ExpUnrolledLinkedList::new();
let mut stack2 = ExpUnrolledLinkedList::new();
let mut vec1: Vec<u8> = vec![];
let mut vec2: Vec<u8> = vec![];
for i in 0..9 {
assert!(stack.writer(&mut heap).write_u32::<LittleEndian>(i).is_ok());
assert!(vec1.write_u32::<LittleEndian>(i).is_ok());
if i % 2 == 0 {
assert!(stack2
.writer(&mut heap)
.write_u32::<LittleEndian>(i)
.is_ok());
assert!(vec2.write_u32::<LittleEndian>(i).is_ok());
}
}
let mut res1 = vec![];
let mut res2 = vec![];
stack.read_to_end(&heap, &mut res1);
stack2.read_to_end(&heap, &mut res2);
assert_eq!(&vec1[..], &res1[..]);
assert_eq!(&vec2[..], &res2[..]);
}
#[test]
fn test_jump_if_needed() {
let mut available = 16u32;
for i in 0..10_000_000 {
match len_to_capacity(i) {
CapacityResult::NeedAlloc(cap) => {
assert_eq!(available, 0, "Failed len={}: Expected 0 got {}", i, cap);
available = cap;
}
CapacityResult::Available(cap) => {
assert_eq!(
available, cap,
"Failed len={}: Expected {} Got {}",
i, available, cap
);
}
}
available -= 1;
let mut block_len = 4u32;
let mut i = 0;
while i < 10_000_000 {
assert!(jump_needed(i + block_len - 1).is_none());
assert!(jump_needed(i + block_len + 1).is_none());
assert!(jump_needed(i + block_len).is_some());
let new_block_len = jump_needed(i + block_len).unwrap();
i += block_len;
block_len = new_block_len as u32;
}
}
#[test]
fn test_jump_if_needed_progression() {
let mut v = vec![];
for i in 0.. {
if v.len() >= 10 {
break;
}
match len_to_capacity(i) {
CapacityResult::NeedAlloc(cap) => {
v.push((i, cap));
}
_ => {}
}
}
assert_eq!(
&v[..],
&[
(16, 16),
(32, 32),
(64, 64),
(128, 128),
(256, 256),
(512, 512),
(1024, 1024),
(2048, 2048),
(4096, 4096),
(8192, 8192)
]
);
}
}
#[cfg(all(test, feature = "unstable"))]
mod bench {
use super::super::MemoryArena;
use super::ExpUnrolledLinkedList;
use byteorder::{NativeEndian, WriteBytesExt};
use test::Bencher;
const NUM_STACK: usize = 10_000;
@@ -337,13 +203,13 @@ mod bench {
let mut heap = MemoryArena::new();
let mut stacks = Vec::with_capacity(100);
for _ in 0..NUM_STACK {
let mut stack = ExpUnrolledLinkedList::new();
let mut stack = ExpUnrolledLinkedList::new(&mut heap);
stacks.push(stack);
}
for s in 0..NUM_STACK {
for i in 0u32..STACK_SIZE {
let t = s * 392017 % NUM_STACK;
let _ = stacks[t].writer(&mut heap).write_u32::<NativeEndian>(i);
stacks[t].push(i, &mut heap);
}
}
});

View File

@@ -37,7 +37,7 @@ const PAGE_SIZE: usize = 1 << NUM_BITS_PAGE_ADDR; // pages are 1 MB large
/// page of memory.
///
/// The last 20 bits are an address within this page of memory.
#[derive(Copy, Clone, Debug)]
#[derive(Clone, Copy, Debug)]
pub struct Addr(u32);
impl Addr {
@@ -69,16 +69,32 @@ impl Addr {
}
}
pub fn store<Item: Copy + 'static>(dest: &mut [u8], val: Item) {
assert_eq!(dest.len(), std::mem::size_of::<Item>());
unsafe {
ptr::write_unaligned(dest.as_mut_ptr() as *mut Item, val);
}
/// Trait required for an object to be `storable`.
///
/// # Warning
///
/// Most of the time you should not implement this trait,
/// and only use the `MemoryArena` with objects implementing `Copy`.
///
/// `ArenaStorable` is used in `tantivy` to store
/// a `Copy` object and a slice of data contiguously.
pub trait ArenaStorable {
fn num_bytes(&self) -> usize;
unsafe fn write_into(self, arena: &mut MemoryArena, addr: Addr);
}
pub fn load<Item: Copy + 'static>(data: &[u8]) -> Item {
assert_eq!(data.len(), std::mem::size_of::<Item>());
unsafe { ptr::read_unaligned(data.as_ptr() as *const Item) }
impl<V> ArenaStorable for V
where
V: Copy,
{
fn num_bytes(&self) -> usize {
mem::size_of::<V>()
}
unsafe fn write_into(self, arena: &mut MemoryArena, addr: Addr) {
let dst_ptr = arena.get_mut_ptr(addr) as *mut V;
ptr::write_unaligned(dst_ptr, self);
}
}
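As a quick illustration of the blanket impl above, here is a minimal sketch assuming only the `MemoryArena` API visible in this diff (`new`, `store`, and the unsafe `read`):

```rust
// Any `Copy` value goes through the blanket `ArenaStorable` impl:
// `store` allocates `size_of::<u64>()` bytes and writes the value unaligned.
let mut arena = MemoryArena::new();
let addr = arena.store(42u64);
// Safe here because `addr` was just returned by `store` for a `u64`.
let val: u64 = unsafe { arena.read(addr) };
assert_eq!(val, 42u64);
```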
/// The `MemoryArena`
@@ -110,9 +126,47 @@ impl MemoryArena {
self.pages.len() * PAGE_SIZE
}
pub fn write_at<Item: Copy + 'static>(&mut self, addr: Addr, val: Item) {
let dest = self.slice_mut(addr, std::mem::size_of::<Item>());
store(dest, val);
/// Writes a slice at the given address, assuming the
/// memory was allocated beforehand.
///
/// # Panics
///
/// May panic or corrupt the heap if the space was not
/// properly allocated beforehand.
pub fn write_bytes<B: AsRef<[u8]>>(&mut self, addr: Addr, data: B) {
let bytes = data.as_ref();
self.pages[addr.page_id()]
.get_mut_slice(addr.page_local_addr(), bytes.len())
.copy_from_slice(bytes);
}
/// Returns the `len` bytes starting at `addr`
///
/// # Panics
///
/// Panics if the memory has not been allocated beforehand.
pub fn read_slice(&self, addr: Addr, len: usize) -> &[u8] {
self.pages[addr.page_id()].get_slice(addr.page_local_addr(), len)
}
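A small round-trip sketch of the two byte-level methods above (it mirrors the unit test further down; `allocate_space(len) -> Addr` is assumed from its use elsewhere in this hunk):

```rust
let mut arena = MemoryArena::new();
let payload = b"hello arena";
// Reserve the bytes first; `write_bytes` assumes the space already exists.
let addr = arena.allocate_space(payload.len());
arena.write_bytes(addr, payload);
assert_eq!(arena.read_slice(addr, payload.len()), payload);
```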
unsafe fn get_mut_ptr(&mut self, addr: Addr) -> *mut u8 {
self.pages[addr.page_id()].get_mut_ptr(addr.page_local_addr())
}
/// Stores an item's data in the heap.
///
/// It allocates the space required for the `Item` beforehand.
pub fn store<Item: ArenaStorable>(&mut self, val: Item) -> Addr {
let num_bytes = val.num_bytes();
let addr = self.allocate_space(num_bytes);
unsafe {
self.write(addr, val);
};
addr
}
pub unsafe fn write<Item: ArenaStorable>(&mut self, addr: Addr, val: Item) {
val.write_into(self, addr)
}
/// Read an item in the heap at the given `address`.
@@ -120,21 +174,9 @@ impl MemoryArena {
/// # Panics
///
/// If the address is erroneous
pub fn read<Item: Copy + 'static>(&self, addr: Addr) -> Item {
load(self.slice(addr, mem::size_of::<Item>()))
}
pub fn slice(&self, addr: Addr, len: usize) -> &[u8] {
self.pages[addr.page_id()].slice(addr.page_local_addr(), len)
}
pub fn slice_from(&self, addr: Addr) -> &[u8] {
self.pages[addr.page_id()].slice_from(addr.page_local_addr())
}
#[inline(always)]
pub fn slice_mut(&mut self, addr: Addr, len: usize) -> &mut [u8] {
self.pages[addr.page_id()].slice_mut(addr.page_local_addr(), len)
pub unsafe fn read<Item: Copy>(&self, addr: Addr) -> Item {
let ptr = self.pages[addr.page_id()].get_ptr(addr.page_local_addr());
ptr::read_unaligned(ptr as *const Item)
}
/// Allocates `len` bytes and returns the allocated address.
@@ -155,10 +197,14 @@ struct Page {
impl Page {
fn new(page_id: usize) -> Page {
let mut data: Vec<u8> = Vec::with_capacity(PAGE_SIZE);
unsafe {
data.set_len(PAGE_SIZE);
} // avoid initializing page
Page {
page_id,
len: 0,
data: vec![0u8; PAGE_SIZE].into_boxed_slice(),
data: data.into_boxed_slice(),
}
}
@@ -167,18 +213,14 @@ impl Page {
len + self.len <= PAGE_SIZE
}
fn slice(&self, local_addr: usize, len: usize) -> &[u8] {
&self.slice_from(local_addr)[..len]
}
fn slice_from(&self, local_addr: usize) -> &[u8] {
&self.data[local_addr..]
}
fn slice_mut(&mut self, local_addr: usize, len: usize) -> &mut [u8] {
fn get_mut_slice(&mut self, local_addr: usize, len: usize) -> &mut [u8] {
&mut self.data[local_addr..][..len]
}
fn get_slice(&self, local_addr: usize, len: usize) -> &[u8] {
&self.data[local_addr..][..len]
}
fn allocate_space(&mut self, len: usize) -> Option<Addr> {
if self.is_available(len) {
let addr = Addr::new(self.page_id, self.len);
@@ -188,6 +230,16 @@ impl Page {
None
}
}
#[inline(always)]
pub(crate) unsafe fn get_ptr(&self, addr: usize) -> *const u8 {
self.data.as_ptr().add(addr)
}
#[inline(always)]
pub(crate) unsafe fn get_mut_ptr(&mut self, addr: usize) -> *mut u8 {
self.data.as_mut_ptr().add(addr)
}
}
#[cfg(test)]
@@ -202,13 +254,13 @@ mod tests {
let b = b"happy tax payer";
let addr_a = arena.allocate_space(a.len());
arena.slice_mut(addr_a, a.len()).copy_from_slice(a);
arena.write_bytes(addr_a, a);
let addr_b = arena.allocate_space(b.len());
arena.slice_mut(addr_b, b.len()).copy_from_slice(b);
arena.write_bytes(addr_b, b);
assert_eq!(arena.slice(addr_a, a.len()), a);
assert_eq!(arena.slice(addr_b, b.len()), b);
assert_eq!(arena.read_slice(addr_a, a.len()), a);
assert_eq!(arena.read_slice(addr_b, b.len()), b);
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
@@ -231,15 +283,9 @@ mod tests {
b: 221,
c: 12,
};
let num_bytes = std::mem::size_of::<MyTest>();
let addr_a = arena.allocate_space(num_bytes);
arena.write_at(addr_a, a);
let addr_b = arena.allocate_space(num_bytes);
arena.write_at(addr_b, b);
assert_eq!(arena.read::<MyTest>(addr_a), a);
assert_eq!(arena.read::<MyTest>(addr_b), b);
let addr_a = arena.store(a);
let addr_b = arena.store(b);
assert_eq!(unsafe { arena.read::<MyTest>(addr_a) }, a);
assert_eq!(unsafe { arena.read::<MyTest>(addr_b) }, b);
}
}

View File

@@ -3,5 +3,5 @@ mod memory_arena;
mod term_hashmap;
pub use self::expull::ExpUnrolledLinkedList;
pub use self::memory_arena::{Addr, MemoryArena};
pub use self::memory_arena::{Addr, ArenaStorable, MemoryArena};
pub use self::term_hashmap::{compute_table_size, TermHashMap};

View File

@@ -2,15 +2,39 @@ extern crate murmurhash32;
use self::murmurhash32::murmurhash2;
use super::{Addr, MemoryArena};
use byteorder::{ByteOrder, NativeEndian};
use postings::stacker::memory_arena::store;
use super::{Addr, ArenaStorable, MemoryArena};
use std::iter;
use std::mem;
use std::slice;
pub type BucketId = usize;
struct KeyBytesValue<'a, V> {
key: &'a [u8],
value: V,
}
impl<'a, V> KeyBytesValue<'a, V> {
fn new(key: &'a [u8], value: V) -> KeyBytesValue<'a, V> {
KeyBytesValue { key, value }
}
}
impl<'a, V> ArenaStorable for KeyBytesValue<'a, V>
where
V: ArenaStorable,
{
fn num_bytes(&self) -> usize {
0u16.num_bytes() + self.key.len() + self.value.num_bytes()
}
unsafe fn write_into(self, arena: &mut MemoryArena, addr: Addr) {
arena.write(addr, self.key.len() as u16);
arena.write_bytes(addr.offset(2), self.key);
arena.write(addr.offset(2 + self.key.len() as u32), self.value);
}
}
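For clarity, the layout `KeyBytesValue::write_into` produces is a `u16` length prefix, then the key bytes, then the (unaligned) value right after the key. A standalone sketch of the same encoding into a plain `Vec<u8>` rather than the arena:

```rust
// Hedged sketch of the [u16 key_len | key bytes | value] layout; not arena code.
fn encode_key_value(key: &[u8], value: u32) -> Vec<u8> {
    let mut out = Vec::with_capacity(2 + key.len() + 4);
    out.extend_from_slice(&(key.len() as u16).to_ne_bytes()); // length prefix, native endian
    out.extend_from_slice(key);                               // key bytes
    out.extend_from_slice(&value.to_ne_bytes());              // value, unaligned, right after the key
    out
}
```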
/// Returns the actual memory size in bytes
/// required to create a table of size $2^num_bits$.
pub fn compute_table_size(num_bits: usize) -> usize {
@@ -90,7 +114,8 @@ impl<'a> Iterator for Iter<'a> {
fn next(&mut self) -> Option<Self::Item> {
self.inner.next().cloned().map(move |bucket: usize| {
let kv = self.hashmap.table[bucket];
let (key, offset): (&'a [u8], Addr) = self.hashmap.get_key_value(kv.key_value_addr);
let (key, offset): (&'a [u8], Addr) =
unsafe { self.hashmap.get_key_value(kv.key_value_addr) };
(key, offset, bucket as BucketId)
})
}
@@ -121,22 +146,12 @@ impl TermHashMap {
self.table.len() < self.occupied.len() * 3
}
#[inline(always)]
fn get_key_value(&self, addr: Addr) -> (&[u8], Addr) {
let data = self.heap.slice_from(addr);
let key_bytes_len = NativeEndian::read_u16(data) as usize;
let key_bytes: &[u8] = &data[2..][..key_bytes_len];
(key_bytes, addr.offset(2u32 + key_bytes_len as u32))
}
#[inline(always)]
fn get_value_addr_if_key_match(&self, target_key: &[u8], addr: Addr) -> Option<Addr> {
let (stored_key, value_addr) = self.get_key_value(addr);
if stored_key == target_key {
Some(value_addr)
} else {
None
}
unsafe fn get_key_value(&self, addr: Addr) -> (&[u8], Addr) {
let key_bytes_len = self.heap.read::<u16>(addr) as usize;
let key_addr = addr.offset(2u32);
let key_bytes: &[u8] = self.heap.read_slice(key_addr, key_bytes_len);
let val_addr: Addr = key_addr.offset(key_bytes.len() as u32);
(key_bytes, val_addr)
}
pub fn set_bucket(&mut self, hash: u32, key_value_addr: Addr, bucket: usize) {
@@ -187,7 +202,7 @@ impl TermHashMap {
pub fn mutate_or_create<S, V, TMutator>(&mut self, key: S, mut updater: TMutator) -> BucketId
where
S: AsRef<[u8]>,
V: Copy + 'static,
V: Copy,
TMutator: FnMut(Option<V>) -> V,
{
if self.is_saturated() {
@@ -201,25 +216,22 @@ impl TermHashMap {
let kv: KeyValue = self.table[bucket];
if kv.is_empty() {
let val = updater(None);
let num_bytes =
std::mem::size_of::<u16>() + key_bytes.len() + std::mem::size_of::<V>();
let key_addr = self.heap.allocate_space(num_bytes);
{
let data = self.heap.slice_mut(key_addr, num_bytes);
NativeEndian::write_u16(data, key_bytes.len() as u16);
let stop = 2 + key_bytes.len();
data[2..stop].copy_from_slice(key_bytes);
store(&mut data[stop..], val);
}
let key_addr = self.heap.store(KeyBytesValue::new(key_bytes, val));
self.set_bucket(hash, key_addr, bucket);
return bucket as BucketId;
} else if kv.hash == hash {
if let Some(val_addr) =
self.get_value_addr_if_key_match(key_bytes, kv.key_value_addr)
{
let v = self.heap.read(val_addr);
let new_v = updater(Some(v));
self.heap.write_at(val_addr, new_v);
let (key_matches, val_addr) = {
let (stored_key, val_addr): (&[u8], Addr) =
unsafe { self.get_key_value(kv.key_value_addr) };
(stored_key == key_bytes, val_addr)
};
if key_matches {
unsafe {
// The value at `val_addr` was written as a `V` when the key was first inserted.
let v = self.heap.read(val_addr);
let new_v = updater(Some(v));
self.heap.write(val_addr, new_v);
};
return bucket as BucketId;
}
}
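For context on how callers drive `mutate_or_create`, here is a hedged usage sketch; the `TermHashMap::new(num_bits)` constructor is an assumption based on the test module below, and the closure receives `None` on the first insertion of a key and `Some(previous_value)` on every later call:

```rust
let mut map = TermHashMap::new(18);
for term in &["apple", "banana", "apple"] {
    // Count occurrences: V is inferred as u32 from the closure.
    map.mutate_or_create(term.as_bytes(), |count: Option<u32>| match count {
        Some(c) => c + 1,
        None => 1u32,
    });
}
```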
@@ -227,6 +239,24 @@ impl TermHashMap {
}
}
#[cfg(all(test, feature = "unstable"))]
mod bench {
use super::murmurhash2::murmurhash2;
use test::Bencher;
#[bench]
fn bench_murmurhash2(b: &mut Bencher) {
let keys: [&'static str; 3] = ["wer qwe qwe qwe ", "werbq weqweqwe2 ", "weraq weqweqwe3 "];
b.iter(|| {
let mut s = 0;
for &key in &keys {
s ^= murmurhash2(key.as_bytes());
}
s
});
}
}
#[cfg(test)]
mod tests {
@@ -258,7 +288,10 @@ mod tests {
let mut vanilla_hash_map = HashMap::new();
let mut iter_values = hash_map.iter();
while let Some((key, addr, _)) = iter_values.next() {
let val: u32 = hash_map.heap.read(addr);
let val: u32 = unsafe {
// The iterator only yields addresses of values stored as `u32` in this test.
hash_map.heap.read(addr)
};
vanilla_hash_map.insert(key.to_owned(), val);
}
assert_eq!(vanilla_hash_map.len(), 2);

View File

@@ -1,6 +1,6 @@
use common::BitSet;
use core::SegmentReader;
use fst::Automaton;
use tantivy_fst::Automaton;
use query::BitSetDocSet;
use query::ConstScorer;
use query::{Scorer, Weight};

View File

@@ -1,5 +1,5 @@
use error::TantivyError;
use fst_regex::Regex;
use tantivy_fst::Regex;
use query::{AutomatonWeight, Query, Weight};
use schema::Field;
use std::clone::Clone;

View File

@@ -133,7 +133,7 @@ impl<'a, T: ?Sized + AsRef<str>> From<&'a T> for Facet {
}
let path: &str = path_asref.as_ref();
assert!(!path.is_empty());
assert!(path.starts_with('/'));
assert!(path.starts_with("/"));
let mut facet_encoded = String::new();
let mut state = State::Idle;
let path_bytes = path.as_bytes();

View File

@@ -1,8 +1,8 @@
use super::TermDictionary;
use fst::automaton::AlwaysMatch;
use fst::map::{Stream, StreamBuilder};
use fst::Automaton;
use fst::{IntoStreamer, Streamer};
use tantivy_fst::automaton::AlwaysMatch;
use tantivy_fst::map::{Stream, StreamBuilder};
use tantivy_fst::Automaton;
use tantivy_fst::{IntoStreamer, Streamer};
use postings::TermInfo;
use termdict::TermOrdinal;

View File

@@ -1,4 +1,4 @@
use byteorder::{ByteOrder, LittleEndian};
use byteorder::ByteOrder;
use common::bitpacker::BitPacker;
use common::compute_num_bits;
use common::Endianness;
@@ -7,6 +7,7 @@ use directory::ReadOnlySource;
use postings::TermInfo;
use std::cmp;
use std::io::{self, Read, Write};
use std::ptr;
use termdict::TermOrdinal;
const BLOCK_LEN: usize = 256;
@@ -87,17 +88,13 @@ fn extract_bits(data: &[u8], addr_bits: usize, num_bits: u8) -> u64 {
assert!(num_bits <= 56);
let addr_byte = addr_bits / 8;
let bit_shift = (addr_bits % 8) as u64;
let val_unshifted_unmasked: u64 = if data.len() >= addr_byte + 8 {
LittleEndian::read_u64(&data[addr_byte..][..8])
} else {
// the buffer is not large enough.
// Let's copy the few remaining bytes to a 8 byte buffer
// padded with 0s.
let mut buf = [0u8; 8];
let data_to_copy = &data[addr_byte..];
let nbytes = data_to_copy.len();
buf[..nbytes].copy_from_slice(data_to_copy);
LittleEndian::read_u64(&buf)
assert!(data.len() >= addr_byte + 7);
let val_unshifted_unmasked: u64 = unsafe {
// ok because the pointer is only accessed using `ptr::read_unaligned`
#[cfg_attr(feature = "cargo-clippy", allow(clippy::cast_ptr_alignment))]
let addr = data.as_ptr().add(addr_byte) as *const u64;
// ok thanks to the 7 byte padding
ptr::read_unaligned(addr)
};
let val_shifted_unmasked = val_unshifted_unmasked >> bit_shift;
let mask = (1u64 << u64::from(num_bits)) - 1;
@@ -249,6 +246,7 @@ impl TermInfoStoreWriter {
self.num_terms.serialize(write)?;
write.write_all(&self.buffer_block_metas)?;
write.write_all(&self.buffer_term_infos)?;
write.write_all(&[0u8; 7])?;
Ok(())
}
}
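The new `assert!` and the seven trailing zero bytes written by `serialize` work together: a bit-packed value is at most 56 bits wide, so the unaligned 8-byte read starting at `addr_byte` can reach at most 7 bytes past the last meaningful byte, which the padding keeps in bounds. A safe-Rust sketch of the same extraction (behaviorally equivalent to the code above, using the little-endian layout the previous implementation relied on):

```rust
// Relies on the 7 zero bytes appended by the serializer so that
// `addr_byte + 8` never exceeds the buffer.
fn extract_bits_safe(data: &[u8], addr_bits: usize, num_bits: u8) -> u64 {
    assert!(num_bits <= 56);
    let addr_byte = addr_bits / 8;
    let bit_shift = (addr_bits % 8) as u64;
    let mut word = [0u8; 8];
    word.copy_from_slice(&data[addr_byte..addr_byte + 8]);
    let mask = (1u64 << u64::from(num_bits)) - 1;
    (u64::from_le_bytes(word) >> bit_shift) & mask
}
```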

View File

@@ -3,15 +3,15 @@ use super::{TermStreamer, TermStreamerBuilder};
use common::BinarySerializable;
use common::CountingWriter;
use directory::ReadOnlySource;
use fst;
use fst::raw::Fst;
use fst::Automaton;
use tantivy_fst;
use tantivy_fst::raw::Fst;
use tantivy_fst::Automaton;
use postings::TermInfo;
use schema::FieldType;
use std::io::{self, Write};
use termdict::TermOrdinal;
fn convert_fst_error(e: fst::Error) -> io::Error {
fn convert_fst_error(e: tantivy_fst::Error) -> io::Error {
io::Error::new(io::ErrorKind::Other, e)
}
@@ -19,7 +19,7 @@ fn convert_fst_error(e: fst::Error) -> io::Error {
///
/// Inserting must be done in the order of the `keys`.
pub struct TermDictionaryBuilder<W> {
fst_builder: fst::MapBuilder<W>,
fst_builder: tantivy_fst::MapBuilder<W>,
term_info_store_writer: TermInfoStoreWriter,
term_ord: u64,
}
@@ -30,7 +30,7 @@ where
{
/// Creates a new `TermDictionaryBuilder`
pub fn create(w: W, _field_type: &FieldType) -> io::Result<Self> {
let fst_builder = fst::MapBuilder::new(w).map_err(convert_fst_error)?;
let fst_builder = tantivy_fst::MapBuilder::new(w).map_err(convert_fst_error)?;
Ok(TermDictionaryBuilder {
fst_builder,
term_info_store_writer: TermInfoStoreWriter::new(),
@@ -87,17 +87,9 @@ where
}
}
fn open_fst_index(source: ReadOnlySource) -> fst::Map {
let fst = match source {
ReadOnlySource::Anonymous(data) => {
Fst::from_shared_bytes(data.data, data.start, data.len).expect("FST data is corrupted")
}
#[cfg(feature = "mmap")]
ReadOnlySource::Mmap(mmap_readonly) => {
Fst::from_mmap(mmap_readonly).expect("FST data is corrupted")
}
};
fst::Map::from(fst)
fn open_fst_index(source: ReadOnlySource) -> tantivy_fst::Map<ReadOnlySource> {
let fst = Fst::new(source).expect("FST data is corrupted");
tantivy_fst::Map::from(fst)
}
/// The term dictionary contains all of the terms in
@@ -107,7 +99,7 @@ fn open_fst_index(source: ReadOnlySource) -> fst::Map {
/// respective `TermOrdinal`. The `TermInfoStore` then makes it
/// possible to fetch the associated `TermInfo`.
pub struct TermDictionary {
fst_index: fst::Map,
fst_index: tantivy_fst::Map<ReadOnlySource>,
term_info_store: TermInfoStore,
}
@@ -136,7 +128,7 @@ impl TermDictionary {
.expect("Creating a TermDictionaryBuilder in a Vec<u8> should never fail")
.finish()
.expect("Writing in a Vec<u8> should never fail");
let source = ReadOnlySource::from(term_dictionary_data);
let source = ReadOnlySource::new(term_dictionary_data);
Self::from_source(&source)
}