diff --git a/Cargo.toml b/Cargo.toml
index 6cf5e4ba2..d40d42efb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "tantivy"
-version = "0.8.2"
+version = "0.8.3"
 authors = ["Paul Masurel <paul.masurel@gmail.com>"]
 license = "MIT"
 categories = ["database-implementations", "data-structures"]
@@ -16,8 +16,8 @@ base64 = "0.10.0"
 byteorder = "1.0"
 lazy_static = "1"
 regex = "1.0"
-fst = {version="0.3", default-features=false}
-fst-regex = { version="0.2" }
+tantivy-fst = {path="../tantivy-search/fst", version="0.1"}
+memmap = "0.7"
 lz4 = {version="1.20", optional=true}
 snap = {version="0.2"}
 atomicwrites = {version="0.2.2", optional=true}
@@ -30,7 +30,7 @@ serde_derive = "1.0"
 serde_json = "1.0"
 num_cpus = "1.2"
 itertools = "0.8"
-levenshtein_automata = {version="0.1", features=["fst_automaton"]}
+levenshtein_automata = {version="0.1"}
 bit-set = "0.5"
 uuid = { version = "0.7", features = ["v4", "serde"] }
 crossbeam = "0.5"
@@ -70,7 +70,7 @@ overflow-checks = true
 [features]
 # by default no-fail is disabled. We manually enable it when running test.
 default = ["mmap", "no_fail"]
-mmap = ["fst/mmap", "atomicwrites"]
+mmap = ["atomicwrites"]
 lz4-compression = ["lz4"]
 no_fail = ["fail/no_fail"]
 unstable = [] # useful for benches.
diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs
index 03031562f..0abefc044 100644
--- a/src/directory/mmap_directory.rs
+++ b/src/directory/mmap_directory.rs
@@ -1,12 +1,9 @@
 use atomicwrites;
 use common::make_io_err;
 use directory::error::{DeleteError, IOError, OpenDirectoryError, OpenReadError, OpenWriteError};
-use directory::shared_vec_slice::SharedVecSlice;
 use directory::Directory;
 use directory::ReadOnlySource;
 use directory::WritePtr;
-use fst::raw::MmapReadOnly;
-use std::collections::hash_map::Entry as HashMapEntry;
 use std::collections::HashMap;
 use std::convert::From;
 use std::fmt;
@@ -19,11 +16,14 @@ use std::result;
 use std::sync::Arc;
 use std::sync::RwLock;
 use tempdir::TempDir;
+use memmap::Mmap;
+use std::sync::Weak;
+use std::ops::Deref;
 /// Returns None iff the file exists, can be read, but is empty (and hence
 /// cannot be mmapped).
 ///
-fn open_mmap(full_path: &Path) -> result::Result<Option<MmapReadOnly>, OpenReadError> {
+fn open_mmap(full_path: &Path) -> result::Result<Option<Mmap>, OpenReadError> {
     let file = File::open(full_path).map_err(|e| {
         if e.kind() == io::ErrorKind::NotFound {
             OpenReadError::FileDoesNotExist(full_path.to_owned())
         } else {
@@ -42,7 +42,7 @@ fn open_mmap(full_path: &Path) -> result::Result<Option<MmapReadOnly>, OpenReadError> {
         return Ok(None);
     }
     unsafe {
-        MmapReadOnly::open(&file)
+        memmap::Mmap::map(&file)
             .map(Some)
             .map_err(|e| From::from(IOError::with_path(full_path.to_owned(), e)))
     }
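Note on the hunk above: `memmap::Mmap::map` (memmap 0.7) is an `unsafe` API because nothing stops another process from truncating the file while the mapping is alive, and mapping a zero-length file fails on some platforms — hence the early `Ok(None)`. A minimal, self-contained sketch of the same contract (`map_file` is an illustrative name, not tantivy's API):

```rust
use memmap::Mmap;
use std::fs::File;
use std::io;
use std::path::Path;

fn map_file(path: &Path) -> io::Result<Option<Mmap>> {
    let file = File::open(path)?;
    if file.metadata()?.len() == 0 {
        // Mapping an empty file is an error on some platforms; callers
        // substitute an empty ReadOnlySource instead.
        return Ok(None);
    }
    // Unsafe contract: the file must not be truncated while the map is alive.
    let mmap = unsafe { Mmap::map(&file)? };
    Ok(Some(mmap)) // `Mmap` derefs to `&[u8]`
}
```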
@@ -65,7 +65,7 @@ pub struct CacheInfo {
 
 struct MmapCache {
     counters: CacheCounters,
-    cache: HashMap<PathBuf, MmapReadOnly>,
+    cache: HashMap<PathBuf, Weak<Box<Deref<Target = [u8]> + Send + Sync>>>,
 }
 
 impl Default for MmapCache {
@@ -78,10 +78,6 @@ impl Default for MmapCache {
 }
 
 impl MmapCache {
-    /// Removes a `MmapReadOnly` entry from the mmap cache.
-    fn discard_from_cache(&mut self, full_path: &Path) -> bool {
-        self.cache.remove(full_path).is_some()
-    }
 
     fn get_info(&mut self) -> CacheInfo {
         let paths: Vec<PathBuf> = self.cache.keys().cloned().collect();
@@ -91,23 +87,27 @@ impl MmapCache {
         }
     }
 
-    fn get_mmap(&mut self, full_path: &Path) -> Result<Option<MmapReadOnly>, OpenReadError> {
-        Ok(match self.cache.entry(full_path.to_owned()) {
-            HashMapEntry::Occupied(occupied_entry) => {
-                let mmap = occupied_entry.get();
-                self.counters.hit += 1;
-                Some(mmap.clone())
-            }
-            HashMapEntry::Vacant(vacant_entry) => {
-                self.counters.miss += 1;
-                if let Some(mmap) = open_mmap(full_path)? {
-                    vacant_entry.insert(mmap.clone());
-                    Some(mmap)
-                } else {
-                    None
+    // Returns None if the file exists but has a len of 0 (and hence is not mmappable).
+    fn get_mmap(&mut self, full_path: &Path) -> Result<Option<Arc<Box<Deref<Target = [u8]> + Send + Sync>>>, OpenReadError> {
+        let path_in_cache = self.cache.contains_key(full_path);
+        if path_in_cache {
+            {
+                let mmap_weak_opt = self.cache.get(full_path);
+                if let Some(mmap_arc) = mmap_weak_opt.and_then(|mmap_weak| mmap_weak.upgrade()) {
+                    self.counters.hit += 1;
+                    return Ok(Some(mmap_arc));
                 }
             }
-        })
+            self.cache.remove(full_path);
+        }
+        self.counters.miss += 1;
+        if let Some(mmap) = open_mmap(full_path)? {
+            let res: Arc<Box<Deref<Target = [u8]> + Send + Sync>> = Arc::new(Box::new(mmap));
+            self.cache.insert(full_path.to_owned(), Arc::downgrade(&res));
+            Ok(Some(res))
+        } else {
+            Ok(None)
+        }
     }
 }
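The rewritten `get_mmap` above is an instance of a weak-value cache: the map owns only `Weak` handles, so a mapping stays cached exactly as long as at least one reader still holds the `Arc`. A generic, std-only sketch of the pattern (`WeakCache` and `get_or_insert_with` are illustrative names, not tantivy's types):

```rust
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::{Arc, Weak};

struct WeakCache<K, V> {
    map: HashMap<K, Weak<V>>,
}

impl<K: Hash + Eq + Clone, V> WeakCache<K, V> {
    fn get_or_insert_with<F: FnOnce() -> V>(&mut self, key: &K, make: F) -> Arc<V> {
        // Hit: some Arc is still alive somewhere, so the upgrade succeeds.
        if let Some(value) = self.map.get(key).and_then(Weak::upgrade) {
            return value;
        }
        // Miss, or the last Arc was dropped: rebuild and re-register.
        let value = Arc::new(make());
        self.map.insert(key.clone(), Arc::downgrade(&value));
        value
    }
}

fn main() {
    let mut cache = WeakCache { map: HashMap::new() };
    let a = cache.get_or_insert_with(&"key", || vec![1u8, 2]);
    let b = cache.get_or_insert_with(&"key", || unreachable!()); // hit: same Arc
    assert!(Arc::ptr_eq(&a, &b));
}
```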
@@ -253,11 +253,10 @@ impl Directory for MmapDirectory {
             );
             IOError::with_path(path.to_owned(), make_io_err(msg))
         })?;
-
         Ok(mmap_cache
             .get_mmap(&full_path)?
-            .map(ReadOnlySource::Mmap)
-            .unwrap_or_else(|| ReadOnlySource::Anonymous(SharedVecSlice::empty())))
+            .map(ReadOnlySource::from)
+            .unwrap_or_else(|| ReadOnlySource::empty()))
     }
 
     fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
@@ -295,20 +294,6 @@
     fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
         debug!("Deleting file {:?}", path);
         let full_path = self.resolve_path(path);
-        let mut mmap_cache = self.mmap_cache.write().map_err(|_| {
-            let msg = format!(
-                "Failed to acquired write lock \
-                 on mmap cache while deleting {:?}",
-                path
-            );
-            IOError::with_path(path.to_owned(), make_io_err(msg))
-        })?;
-        mmap_cache.discard_from_cache(path);
-
-        // Removing the entry in the MMap cache.
-        // The munmap will appear on Drop,
-        // when the last reference is gone.
-        mmap_cache.cache.remove(&full_path);
         match fs::remove_file(&full_path) {
             Ok(_) => self
                 .sync_directory()
@@ -403,25 +388,50 @@ mod tests {
                 w.flush().unwrap();
             }
         }
-        {
-            for (i, path) in paths.iter().enumerate() {
-                let _r = mmap_directory.open_read(path).unwrap();
-                assert_eq!(mmap_directory.get_cache_info().mmapped.len(), i + 1);
-            }
-            for path in paths.iter() {
-                let _r = mmap_directory.open_read(path).unwrap();
-                assert_eq!(mmap_directory.get_cache_info().mmapped.len(), num_paths);
-            }
-            for (i, path) in paths.iter().enumerate() {
-                mmap_directory.delete(path).unwrap();
-                assert_eq!(
-                    mmap_directory.get_cache_info().mmapped.len(),
-                    num_paths - i - 1
-                );
-            }
+
+        let mut keep = vec![];
+        for (i, path) in paths.iter().enumerate() {
+            keep.push(mmap_directory.open_read(path).unwrap());
+            assert_eq!(mmap_directory.get_cache_info().mmapped.len(), i + 1);
+        }
+        assert_eq!(mmap_directory.get_cache_info().counters.hit, 0);
+        assert_eq!(mmap_directory.get_cache_info().counters.miss, 10);
+        assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 10);
+        for path in paths.iter() {
+            let _r = mmap_directory.open_read(path).unwrap();
+            assert_eq!(mmap_directory.get_cache_info().mmapped.len(), num_paths);
+        }
         }
         assert_eq!(mmap_directory.get_cache_info().counters.hit, 10);
         assert_eq!(mmap_directory.get_cache_info().counters.miss, 10);
+        assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 10);
+
+        for path in paths.iter() {
+            let _r = mmap_directory.open_read(path).unwrap();
+            assert_eq!(mmap_directory.get_cache_info().mmapped.len(), num_paths);
+        }
+        assert_eq!(mmap_directory.get_cache_info().counters.hit, 20);
+        assert_eq!(mmap_directory.get_cache_info().counters.miss, 10);
+        assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 10);
+        drop(keep);
+        for path in paths.iter() {
+            let _r = mmap_directory.open_read(path).unwrap();
+            assert_eq!(mmap_directory.get_cache_info().mmapped.len(), num_paths);
+        }
+        assert_eq!(mmap_directory.get_cache_info().counters.hit, 20);
+        assert_eq!(mmap_directory.get_cache_info().counters.miss, 20);
+        assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 10);
+
+        for path in &paths {
+            mmap_directory.delete(path).unwrap();
+        }
+        assert_eq!(mmap_directory.get_cache_info().counters.hit, 20);
+        assert_eq!(mmap_directory.get_cache_info().counters.miss, 20);
+        assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 10);
+        for path in paths.iter() {
+            assert!(mmap_directory.open_read(path).is_err());
+        }
+        assert_eq!(mmap_directory.get_cache_info().counters.hit, 20);
+        assert_eq!(mmap_directory.get_cache_info().counters.miss, 30);
         assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0);
     }
diff --git a/src/directory/mod.rs b/src/directory/mod.rs
index c4627b88b..de08d1ed3 100644
--- a/src/directory/mod.rs
+++ b/src/directory/mod.rs
@@ -11,7 +11,6 @@
 mod directory;
 mod managed_directory;
 mod ram_directory;
 mod read_only_source;
-mod shared_vec_slice;
 /// Errors specific to the directory module.
 pub mod error;
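The `delete` simplification and the new test assertions above both follow from the weak-value cache: removing a file no longer purges the cache, because the munmap happens when the last strong reference is dropped, and the next lookup simply fails to upgrade the dead `Weak` and records a miss. A std-only illustration of that lifecycle:

```rust
use std::sync::{Arc, Weak};

fn main() {
    let reader: Arc<Vec<u8>> = Arc::new(vec![0u8; 1024]); // stands in for a live mmap
    let cache_entry: Weak<Vec<u8>> = Arc::downgrade(&reader);

    // While a reader holds the Arc, the cache entry upgrades: a hit.
    assert!(cache_entry.upgrade().is_some());

    // Last reader dropped: the backing memory is released (the munmap-equivalent).
    drop(reader);

    // A later open_read sees a dead Weak: a miss, so the file is reopened.
    assert!(cache_entry.upgrade().is_none());
}
```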
diff --git a/src/directory/ram_directory.rs b/src/directory/ram_directory.rs
index 2f1733e0f..9423affff 100644
--- a/src/directory/ram_directory.rs
+++ b/src/directory/ram_directory.rs
@@ -1,4 +1,3 @@
-use super::shared_vec_slice::SharedVecSlice;
 use common::make_io_err;
 use directory::error::{DeleteError, IOError, OpenReadError, OpenWriteError};
 use directory::WritePtr;
@@ -71,7 +70,7 @@ impl Write for VecWriter {
 }
 
 #[derive(Clone)]
-struct InnerDirectory(Arc<RwLock<HashMap<PathBuf, Arc<Vec<u8>>>>>);
+struct InnerDirectory(Arc<RwLock<HashMap<PathBuf, ReadOnlySource>>>);
 
 impl InnerDirectory {
     fn new() -> InnerDirectory {
@@ -85,7 +84,7 @@ impl InnerDirectory {
                 path
             ))
         })?;
-        let prev_value = map.insert(path, Arc::new(Vec::from(data)));
+        let prev_value = map.insert(path, ReadOnlySource::new(Vec::from(data)));
         Ok(prev_value.is_some())
     }
@@ -105,8 +104,7 @@ impl InnerDirectory {
         readable_map
             .get(path)
             .ok_or_else(|| OpenReadError::FileDoesNotExist(PathBuf::from(path)))
-            .map(Arc::clone)
-            .map(|data| ReadOnlySource::Anonymous(SharedVecSlice::new(data)))
+            .map(|el| el.clone())
         })
 }
diff --git a/src/directory/read_only_source.rs b/src/directory/read_only_source.rs
index 6ed2049e5..37d46d189 100644
--- a/src/directory/read_only_source.rs
+++ b/src/directory/read_only_source.rs
@@ -1,9 +1,8 @@
-use super::shared_vec_slice::SharedVecSlice;
 use common::HasLen;
-#[cfg(feature = "mmap")]
-use fst::raw::MmapReadOnly;
 use stable_deref_trait::{CloneStableDeref, StableDeref};
 use std::ops::Deref;
+use std::sync::Arc;
+
 
 /// Read object that represents files in tantivy.
 ///
@@ -11,12 +10,10 @@
 /// the data in the form of a constant read-only `&[u8]`.
 /// Whatever happens to the directory file, the data
 /// hold by this object should never be altered or destroyed.
-pub enum ReadOnlySource {
-    /// Mmap source of data
-    #[cfg(feature = "mmap")]
-    Mmap(MmapReadOnly),
-    /// Wrapping a `Vec<u8>`
-    Anonymous(SharedVecSlice),
+pub struct ReadOnlySource {
+    data: Arc<Box<Deref<Target = [u8]> + Send + Sync + 'static>>,
+    start: usize,
+    stop: usize
 }
 
 unsafe impl StableDeref for ReadOnlySource {}
@@ -30,19 +27,41 @@
     }
 }
 
+
+impl From<Arc<Box<Deref<Target = [u8]> + Send + Sync>>> for ReadOnlySource {
+    fn from(data: Arc<Box<Deref<Target = [u8]> + Send + Sync>>) -> Self {
+        let len = data.len();
+        ReadOnlySource {
+            data,
+            start: 0,
+            stop: len
+        }
+    }
+}
+
+const EMPTY_ARRAY: [u8; 0] = [0u8; 0];
+
 impl ReadOnlySource {
+
+    /// Creates a new `ReadOnlySource`.
+    pub fn new<D>(data: D) -> ReadOnlySource
+    where D: Deref<Target = [u8]> + Send + Sync + 'static {
+        let len = data.len();
+        ReadOnlySource {
+            data: Arc::new(Box::new(data)),
+            start: 0,
+            stop: len
+        }
+    }
+
     /// Creates an empty ReadOnlySource
     pub fn empty() -> ReadOnlySource {
-        ReadOnlySource::Anonymous(SharedVecSlice::empty())
+        ReadOnlySource::new(&EMPTY_ARRAY[..])
     }
 
     /// Returns the data underlying the ReadOnlySource object.
     pub fn as_slice(&self) -> &[u8] {
-        match *self {
-            #[cfg(feature = "mmap")]
-            ReadOnlySource::Mmap(ref mmap_read_only) => mmap_read_only.as_slice(),
-            ReadOnlySource::Anonymous(ref shared_vec) => shared_vec.as_slice(),
-        }
+        &self.data[self.start..self.stop]
     }
 
     /// Splits into 2 `ReadOnlySource`, at the offset given
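The struct above replaces the old two-variant enum with type erasure: any `Deref<Target = [u8]> + Send + Sync` backing (an `Mmap`, a `Vec<u8>`, a static slice) hides behind one `Arc<Box<...>>`. A std-only sketch of the idea (`erase` and `Bytes` are illustrative names, not tantivy's API):

```rust
use std::ops::Deref;
use std::sync::Arc;

// One erased handle serves every backing store.
type Bytes = Arc<Box<dyn Deref<Target = [u8]> + Send + Sync + 'static>>;

fn erase<D: Deref<Target = [u8]> + Send + Sync + 'static>(data: D) -> Bytes {
    Arc::new(Box::new(data))
}

fn main() {
    let from_vec = erase(vec![1u8, 2, 3]); // anonymous memory
    let from_static = erase(&b"abc"[..]);  // borrowed static slice
    assert_eq!(&from_vec[..], &[1u8, 2, 3][..]);
    assert_eq!(&from_static[..], &b"abc"[..]);
}
```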
@@ -63,22 +82,18 @@
     /// worth of data in anonymous memory, and only a
     /// 1KB slice is remaining, the whole `500MBs`
     /// are retained in memory.
-    pub fn slice(&self, from_offset: usize, to_offset: usize) -> ReadOnlySource {
+    pub fn slice(&self, start: usize, stop: usize) -> ReadOnlySource {
         assert!(
-            from_offset <= to_offset,
+            start <= stop,
             "Requested negative slice [{}..{}]",
-            from_offset,
-            to_offset
+            start,
+            stop
         );
-        match *self {
-            #[cfg(feature = "mmap")]
-            ReadOnlySource::Mmap(ref mmap_read_only) => {
-                let sliced_mmap = mmap_read_only.range(from_offset, to_offset - from_offset);
-                ReadOnlySource::Mmap(sliced_mmap)
-            }
-            ReadOnlySource::Anonymous(ref shared_vec) => {
-                ReadOnlySource::Anonymous(shared_vec.slice(from_offset, to_offset))
-            }
+        assert!(stop <= self.len());
+        ReadOnlySource {
+            data: self.data.clone(),
+            start: self.start + start,
+            stop: self.start + stop
         }
     }
@@ -87,8 +102,7 @@ impl ReadOnlySource {
     ///
     /// Equivalent to `.slice(from_offset, self.len())`
     pub fn slice_from(&self, from_offset: usize) -> ReadOnlySource {
-        let len = self.len();
-        self.slice(from_offset, len)
+        self.slice(from_offset, self.len())
     }
 
     /// Like `.slice(...)` but enforcing only the `to`
@@ -102,19 +116,18 @@ impl ReadOnlySource {
 
 impl HasLen for ReadOnlySource {
     fn len(&self) -> usize {
-        self.as_slice().len()
+        self.stop - self.start
     }
 }
 
 impl Clone for ReadOnlySource {
     fn clone(&self) -> Self {
-        self.slice(0, self.len())
+        self.slice_from(0)
     }
 }
 
 impl From<Vec<u8>> for ReadOnlySource {
     fn from(data: Vec<u8>) -> ReadOnlySource {
-        let shared_data = SharedVecSlice::from(data);
-        ReadOnlySource::Anonymous(shared_data)
+        ReadOnlySource::new(data)
     }
-}
+}
\ No newline at end of file
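With offsets instead of per-variant slicing, `slice` composes by pure index arithmetic over one shared buffer — which is also why the doc comment above warns that a tiny slice keeps the whole backing allocation alive. An illustrative, std-only reduction of the scheme:

```rust
use std::sync::Arc;

#[derive(Clone)]
struct Slice {
    data: Arc<Vec<u8>>, // the full buffer is shared, never copied
    start: usize,
    stop: usize,
}

impl Slice {
    fn slice(&self, start: usize, stop: usize) -> Slice {
        assert!(start <= stop && self.start + stop <= self.stop);
        Slice {
            data: Arc::clone(&self.data),
            start: self.start + start, // offsets compose relative to `self`
            stop: self.start + stop,
        }
    }

    fn as_slice(&self) -> &[u8] {
        &self.data[self.start..self.stop]
    }
}

fn main() {
    let buf: Vec<u8> = (0u8..10).collect();
    let whole = Slice { data: Arc::new(buf), start: 0, stop: 10 };
    // A slice of a slice only narrows [start, stop): bytes 3 and 4 overall.
    let nested = whole.slice(2, 8).slice(1, 3);
    assert_eq!(nested.as_slice(), &[3u8, 4][..]);
}
```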
diff --git a/src/directory/shared_vec_slice.rs b/src/directory/shared_vec_slice.rs
deleted file mode 100644
index 1a9157e14..000000000
--- a/src/directory/shared_vec_slice.rs
+++ /dev/null
@@ -1,41 +0,0 @@
-use std::sync::Arc;
-
-#[derive(Clone)]
-pub struct SharedVecSlice {
-    pub data: Arc<Vec<u8>>,
-    pub start: usize,
-    pub len: usize,
-}
-
-impl SharedVecSlice {
-    pub fn empty() -> SharedVecSlice {
-        SharedVecSlice::new(Arc::new(Vec::new()))
-    }
-
-    pub fn new(data: Arc<Vec<u8>>) -> SharedVecSlice {
-        let data_len = data.len();
-        SharedVecSlice {
-            data,
-            start: 0,
-            len: data_len,
-        }
-    }
-
-    pub fn as_slice(&self) -> &[u8] {
-        &self.data[self.start..self.start + self.len]
-    }
-
-    pub fn slice(&self, from_offset: usize, to_offset: usize) -> SharedVecSlice {
-        SharedVecSlice {
-            data: Arc::clone(&self.data),
-            start: self.start + from_offset,
-            len: to_offset - from_offset,
-        }
-    }
-}
-
-impl From<Vec<u8>> for SharedVecSlice {
-    fn from(data: Vec<u8>) -> SharedVecSlice {
-        SharedVecSlice::new(Arc::new(data))
-    }
-}
diff --git a/src/lib.rs b/src/lib.rs
index 49fb977e4..4451aa84f 100755
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -123,6 +123,8 @@ extern crate log;
 
 #[macro_use]
 extern crate failure;
+#[cfg(feature = "mmap")]
+extern crate memmap;
 #[cfg(feature = "mmap")]
 extern crate atomicwrites;
 extern crate base64;
@@ -135,8 +137,7 @@
 extern crate combine;
 extern crate crossbeam;
 extern crate fnv;
-extern crate fst;
-extern crate fst_regex;
+extern crate tantivy_fst;
 extern crate futures;
 extern crate futures_cpupool;
 extern crate htmlescape;
diff --git a/src/postings/segment_postings.rs b/src/postings/segment_postings.rs
index 88f13ab7f..7e3a869f5 100644
--- a/src/postings/segment_postings.rs
+++ b/src/postings/segment_postings.rs
@@ -2,7 +2,7 @@ use common::BitSet;
 use common::HasLen;
 use common::{BinarySerializable, VInt};
 use docset::{DocSet, SkipResult};
-use fst::Streamer;
+use tantivy_fst::Streamer;
 use owned_read::OwnedRead;
 use positions::PositionReader;
 use postings::compression::compressed_block_size;
@@ -628,7 +628,7 @@
     use common::HasLen;
     use core::Index;
     use docset::DocSet;
-    use fst::Streamer;
+    use tantivy_fst::Streamer;
     use schema::IndexRecordOption;
     use schema::Schema;
     use schema::Term;
diff --git a/src/query/automaton_weight.rs b/src/query/automaton_weight.rs
index e0963d605..a952ff6e7 100644
--- a/src/query/automaton_weight.rs
+++ b/src/query/automaton_weight.rs
@@ -1,6 +1,6 @@
 use common::BitSet;
 use core::SegmentReader;
-use fst::Automaton;
+use tantivy_fst::Automaton;
 use query::BitSetDocSet;
 use query::ConstScorer;
 use query::{Scorer, Weight};
diff --git a/src/query/regex_query.rs b/src/query/regex_query.rs
index caa8f080a..4905be987 100644
--- a/src/query/regex_query.rs
+++ b/src/query/regex_query.rs
@@ -1,5 +1,5 @@
 use error::TantivyError;
-use fst_regex::Regex;
+use tantivy_fst::Regex;
 use query::{AutomatonWeight, Query, Weight};
 use schema::Field;
 use std::clone::Clone;
diff --git a/src/termdict/streamer.rs b/src/termdict/streamer.rs
index f1dc74532..2f6816eed 100644
--- a/src/termdict/streamer.rs
+++ b/src/termdict/streamer.rs
@@ -1,8 +1,8 @@
 use super::TermDictionary;
-use fst::automaton::AlwaysMatch;
-use fst::map::{Stream, StreamBuilder};
-use fst::Automaton;
-use fst::{IntoStreamer, Streamer};
+use tantivy_fst::automaton::AlwaysMatch;
+use tantivy_fst::map::{Stream, StreamBuilder};
+use tantivy_fst::Automaton;
+use tantivy_fst::{IntoStreamer, Streamer};
 use postings::TermInfo;
 use termdict::TermOrdinal;
diff --git a/src/termdict/termdict.rs b/src/termdict/termdict.rs
index b63bb54d0..0f6308d1f 100644
--- a/src/termdict/termdict.rs
+++ b/src/termdict/termdict.rs
@@ -3,15 +3,15 @@ use super::{TermStreamer, TermStreamerBuilder};
 use common::BinarySerializable;
 use common::CountingWriter;
 use directory::ReadOnlySource;
-use fst;
-use fst::raw::Fst;
-use fst::Automaton;
+use tantivy_fst;
+use tantivy_fst::raw::Fst;
+use tantivy_fst::Automaton;
 use postings::TermInfo;
 use schema::FieldType;
 use std::io::{self, Write};
 use termdict::TermOrdinal;
 
-fn convert_fst_error(e: fst::Error) -> io::Error {
+fn convert_fst_error(e: tantivy_fst::Error) -> io::Error {
     io::Error::new(io::ErrorKind::Other, e)
 }
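`convert_fst_error` relies on a standard bridging idiom: `io::Error::new` accepts anything convertible into `Box<dyn Error + Send + Sync>`, so a library-specific error can be funneled into an `io::Error` of kind `Other`. A std-only sketch (`to_io_error` is an illustrative name):

```rust
use std::io;

// Any error convertible into a boxed trait object can be wrapped.
fn to_io_error<E>(e: E) -> io::Error
where
    E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    io::Error::new(io::ErrorKind::Other, e)
}

fn main() {
    let err = to_io_error("FST data is corrupted");
    assert_eq!(err.kind(), io::ErrorKind::Other);
}
```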
@@ -19,7 +19,7 @@
 ///
 /// Inserting must be done in the order of the `keys`.
 pub struct TermDictionaryBuilder<W> {
-    fst_builder: fst::MapBuilder<W>,
+    fst_builder: tantivy_fst::MapBuilder<W>,
     term_info_store_writer: TermInfoStoreWriter,
     term_ord: u64,
 }
@@ -30,7 +30,7 @@ where
 {
     /// Creates a new `TermDictionaryBuilder`
     pub fn create(w: W, _field_type: &FieldType) -> io::Result<Self> {
-        let fst_builder = fst::MapBuilder::new(w).map_err(convert_fst_error)?;
+        let fst_builder = tantivy_fst::MapBuilder::new(w).map_err(convert_fst_error)?;
         Ok(TermDictionaryBuilder {
             fst_builder,
             term_info_store_writer: TermInfoStoreWriter::new(),
@@ -87,17 +87,9 @@ where
     }
 }
 
-fn open_fst_index(source: ReadOnlySource) -> fst::Map {
-    let fst = match source {
-        ReadOnlySource::Anonymous(data) => {
-            Fst::from_shared_bytes(data.data, data.start, data.len).expect("FST data is corrupted")
-        }
-        #[cfg(feature = "mmap")]
-        ReadOnlySource::Mmap(mmap_readonly) => {
-            Fst::from_mmap(mmap_readonly).expect("FST data is corrupted")
-        }
-    };
-    fst::Map::from(fst)
+fn open_fst_index(source: ReadOnlySource) -> tantivy_fst::Map<ReadOnlySource> {
+    let fst = Fst::new(source).expect("FST data is corrupted");
+    tantivy_fst::Map::from(fst)
 }
 
 /// The term dictionary contains all of the terms in
@@ -107,7 +99,7 @@
 /// respective `TermOrdinal`. The `TermInfoStore` then makes it
 /// possible to fetch the associated `TermInfo`.
 pub struct TermDictionary {
-    fst_index: fst::Map,
+    fst_index: tantivy_fst::Map<ReadOnlySource>,
     term_info_store: TermInfoStore,
 }
@@ -136,7 +128,7 @@ impl TermDictionary {
             .expect("Creating a TermDictionaryBuilder in a Vec should never fail")
             .finish()
             .expect("Writing in a Vec should never fail");
-        let source = ReadOnlySource::from(term_dictionary_data);
+        let source = ReadOnlySource::new(term_dictionary_data);
         Self::from_source(&source)
     }
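The `open_fst_index` rewrite above is the payoff of the whole change: because `ReadOnlySource` now derefs to `&[u8]` (and is `StableDeref`), a single `Fst::new(source)` call replaces the old per-variant `match`. The sketch below shows the underlying pattern — a container generic over any byte-owning `Deref` — using std only; `Dictionary` is illustrative, not tantivy_fst's actual `Fst` API:

```rust
use std::ops::Deref;

// One code path serves mmapped files, Vec<u8>, or any other byte owner.
struct Dictionary<D: Deref<Target = [u8]>> {
    bytes: D,
}

impl<D: Deref<Target = [u8]>> Dictionary<D> {
    fn open(bytes: D) -> Result<Self, &'static str> {
        // A real implementation would validate a header here and report
        // "FST data is corrupted" on failure.
        if bytes.is_empty() {
            return Err("empty dictionary");
        }
        Ok(Dictionary { bytes })
    }

    fn len(&self) -> usize {
        self.bytes.len()
    }
}

fn main() {
    let dict = Dictionary::open(vec![42u8; 4]).unwrap(); // anonymous memory
    assert_eq!(dict.len(), 4);
}
```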