Compare commits

...

2 Commits

Author SHA1 Message Date
Paul Masurel
dc769b373b Closes #500 2019-02-22 08:59:11 +09:00
Paul Masurel
5f07dc35d8 32bits platforms 2019-02-14 09:12:25 +09:00
13 changed files with 153 additions and 181 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "tantivy"
version = "0.8.0"
version = "0.8.3"
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
license = "MIT"
categories = ["database-implementations", "data-structures"]
@@ -16,8 +16,8 @@ base64 = "0.10.0"
byteorder = "1.0"
lazy_static = "1"
regex = "1.0"
fst = {version="0.3", default-features=false}
fst-regex = { version="0.2" }
tantivy-fst = {path="../tantivy-search/fst", version="0.1"}
memmap = "0.7"
lz4 = {version="1.20", optional=true}
snap = {version="0.2"}
atomicwrites = {version="0.2.2", optional=true}
@@ -30,7 +30,7 @@ serde_derive = "1.0"
serde_json = "1.0"
num_cpus = "1.2"
itertools = "0.8"
levenshtein_automata = {version="0.1", features=["fst_automaton"]}
levenshtein_automata = {version="0.1"}
bit-set = "0.5"
uuid = { version = "0.7", features = ["v4", "serde"] }
crossbeam = "0.5"
@@ -70,7 +70,7 @@ overflow-checks = true
[features]
# by default no-fail is disabled. We manually enable it when running test.
default = ["mmap", "no_fail"]
mmap = ["fst/mmap", "atomicwrites"]
mmap = ["atomicwrites"]
lz4-compression = ["lz4"]
no_fail = ["fail/no_fail"]
unstable = [] # useful for benches.

View File

@@ -1,12 +1,9 @@
use atomicwrites;
use common::make_io_err;
use directory::error::{DeleteError, IOError, OpenDirectoryError, OpenReadError, OpenWriteError};
use directory::shared_vec_slice::SharedVecSlice;
use directory::Directory;
use directory::ReadOnlySource;
use directory::WritePtr;
use fst::raw::MmapReadOnly;
use std::collections::hash_map::Entry as HashMapEntry;
use std::collections::HashMap;
use std::convert::From;
use std::fmt;
@@ -19,11 +16,14 @@ use std::result;
use std::sync::Arc;
use std::sync::RwLock;
use tempdir::TempDir;
use memmap::Mmap;
use std::sync::Weak;
use std::ops::Deref;
/// Returns None iff the file exists, can be read, but is empty (and hence
/// cannot be mmapped).
///
fn open_mmap(full_path: &Path) -> result::Result<Option<MmapReadOnly>, OpenReadError> {
fn open_mmap(full_path: &Path) -> result::Result<Option<Mmap>, OpenReadError> {
let file = File::open(full_path).map_err(|e| {
if e.kind() == io::ErrorKind::NotFound {
OpenReadError::FileDoesNotExist(full_path.to_owned())
@@ -42,7 +42,7 @@ fn open_mmap(full_path: &Path) -> result::Result<Option<MmapReadOnly>, OpenReadE
return Ok(None);
}
unsafe {
MmapReadOnly::open(&file)
memmap::Mmap::map(&file)
.map(Some)
.map_err(|e| From::from(IOError::with_path(full_path.to_owned(), e)))
}
@@ -65,7 +65,7 @@ pub struct CacheInfo {
struct MmapCache {
counters: CacheCounters,
cache: HashMap<PathBuf, MmapReadOnly>,
cache: HashMap<PathBuf, Weak<Box<Deref<Target=[u8]> + Send + Sync>>>,
}
impl Default for MmapCache {
@@ -78,10 +78,6 @@ impl Default for MmapCache {
}
impl MmapCache {
/// Removes a `MmapReadOnly` entry from the mmap cache.
fn discard_from_cache(&mut self, full_path: &Path) -> bool {
self.cache.remove(full_path).is_some()
}
fn get_info(&mut self) -> CacheInfo {
let paths: Vec<PathBuf> = self.cache.keys().cloned().collect();
@@ -91,23 +87,27 @@ impl MmapCache {
}
}
fn get_mmap(&mut self, full_path: &Path) -> Result<Option<MmapReadOnly>, OpenReadError> {
Ok(match self.cache.entry(full_path.to_owned()) {
HashMapEntry::Occupied(occupied_entry) => {
let mmap = occupied_entry.get();
self.counters.hit += 1;
Some(mmap.clone())
}
HashMapEntry::Vacant(vacant_entry) => {
self.counters.miss += 1;
if let Some(mmap) = open_mmap(full_path)? {
vacant_entry.insert(mmap.clone());
Some(mmap)
} else {
None
// Returns None if the file exists but as a len of 0 (and hence is not mmappable).
fn get_mmap(&mut self, full_path: &Path) -> Result<Option<Arc<Box<Deref<Target=[u8]> + Send + Sync>>>, OpenReadError> {
let path_in_cache = self.cache.contains_key(full_path);
if path_in_cache {
{
let mmap_weak_opt = self.cache.get(full_path);
if let Some(mmap_arc) = mmap_weak_opt.and_then(|mmap_weak| mmap_weak.upgrade()) {
self.counters.hit += 1;
return Ok(Some(mmap_arc));
}
}
})
self.cache.remove(full_path);
}
self.counters.miss += 1;
if let Some(mmap) = open_mmap(full_path)? {
let res: Arc<Box<Deref<Target=[u8]> + Send + Sync>> = Arc::new(Box::new(mmap));
self.cache.insert(full_path.to_owned(), Arc::downgrade(&res));
Ok(Some(res))
} else {
Ok(None)
}
}
}
@@ -253,11 +253,10 @@ impl Directory for MmapDirectory {
);
IOError::with_path(path.to_owned(), make_io_err(msg))
})?;
Ok(mmap_cache
.get_mmap(&full_path)?
.map(ReadOnlySource::Mmap)
.unwrap_or_else(|| ReadOnlySource::Anonymous(SharedVecSlice::empty())))
.map(ReadOnlySource::from)
.unwrap_or_else(|| ReadOnlySource::empty()))
}
fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
@@ -295,20 +294,6 @@ impl Directory for MmapDirectory {
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
debug!("Deleting file {:?}", path);
let full_path = self.resolve_path(path);
let mut mmap_cache = self.mmap_cache.write().map_err(|_| {
let msg = format!(
"Failed to acquired write lock \
on mmap cache while deleting {:?}",
path
);
IOError::with_path(path.to_owned(), make_io_err(msg))
})?;
mmap_cache.discard_from_cache(path);
// Removing the entry in the MMap cache.
// The munmap will appear on Drop,
// when the last reference is gone.
mmap_cache.cache.remove(&full_path);
match fs::remove_file(&full_path) {
Ok(_) => self
.sync_directory()
@@ -403,25 +388,50 @@ mod tests {
w.flush().unwrap();
}
}
{
for (i, path) in paths.iter().enumerate() {
let _r = mmap_directory.open_read(path).unwrap();
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), i + 1);
}
for path in paths.iter() {
let _r = mmap_directory.open_read(path).unwrap();
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), num_paths);
}
for (i, path) in paths.iter().enumerate() {
mmap_directory.delete(path).unwrap();
assert_eq!(
mmap_directory.get_cache_info().mmapped.len(),
num_paths - i - 1
);
}
let mut keep = vec![];
for (i, path) in paths.iter().enumerate() {
keep.push(mmap_directory.open_read(path).unwrap());
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), i + 1);
}
assert_eq!(mmap_directory.get_cache_info().counters.hit, 0);
assert_eq!(mmap_directory.get_cache_info().counters.miss, 10);
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 10);
for path in paths.iter() {
let _r = mmap_directory.open_read(path).unwrap();
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), num_paths);
}
assert_eq!(mmap_directory.get_cache_info().counters.hit, 10);
assert_eq!(mmap_directory.get_cache_info().counters.miss, 10);
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 10);
for path in paths.iter() {
let _r = mmap_directory.open_read(path).unwrap();
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), num_paths);
}
assert_eq!(mmap_directory.get_cache_info().counters.hit, 20);
assert_eq!(mmap_directory.get_cache_info().counters.miss, 10);
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 10);
drop(keep);
for path in paths.iter() {
let _r = mmap_directory.open_read(path).unwrap();
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), num_paths);
}
assert_eq!(mmap_directory.get_cache_info().counters.hit, 20);
assert_eq!(mmap_directory.get_cache_info().counters.miss, 20);
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 10);
for path in &paths {
mmap_directory.delete(path).unwrap();
}
assert_eq!(mmap_directory.get_cache_info().counters.hit, 20);
assert_eq!(mmap_directory.get_cache_info().counters.miss, 20);
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 10);
for path in paths.iter() {
assert!(mmap_directory.open_read(path).is_err());
}
assert_eq!(mmap_directory.get_cache_info().counters.hit, 20);
assert_eq!(mmap_directory.get_cache_info().counters.miss, 30);
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0);
}

View File

@@ -11,7 +11,6 @@ mod directory;
mod managed_directory;
mod ram_directory;
mod read_only_source;
mod shared_vec_slice;
/// Errors specific to the directory module.
pub mod error;

View File

@@ -1,4 +1,3 @@
use super::shared_vec_slice::SharedVecSlice;
use common::make_io_err;
use directory::error::{DeleteError, IOError, OpenReadError, OpenWriteError};
use directory::WritePtr;
@@ -71,7 +70,7 @@ impl Write for VecWriter {
}
#[derive(Clone)]
struct InnerDirectory(Arc<RwLock<HashMap<PathBuf, Arc<Vec<u8>>>>>);
struct InnerDirectory(Arc<RwLock<HashMap<PathBuf, ReadOnlySource>>>);
impl InnerDirectory {
fn new() -> InnerDirectory {
@@ -85,7 +84,7 @@ impl InnerDirectory {
path
))
})?;
let prev_value = map.insert(path, Arc::new(Vec::from(data)));
let prev_value = map.insert(path, ReadOnlySource::new(Vec::from(data)));
Ok(prev_value.is_some())
}
@@ -105,8 +104,7 @@ impl InnerDirectory {
readable_map
.get(path)
.ok_or_else(|| OpenReadError::FileDoesNotExist(PathBuf::from(path)))
.map(Arc::clone)
.map(|data| ReadOnlySource::Anonymous(SharedVecSlice::new(data)))
.map(|el| el.clone())
})
}

View File

@@ -1,9 +1,8 @@
use super::shared_vec_slice::SharedVecSlice;
use common::HasLen;
#[cfg(feature = "mmap")]
use fst::raw::MmapReadOnly;
use stable_deref_trait::{CloneStableDeref, StableDeref};
use std::ops::Deref;
use std::sync::Arc;
/// Read object that represents files in tantivy.
///
@@ -11,12 +10,10 @@ use std::ops::Deref;
/// the data in the form of a constant read-only `&[u8]`.
/// Whatever happens to the directory file, the data
/// hold by this object should never be altered or destroyed.
pub enum ReadOnlySource {
/// Mmap source of data
#[cfg(feature = "mmap")]
Mmap(MmapReadOnly),
/// Wrapping a `Vec<u8>`
Anonymous(SharedVecSlice),
pub struct ReadOnlySource {
data: Arc<Box<Deref<Target=[u8]> + Send + Sync + 'static>>,
start: usize,
stop: usize
}
unsafe impl StableDeref for ReadOnlySource {}
@@ -30,19 +27,41 @@ impl Deref for ReadOnlySource {
}
}
impl From<Arc<Box<Deref<Target=[u8]> + Send + Sync>>> for ReadOnlySource {
fn from(data: Arc<Box<Deref<Target=[u8]> + Send + Sync>>) -> Self {
let len = data.len();
ReadOnlySource {
data,
start: 0,
stop: len
}
}
}
const EMPTY_ARRAY: [u8; 0] = [0u8; 0];
impl ReadOnlySource {
/// Creates a new `ReadOnlySource`.
pub fn new<D>(data: D) -> ReadOnlySource
where D: Deref<Target=[u8]> + Send + Sync + 'static {
let len = data.len();
ReadOnlySource {
data: Arc::new(Box::new(data)),
start: 0,
stop: len
}
}
/// Creates an empty ReadOnlySource
pub fn empty() -> ReadOnlySource {
ReadOnlySource::Anonymous(SharedVecSlice::empty())
ReadOnlySource::new(&EMPTY_ARRAY[..])
}
/// Returns the data underlying the ReadOnlySource object.
pub fn as_slice(&self) -> &[u8] {
match *self {
#[cfg(feature = "mmap")]
ReadOnlySource::Mmap(ref mmap_read_only) => mmap_read_only.as_slice(),
ReadOnlySource::Anonymous(ref shared_vec) => shared_vec.as_slice(),
}
&self.data[self.start..self.stop]
}
/// Splits into 2 `ReadOnlySource`, at the offset given
@@ -63,22 +82,18 @@ impl ReadOnlySource {
/// worth of data in anonymous memory, and only a
/// 1KB slice is remaining, the whole `500MBs`
/// are retained in memory.
pub fn slice(&self, from_offset: usize, to_offset: usize) -> ReadOnlySource {
pub fn slice(&self, start: usize, stop: usize) -> ReadOnlySource {
assert!(
from_offset <= to_offset,
start <= stop,
"Requested negative slice [{}..{}]",
from_offset,
to_offset
start,
stop
);
match *self {
#[cfg(feature = "mmap")]
ReadOnlySource::Mmap(ref mmap_read_only) => {
let sliced_mmap = mmap_read_only.range(from_offset, to_offset - from_offset);
ReadOnlySource::Mmap(sliced_mmap)
}
ReadOnlySource::Anonymous(ref shared_vec) => {
ReadOnlySource::Anonymous(shared_vec.slice(from_offset, to_offset))
}
assert!(stop <= self.len());
ReadOnlySource {
data: self.data.clone(),
start: self.start + start,
stop: self.start + stop
}
}
@@ -87,8 +102,7 @@ impl ReadOnlySource {
///
/// Equivalent to `.slice(from_offset, self.len())`
pub fn slice_from(&self, from_offset: usize) -> ReadOnlySource {
let len = self.len();
self.slice(from_offset, len)
self.slice(from_offset, self.len())
}
/// Like `.slice(...)` but enforcing only the `to`
@@ -102,19 +116,18 @@ impl ReadOnlySource {
impl HasLen for ReadOnlySource {
fn len(&self) -> usize {
self.as_slice().len()
self.stop - self.start
}
}
impl Clone for ReadOnlySource {
fn clone(&self) -> Self {
self.slice(0, self.len())
self.slice_from(0)
}
}
impl From<Vec<u8>> for ReadOnlySource {
fn from(data: Vec<u8>) -> ReadOnlySource {
let shared_data = SharedVecSlice::from(data);
ReadOnlySource::Anonymous(shared_data)
ReadOnlySource::new(data)
}
}
}

View File

@@ -1,41 +0,0 @@
use std::sync::Arc;
#[derive(Clone)]
pub struct SharedVecSlice {
pub data: Arc<Vec<u8>>,
pub start: usize,
pub len: usize,
}
impl SharedVecSlice {
pub fn empty() -> SharedVecSlice {
SharedVecSlice::new(Arc::new(Vec::new()))
}
pub fn new(data: Arc<Vec<u8>>) -> SharedVecSlice {
let data_len = data.len();
SharedVecSlice {
data,
start: 0,
len: data_len,
}
}
pub fn as_slice(&self) -> &[u8] {
&self.data[self.start..self.start + self.len]
}
pub fn slice(&self, from_offset: usize, to_offset: usize) -> SharedVecSlice {
SharedVecSlice {
data: Arc::clone(&self.data),
start: self.start + from_offset,
len: to_offset - from_offset,
}
}
}
impl From<Vec<u8>> for SharedVecSlice {
fn from(data: Vec<u8>) -> SharedVecSlice {
SharedVecSlice::new(Arc::new(data))
}
}

View File

@@ -27,22 +27,22 @@ mod archicture_impl {
#[cfg(not(target_arch = "x86_64"))]
mod archicture_impl {
/// Under other architecture, we rely on a mutex.
use std::sync::Mutex;
use std::sync::atomic::Ordering;
/// Under other architecture, we rely on a mutex.
use std::sync::RwLock;
#[derive(Default)]
pub struct AtomicU64Ersatz(Mutex<u64>);
pub struct AtomicU64Ersatz(RwLock<u64>);
impl AtomicU64Ersatz {
pub fn new(first_opstamp: u64) -> AtomicU64Ersatz {
AtomicU64Ersatz(AtomicUsize::new(first_opstamp))
AtomicU64Ersatz(RwLock::new(first_opstamp))
}
pub fn fetch_add(&self, val: u64, _order: Ordering) -> u64 {
let lock = self.0.lock().unwrap();
pub fn fetch_add(&self, incr: u64, _order: Ordering) -> u64 {
let mut lock = self.0.write().unwrap();
let previous_val = *lock;
*lock = previous_val + 1;
*lock = previous_val + incr;
previous_val
}
}

View File

@@ -123,6 +123,8 @@ extern crate log;
#[macro_use]
extern crate failure;
#[cfg(feature = "mmap")]
extern crate memmap;
#[cfg(feature = "mmap")]
extern crate atomicwrites;
extern crate base64;
@@ -135,8 +137,7 @@ extern crate combine;
extern crate crossbeam;
extern crate fnv;
extern crate fst;
extern crate fst_regex;
extern crate tantivy_fst;
extern crate futures;
extern crate futures_cpupool;
extern crate htmlescape;

View File

@@ -2,7 +2,7 @@ use common::BitSet;
use common::HasLen;
use common::{BinarySerializable, VInt};
use docset::{DocSet, SkipResult};
use fst::Streamer;
use tantivy_fst::Streamer;
use owned_read::OwnedRead;
use positions::PositionReader;
use postings::compression::compressed_block_size;
@@ -628,7 +628,7 @@ mod tests {
use common::HasLen;
use core::Index;
use docset::DocSet;
use fst::Streamer;
use tantivy_fst::Streamer;
use schema::IndexRecordOption;
use schema::Schema;
use schema::Term;

View File

@@ -1,6 +1,6 @@
use common::BitSet;
use core::SegmentReader;
use fst::Automaton;
use tantivy_fst::Automaton;
use query::BitSetDocSet;
use query::ConstScorer;
use query::{Scorer, Weight};

View File

@@ -1,5 +1,5 @@
use error::TantivyError;
use fst_regex::Regex;
use tantivy_fst::Regex;
use query::{AutomatonWeight, Query, Weight};
use schema::Field;
use std::clone::Clone;

View File

@@ -1,8 +1,8 @@
use super::TermDictionary;
use fst::automaton::AlwaysMatch;
use fst::map::{Stream, StreamBuilder};
use fst::Automaton;
use fst::{IntoStreamer, Streamer};
use tantivy_fst::automaton::AlwaysMatch;
use tantivy_fst::map::{Stream, StreamBuilder};
use tantivy_fst::Automaton;
use tantivy_fst::{IntoStreamer, Streamer};
use postings::TermInfo;
use termdict::TermOrdinal;

View File

@@ -3,15 +3,15 @@ use super::{TermStreamer, TermStreamerBuilder};
use common::BinarySerializable;
use common::CountingWriter;
use directory::ReadOnlySource;
use fst;
use fst::raw::Fst;
use fst::Automaton;
use tantivy_fst;
use tantivy_fst::raw::Fst;
use tantivy_fst::Automaton;
use postings::TermInfo;
use schema::FieldType;
use std::io::{self, Write};
use termdict::TermOrdinal;
fn convert_fst_error(e: fst::Error) -> io::Error {
fn convert_fst_error(e: tantivy_fst::Error) -> io::Error {
io::Error::new(io::ErrorKind::Other, e)
}
@@ -19,7 +19,7 @@ fn convert_fst_error(e: fst::Error) -> io::Error {
///
/// Inserting must be done in the order of the `keys`.
pub struct TermDictionaryBuilder<W> {
fst_builder: fst::MapBuilder<W>,
fst_builder: tantivy_fst::MapBuilder<W>,
term_info_store_writer: TermInfoStoreWriter,
term_ord: u64,
}
@@ -30,7 +30,7 @@ where
{
/// Creates a new `TermDictionaryBuilder`
pub fn create(w: W, _field_type: &FieldType) -> io::Result<Self> {
let fst_builder = fst::MapBuilder::new(w).map_err(convert_fst_error)?;
let fst_builder = tantivy_fst::MapBuilder::new(w).map_err(convert_fst_error)?;
Ok(TermDictionaryBuilder {
fst_builder,
term_info_store_writer: TermInfoStoreWriter::new(),
@@ -87,17 +87,9 @@ where
}
}
fn open_fst_index(source: ReadOnlySource) -> fst::Map {
let fst = match source {
ReadOnlySource::Anonymous(data) => {
Fst::from_shared_bytes(data.data, data.start, data.len).expect("FST data is corrupted")
}
#[cfg(feature = "mmap")]
ReadOnlySource::Mmap(mmap_readonly) => {
Fst::from_mmap(mmap_readonly).expect("FST data is corrupted")
}
};
fst::Map::from(fst)
fn open_fst_index(source: ReadOnlySource) -> tantivy_fst::Map<ReadOnlySource> {
let fst = Fst::new(source).expect("FST data is corrupted");
tantivy_fst::Map::from(fst)
}
/// The term dictionary contains all of the terms in
@@ -107,7 +99,7 @@ fn open_fst_index(source: ReadOnlySource) -> fst::Map {
/// respective `TermOrdinal`. The `TermInfoStore` then makes it
/// possible to fetch the associated `TermInfo`.
pub struct TermDictionary {
fst_index: fst::Map,
fst_index: tantivy_fst::Map<ReadOnlySource>,
term_info_store: TermInfoStore,
}
@@ -136,7 +128,7 @@ impl TermDictionary {
.expect("Creating a TermDictionaryBuilder in a Vec<u8> should never fail")
.finish()
.expect("Writing in a Vec<u8> should never fail");
let source = ReadOnlySource::from(term_dictionary_data);
let source = ReadOnlySource::new(term_dictionary_data);
Self::from_source(&source)
}