Compare commits

...

5 Commits

Author SHA1 Message Date
Paul Masurel
dc769b373b Closes #500 2019-02-22 08:59:11 +09:00
Paul Masurel
5f07dc35d8 32bits platforms 2019-02-14 09:12:25 +09:00
Paul Masurel
176f67a266 Refactoring 2019-01-23 10:06:40 +09:00
Paul Masurel
19babff849 Closes #476 2019-01-23 10:06:39 +09:00
Paul Masurel
bf2576adf9 Added a broken unit test 2019-01-23 10:04:27 +09:00
17 changed files with 305 additions and 244 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "tantivy"
version = "0.8.0"
version = "0.8.3"
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
license = "MIT"
categories = ["database-implementations", "data-structures"]
@@ -16,8 +16,8 @@ base64 = "0.10.0"
byteorder = "1.0"
lazy_static = "1"
regex = "1.0"
fst = {version="0.3", default-features=false}
fst-regex = { version="0.2" }
tantivy-fst = {path="../tantivy-search/fst", version="0.1"}
memmap = "0.7"
lz4 = {version="1.20", optional=true}
snap = {version="0.2"}
atomicwrites = {version="0.2.2", optional=true}
@@ -30,7 +30,7 @@ serde_derive = "1.0"
serde_json = "1.0"
num_cpus = "1.2"
itertools = "0.8"
levenshtein_automata = {version="0.1", features=["fst_automaton"]}
levenshtein_automata = {version="0.1"}
bit-set = "0.5"
uuid = { version = "0.7", features = ["v4", "serde"] }
crossbeam = "0.5"
@@ -70,7 +70,7 @@ overflow-checks = true
[features]
# by default no-fail is disabled. We manually enable it when running test.
default = ["mmap", "no_fail"]
mmap = ["fst/mmap", "atomicwrites"]
mmap = ["atomicwrites"]
lz4-compression = ["lz4"]
no_fail = ["fail/no_fail"]
unstable = [] # useful for benches.

View File

@@ -150,7 +150,7 @@ impl Index {
///
/// This will overwrite existing meta.json
fn from_directory(mut directory: ManagedDirectory, schema: Schema) -> Result<Index> {
save_new_metas(schema.clone(), 0, directory.borrow_mut())?;
save_new_metas(schema.clone(), directory.borrow_mut())?;
let metas = IndexMeta::with_schema(schema);
Index::create_from_metas(directory, &metas)
}

View File

@@ -1,12 +1,9 @@
use atomicwrites;
use common::make_io_err;
use directory::error::{DeleteError, IOError, OpenDirectoryError, OpenReadError, OpenWriteError};
use directory::shared_vec_slice::SharedVecSlice;
use directory::Directory;
use directory::ReadOnlySource;
use directory::WritePtr;
use fst::raw::MmapReadOnly;
use std::collections::hash_map::Entry as HashMapEntry;
use std::collections::HashMap;
use std::convert::From;
use std::fmt;
@@ -19,11 +16,14 @@ use std::result;
use std::sync::Arc;
use std::sync::RwLock;
use tempdir::TempDir;
use memmap::Mmap;
use std::sync::Weak;
use std::ops::Deref;
/// Returns None iff the file exists, can be read, but is empty (and hence
/// cannot be mmapped).
///
fn open_mmap(full_path: &Path) -> result::Result<Option<MmapReadOnly>, OpenReadError> {
fn open_mmap(full_path: &Path) -> result::Result<Option<Mmap>, OpenReadError> {
let file = File::open(full_path).map_err(|e| {
if e.kind() == io::ErrorKind::NotFound {
OpenReadError::FileDoesNotExist(full_path.to_owned())
@@ -42,7 +42,7 @@ fn open_mmap(full_path: &Path) -> result::Result<Option<MmapReadOnly>, OpenReadE
return Ok(None);
}
unsafe {
MmapReadOnly::open(&file)
memmap::Mmap::map(&file)
.map(Some)
.map_err(|e| From::from(IOError::with_path(full_path.to_owned(), e)))
}
@@ -65,7 +65,7 @@ pub struct CacheInfo {
struct MmapCache {
counters: CacheCounters,
cache: HashMap<PathBuf, MmapReadOnly>,
cache: HashMap<PathBuf, Weak<Box<Deref<Target=[u8]> + Send + Sync>>>,
}
impl Default for MmapCache {
@@ -78,10 +78,6 @@ impl Default for MmapCache {
}
impl MmapCache {
/// Removes a `MmapReadOnly` entry from the mmap cache.
fn discard_from_cache(&mut self, full_path: &Path) -> bool {
self.cache.remove(full_path).is_some()
}
fn get_info(&mut self) -> CacheInfo {
let paths: Vec<PathBuf> = self.cache.keys().cloned().collect();
@@ -91,23 +87,27 @@ impl MmapCache {
}
}
fn get_mmap(&mut self, full_path: &Path) -> Result<Option<MmapReadOnly>, OpenReadError> {
Ok(match self.cache.entry(full_path.to_owned()) {
HashMapEntry::Occupied(occupied_entry) => {
let mmap = occupied_entry.get();
self.counters.hit += 1;
Some(mmap.clone())
}
HashMapEntry::Vacant(vacant_entry) => {
self.counters.miss += 1;
if let Some(mmap) = open_mmap(full_path)? {
vacant_entry.insert(mmap.clone());
Some(mmap)
} else {
None
// Returns None if the file exists but has a length of 0 (and hence is not mmappable).
fn get_mmap(&mut self, full_path: &Path) -> Result<Option<Arc<Box<Deref<Target=[u8]> + Send + Sync>>>, OpenReadError> {
let path_in_cache = self.cache.contains_key(full_path);
if path_in_cache {
{
let mmap_weak_opt = self.cache.get(full_path);
if let Some(mmap_arc) = mmap_weak_opt.and_then(|mmap_weak| mmap_weak.upgrade()) {
self.counters.hit += 1;
return Ok(Some(mmap_arc));
}
}
})
self.cache.remove(full_path);
}
self.counters.miss += 1;
if let Some(mmap) = open_mmap(full_path)? {
let res: Arc<Box<Deref<Target=[u8]> + Send + Sync>> = Arc::new(Box::new(mmap));
self.cache.insert(full_path.to_owned(), Arc::downgrade(&res));
Ok(Some(res))
} else {
Ok(None)
}
}
}
@@ -253,11 +253,10 @@ impl Directory for MmapDirectory {
);
IOError::with_path(path.to_owned(), make_io_err(msg))
})?;
Ok(mmap_cache
.get_mmap(&full_path)?
.map(ReadOnlySource::Mmap)
.unwrap_or_else(|| ReadOnlySource::Anonymous(SharedVecSlice::empty())))
.map(ReadOnlySource::from)
.unwrap_or_else(|| ReadOnlySource::empty()))
}
fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
@@ -295,20 +294,6 @@ impl Directory for MmapDirectory {
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
debug!("Deleting file {:?}", path);
let full_path = self.resolve_path(path);
let mut mmap_cache = self.mmap_cache.write().map_err(|_| {
let msg = format!(
"Failed to acquired write lock \
on mmap cache while deleting {:?}",
path
);
IOError::with_path(path.to_owned(), make_io_err(msg))
})?;
mmap_cache.discard_from_cache(path);
// Removing the entry in the MMap cache.
// The munmap will appear on Drop,
// when the last reference is gone.
mmap_cache.cache.remove(&full_path);
match fs::remove_file(&full_path) {
Ok(_) => self
.sync_directory()
@@ -403,25 +388,50 @@ mod tests {
w.flush().unwrap();
}
}
{
for (i, path) in paths.iter().enumerate() {
let _r = mmap_directory.open_read(path).unwrap();
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), i + 1);
}
for path in paths.iter() {
let _r = mmap_directory.open_read(path).unwrap();
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), num_paths);
}
for (i, path) in paths.iter().enumerate() {
mmap_directory.delete(path).unwrap();
assert_eq!(
mmap_directory.get_cache_info().mmapped.len(),
num_paths - i - 1
);
}
let mut keep = vec![];
for (i, path) in paths.iter().enumerate() {
keep.push(mmap_directory.open_read(path).unwrap());
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), i + 1);
}
assert_eq!(mmap_directory.get_cache_info().counters.hit, 0);
assert_eq!(mmap_directory.get_cache_info().counters.miss, 10);
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 10);
for path in paths.iter() {
let _r = mmap_directory.open_read(path).unwrap();
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), num_paths);
}
assert_eq!(mmap_directory.get_cache_info().counters.hit, 10);
assert_eq!(mmap_directory.get_cache_info().counters.miss, 10);
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 10);
for path in paths.iter() {
let _r = mmap_directory.open_read(path).unwrap();
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), num_paths);
}
assert_eq!(mmap_directory.get_cache_info().counters.hit, 20);
assert_eq!(mmap_directory.get_cache_info().counters.miss, 10);
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 10);
drop(keep);
for path in paths.iter() {
let _r = mmap_directory.open_read(path).unwrap();
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), num_paths);
}
assert_eq!(mmap_directory.get_cache_info().counters.hit, 20);
assert_eq!(mmap_directory.get_cache_info().counters.miss, 20);
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 10);
for path in &paths {
mmap_directory.delete(path).unwrap();
}
assert_eq!(mmap_directory.get_cache_info().counters.hit, 20);
assert_eq!(mmap_directory.get_cache_info().counters.miss, 20);
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 10);
for path in paths.iter() {
assert!(mmap_directory.open_read(path).is_err());
}
assert_eq!(mmap_directory.get_cache_info().counters.hit, 20);
assert_eq!(mmap_directory.get_cache_info().counters.miss, 30);
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0);
}

View File

@@ -11,7 +11,6 @@ mod directory;
mod managed_directory;
mod ram_directory;
mod read_only_source;
mod shared_vec_slice;
/// Errors specific to the directory module.
pub mod error;

View File

@@ -1,4 +1,3 @@
use super::shared_vec_slice::SharedVecSlice;
use common::make_io_err;
use directory::error::{DeleteError, IOError, OpenReadError, OpenWriteError};
use directory::WritePtr;
@@ -71,7 +70,7 @@ impl Write for VecWriter {
}
#[derive(Clone)]
struct InnerDirectory(Arc<RwLock<HashMap<PathBuf, Arc<Vec<u8>>>>>);
struct InnerDirectory(Arc<RwLock<HashMap<PathBuf, ReadOnlySource>>>);
impl InnerDirectory {
fn new() -> InnerDirectory {
@@ -85,7 +84,7 @@ impl InnerDirectory {
path
))
})?;
let prev_value = map.insert(path, Arc::new(Vec::from(data)));
let prev_value = map.insert(path, ReadOnlySource::new(Vec::from(data)));
Ok(prev_value.is_some())
}
@@ -105,8 +104,7 @@ impl InnerDirectory {
readable_map
.get(path)
.ok_or_else(|| OpenReadError::FileDoesNotExist(PathBuf::from(path)))
.map(Arc::clone)
.map(|data| ReadOnlySource::Anonymous(SharedVecSlice::new(data)))
.map(|el| el.clone())
})
}

View File

@@ -1,9 +1,8 @@
use super::shared_vec_slice::SharedVecSlice;
use common::HasLen;
#[cfg(feature = "mmap")]
use fst::raw::MmapReadOnly;
use stable_deref_trait::{CloneStableDeref, StableDeref};
use std::ops::Deref;
use std::sync::Arc;
/// Read object that represents files in tantivy.
///
@@ -11,12 +10,10 @@ use std::ops::Deref;
/// the data in the form of a constant read-only `&[u8]`.
/// Whatever happens to the directory file, the data
/// held by this object should never be altered or destroyed.
pub enum ReadOnlySource {
/// Mmap source of data
#[cfg(feature = "mmap")]
Mmap(MmapReadOnly),
/// Wrapping a `Vec<u8>`
Anonymous(SharedVecSlice),
pub struct ReadOnlySource {
data: Arc<Box<Deref<Target=[u8]> + Send + Sync + 'static>>,
start: usize,
stop: usize
}
unsafe impl StableDeref for ReadOnlySource {}
@@ -30,19 +27,41 @@ impl Deref for ReadOnlySource {
}
}
impl From<Arc<Box<Deref<Target=[u8]> + Send + Sync>>> for ReadOnlySource {
    /// Wraps an already shared byte container into a `ReadOnlySource`
    /// that spans the container's whole length.
    fn from(data: Arc<Box<Deref<Target=[u8]> + Send + Sync>>) -> Self {
        // The length has to be read before `data` is moved into the struct.
        let stop = data.len();
        ReadOnlySource {
            data,
            start: 0,
            stop,
        }
    }
}
const EMPTY_ARRAY: [u8; 0] = [0u8; 0];
impl ReadOnlySource {
/// Builds a `ReadOnlySource` spanning all of the bytes of `data`.
///
/// `data` is boxed and placed behind an `Arc`.
pub fn new<D>(data: D) -> ReadOnlySource
where
    D: Deref<Target=[u8]> + Send + Sync + 'static,
{
    // The length has to be read before `data` is moved into the box.
    let stop = data.len();
    ReadOnlySource {
        data: Arc::new(Box::new(data)),
        start: 0,
        stop,
    }
}
/// Creates an empty ReadOnlySource
pub fn empty() -> ReadOnlySource {
ReadOnlySource::Anonymous(SharedVecSlice::empty())
ReadOnlySource::new(&EMPTY_ARRAY[..])
}
/// Returns the data underlying the ReadOnlySource object.
pub fn as_slice(&self) -> &[u8] {
match *self {
#[cfg(feature = "mmap")]
ReadOnlySource::Mmap(ref mmap_read_only) => mmap_read_only.as_slice(),
ReadOnlySource::Anonymous(ref shared_vec) => shared_vec.as_slice(),
}
&self.data[self.start..self.stop]
}
/// Splits into 2 `ReadOnlySource`, at the offset given
@@ -63,22 +82,18 @@ impl ReadOnlySource {
/// worth of data in anonymous memory, and only a
/// 1KB slice is remaining, the whole `500MBs`
/// are retained in memory.
pub fn slice(&self, from_offset: usize, to_offset: usize) -> ReadOnlySource {
pub fn slice(&self, start: usize, stop: usize) -> ReadOnlySource {
assert!(
from_offset <= to_offset,
start <= stop,
"Requested negative slice [{}..{}]",
from_offset,
to_offset
start,
stop
);
match *self {
#[cfg(feature = "mmap")]
ReadOnlySource::Mmap(ref mmap_read_only) => {
let sliced_mmap = mmap_read_only.range(from_offset, to_offset - from_offset);
ReadOnlySource::Mmap(sliced_mmap)
}
ReadOnlySource::Anonymous(ref shared_vec) => {
ReadOnlySource::Anonymous(shared_vec.slice(from_offset, to_offset))
}
assert!(stop <= self.len());
ReadOnlySource {
data: self.data.clone(),
start: self.start + start,
stop: self.start + stop
}
}
@@ -87,8 +102,7 @@ impl ReadOnlySource {
///
/// Equivalent to `.slice(from_offset, self.len())`
pub fn slice_from(&self, from_offset: usize) -> ReadOnlySource {
let len = self.len();
self.slice(from_offset, len)
self.slice(from_offset, self.len())
}
/// Like `.slice(...)` but enforcing only the `to`
@@ -102,19 +116,18 @@ impl ReadOnlySource {
impl HasLen for ReadOnlySource {
fn len(&self) -> usize {
self.as_slice().len()
self.stop - self.start
}
}
impl Clone for ReadOnlySource {
fn clone(&self) -> Self {
self.slice(0, self.len())
self.slice_from(0)
}
}
impl From<Vec<u8>> for ReadOnlySource {
fn from(data: Vec<u8>) -> ReadOnlySource {
let shared_data = SharedVecSlice::from(data);
ReadOnlySource::Anonymous(shared_data)
ReadOnlySource::new(data)
}
}
}

View File

@@ -1,41 +0,0 @@
use std::sync::Arc;
/// A cheaply clonable view over a sub-range of a reference-counted `Vec<u8>`.
///
/// Cloning or slicing a `SharedVecSlice` never copies the underlying bytes;
/// it only clones the `Arc` pointing to the shared buffer.
#[derive(Clone)]
pub struct SharedVecSlice {
    /// Shared buffer holding the bytes.
    pub data: Arc<Vec<u8>>,
    /// Offset within `data` of the first byte of the view.
    pub start: usize,
    /// Number of bytes covered by the view.
    pub len: usize,
}

impl SharedVecSlice {
    /// Creates a view over a fresh, empty buffer.
    pub fn empty() -> SharedVecSlice {
        SharedVecSlice::new(Arc::new(Vec::new()))
    }

    /// Creates a view spanning the whole of `data`.
    pub fn new(data: Arc<Vec<u8>>) -> SharedVecSlice {
        let data_len = data.len();
        SharedVecSlice {
            data,
            start: 0,
            len: data_len,
        }
    }

    /// Returns the bytes covered by this view.
    pub fn as_slice(&self) -> &[u8] {
        &self.data[self.start..self.start + self.len]
    }

    /// Returns the sub-view `[from_offset..to_offset)`.
    ///
    /// Offsets are relative to the beginning of this view, not to the
    /// beginning of the underlying buffer. The bytes are not copied.
    ///
    /// # Panics
    ///
    /// Panics if `from_offset > to_offset` or if `to_offset` exceeds the
    /// length of this view.
    pub fn slice(&self, from_offset: usize, to_offset: usize) -> SharedVecSlice {
        // Without these checks a sub-view could silently reach past its own
        // end into bytes of the parent buffer, and an inverted range would
        // wrap around in builds without overflow checks.
        assert!(
            from_offset <= to_offset,
            "Requested negative slice [{}..{}]",
            from_offset,
            to_offset
        );
        assert!(
            to_offset <= self.len,
            "Requested slice [{}..{}] exceeds the slice length ({})",
            from_offset,
            to_offset,
            self.len
        );
        SharedVecSlice {
            data: Arc::clone(&self.data),
            start: self.start + from_offset,
            len: to_offset - from_offset,
        }
    }
}

impl From<Vec<u8>> for SharedVecSlice {
    /// Takes ownership of `data` and returns a view spanning all of it.
    fn from(data: Vec<u8>) -> SharedVecSlice {
        SharedVecSlice::new(Arc::new(data))
    }
}

View File

@@ -558,11 +558,8 @@ impl IndexWriter {
// and recreate a new one channels.
self.recreate_document_channel();
let mut former_workers_join_handle = Vec::new();
swap(
&mut former_workers_join_handle,
&mut self.workers_join_handle,
);
let former_workers_join_handle =
mem::replace(&mut self.workers_join_handle, Vec::new());
for worker_handle in former_workers_join_handle {
let indexing_worker_result = worker_handle
@@ -739,7 +736,7 @@ mod tests {
index_writer.add_document(doc!(text_field=>"b"));
index_writer.add_document(doc!(text_field=>"c"));
}
assert_eq!(index_writer.commit().unwrap(), 2u64);
assert_eq!(index_writer.commit().unwrap(), 3u64);
index.load_searchers().unwrap();
assert_eq!(num_docs_containing("a"), 0);
assert_eq!(num_docs_containing("b"), 1);
@@ -802,7 +799,6 @@ mod tests {
{
let mut prepared_commit = index_writer.prepare_commit().expect("commit failed");
prepared_commit.set_payload("first commit");
assert_eq!(prepared_commit.opstamp(), 100);
prepared_commit.commit().expect("commit failed");
}
{
@@ -836,7 +832,6 @@ mod tests {
{
let mut prepared_commit = index_writer.prepare_commit().expect("commit failed");
prepared_commit.set_payload("first commit");
assert_eq!(prepared_commit.opstamp(), 100);
prepared_commit.abort().expect("commit failed");
}
{

View File

@@ -654,6 +654,7 @@ mod tests {
use schema::IntOptions;
use schema::Term;
use schema::TextFieldIndexing;
use schema::INT_INDEXED;
use std::io::Cursor;
use DocAddress;
use IndexWriter;
@@ -983,7 +984,7 @@ mod tests {
.wait()
.expect("Merging failed");
index.load_searchers().unwrap();
let ref searcher = *index.searcher();
let searcher = index.searcher();
assert_eq!(searcher.segment_readers().len(), 1);
assert_eq!(searcher.num_docs(), 3);
assert_eq!(searcher.segment_readers()[0].num_docs(), 3);
@@ -1029,7 +1030,7 @@ mod tests {
index_writer.commit().unwrap();
index.load_searchers().unwrap();
let ref searcher = *index.searcher();
let searcher = index.searcher();
assert_eq!(searcher.segment_readers().len(), 1);
assert_eq!(searcher.num_docs(), 2);
assert_eq!(searcher.segment_readers()[0].num_docs(), 2);
@@ -1125,6 +1126,7 @@ mod tests {
{
// Test removing all docs
index_writer.delete_term(Term::from_field_text(text_field, "g"));
index_writer.commit().unwrap();
let segment_ids = index
.searchable_segment_ids()
.expect("Searchable segments failed.");
@@ -1255,6 +1257,34 @@ mod tests {
}
}
// Checks that merging segments does not make an uncommitted delete visible:
// a delete is queued, a merge is run without calling `commit`, and the
// searcher must still report both documents.
#[test]
fn test_bug_merge() {
let mut schema_builder = schema::Schema::builder();
let int_field = schema_builder.add_u64_field("intvals", INT_INDEXED);
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
// Two commits, each one after adding a single document with the same value.
index_writer.add_document(doc!(int_field => 1u64));
index_writer.commit().expect("commit failed");
index_writer.add_document(doc!(int_field => 1u64));
index_writer.commit().expect("commit failed");
index.load_searchers().unwrap();
let searcher = index.searcher();
assert_eq!(searcher.num_docs(), 2);
// This delete targets the value carried by both documents, but it is
// deliberately not committed before the merge below.
index_writer.delete_term(Term::from_field_u64(int_field, 1));
let segment_ids = index
.searchable_segment_ids()
.expect("Searchable segments failed.");
// Merge all searchable segments and block until the merge is done.
index_writer
.merge(&segment_ids)
.expect("Failed to initiate merge")
.wait()
.expect("Merging failed");
index.load_searchers().unwrap();
// commit has not been called yet. The documents should still be
// there.
assert_eq!(index.searcher().num_docs(), 2);
}
#[test]
fn test_merge_multivalued_int_fields_all_deleted() {
let mut schema_builder = schema::Schema::builder();

View File

@@ -18,7 +18,6 @@ use indexer::delete_queue::DeleteCursor;
use indexer::index_writer::advance_deletes;
use indexer::merger::IndexMerger;
use indexer::stamper::Stamper;
use indexer::MergeCandidate;
use indexer::SegmentEntry;
use indexer::SegmentSerializer;
use indexer::{DefaultMergePolicy, MergePolicy};
@@ -45,8 +44,15 @@ use Result;
/// and flushed.
///
/// This method is not part of tantivy's public API
pub fn save_new_metas(schema: Schema, opstamp: u64, directory: &mut Directory) -> Result<()> {
save_metas(vec![], schema, opstamp, None, directory)
pub fn save_new_metas(schema: Schema, directory: &mut Directory) -> Result<()> {
    // A brand new index has no segments, starts at opstamp 0
    // and carries no payload.
    let initial_metas = IndexMeta {
        segments: Vec::new(),
        schema,
        opstamp: 0u64,
        payload: None,
    };
    save_metas(&initial_metas, directory)
}
/// Save the index meta file.
@@ -58,20 +64,17 @@ pub fn save_new_metas(schema: Schema, opstamp: u64, directory: &mut Directory) -
/// and flushed.
///
/// This method is not part of tantivy's public API
pub fn save_metas(
segment_metas: Vec<SegmentMeta>,
schema: Schema,
opstamp: u64,
payload: Option<String>,
fn save_metas(
metas: &IndexMeta,
directory: &mut Directory,
) -> Result<()> {
let metas = IndexMeta {
segments: segment_metas,
schema,
opstamp,
payload,
};
let mut buffer = serde_json::to_vec_pretty(&metas)?;
// let metas = IndexMeta {
// segments: segment_metas,
// schema,
// opstamp,
// payload,
// };
let mut buffer = serde_json::to_vec_pretty(metas)?;
writeln!(&mut buffer)?;
directory.atomic_write(&META_FILEPATH, &buffer[..])?;
debug!("Saved metas {:?}", serde_json::to_string_pretty(&metas));
@@ -86,6 +89,11 @@ pub fn save_metas(
#[derive(Clone)]
pub struct SegmentUpdater(Arc<InnerSegmentUpdater>);
// A merge to start: the ids of the segments to merge, paired with the
// opstamp handed to `start_merge_impl` as its `target_opstamp`.
struct MergeOperation {
// Opstamp forwarded to `start_merge_impl` for this merge.
pub target_opstamp: u64,
// Ids of the segments to merge together.
pub segment_ids: Vec<SegmentId>,
}
fn perform_merge(
index: &Index,
mut segment_entries: Vec<SegmentEntry>,
@@ -126,6 +134,13 @@ fn perform_merge(
}
struct InnerSegmentUpdater {
// We keep a copy of the current active IndexMeta to
// avoid loading the file every time we need it in the
// `SegmentUpdater`.
//
// This should be up to date as all updates happen through
// the unique active `SegmentUpdater`.
active_metas: RwLock<Arc<IndexMeta>>,
pool: CpuPool,
index: Index,
segment_manager: SegmentManager,
@@ -149,7 +164,9 @@ impl SegmentUpdater {
.name_prefix("segment_updater")
.pool_size(1)
.create();
let index_meta = index.load_metas()?;
Ok(SegmentUpdater(Arc::new(InnerSegmentUpdater {
active_metas: RwLock::new(Arc::new(index_meta)),
pool,
index,
segment_manager,
@@ -244,14 +261,18 @@ impl SegmentUpdater {
//
// Segment 1 from disk 1, Segment 1 from disk 2, etc.
commited_segment_metas.sort_by_key(|segment_meta| -(segment_meta.max_doc() as i32));
save_metas(
commited_segment_metas,
index.schema(),
let index_meta = IndexMeta {
segments: commited_segment_metas,
schema: index.schema(),
opstamp,
commit_message,
payload: commit_message
};
save_metas(
&index_meta,
directory.box_clone().borrow_mut(),
)
.expect("Could not save metas.");
self.store_meta(&index_meta);
}
}
@@ -286,16 +307,27 @@ impl SegmentUpdater {
}
pub fn start_merge(&self, segment_ids: &[SegmentId]) -> Result<Receiver<SegmentMeta>> {
//let future_merged_segment = */
let segment_ids_vec = segment_ids.to_vec();
let commit_opstamp = self.load_metas().opstamp;
self.run_async(move |segment_updater| {
segment_updater.start_merge_impl(&segment_ids_vec[..])
segment_updater.start_merge_impl(&segment_ids_vec[..], commit_opstamp)
})
.wait()?
}
/// Replaces the cached active `IndexMeta` with a clone of `index_meta`.
fn store_meta(&self, index_meta: &IndexMeta) {
    // Clone before taking the write lock, as the original one-liner did.
    let fresh_metas = Arc::new(index_meta.clone());
    let mut active_metas = self.0.active_metas.write().unwrap();
    *active_metas = fresh_metas;
}
/// Returns a shared handle on the cached active `IndexMeta`.
fn load_metas(&self) -> Arc<IndexMeta> {
    let active_metas = self.0.active_metas.read().unwrap();
    // Only the `Arc` is cloned; the `IndexMeta` itself is not copied.
    Arc::clone(&*active_metas)
}
// `segment_ids` is required to be non-empty.
fn start_merge_impl(&self, segment_ids: &[SegmentId]) -> Result<Receiver<SegmentMeta>> {
fn start_merge_impl(
&self,
segment_ids: &[SegmentId],
target_opstamp: u64,
) -> Result<Receiver<SegmentMeta>> {
assert!(!segment_ids.is_empty(), "Segment_ids cannot be empty.");
let segment_updater_clone = self.clone();
@@ -310,8 +342,6 @@ impl SegmentUpdater {
);
let (merging_future_send, merging_future_recv) = oneshot();
let target_opstamp = self.0.stamper.stamp();
// first we need to apply deletes to our segment.
let merging_join_handle = thread::Builder::new()
.name(format!("mergingthread-{}", merging_thread_id))
@@ -373,11 +403,32 @@ impl SegmentUpdater {
// Committed segments cannot be merged with uncommitted_segments.
// We therefore consider merges using these two sets of segments independently.
let merge_policy = self.get_merge_policy();
let mut merge_candidates = merge_policy.compute_merge_candidates(&uncommitted_segments);
let committed_merge_candidates = merge_policy.compute_merge_candidates(&committed_segments);
merge_candidates.extend_from_slice(&committed_merge_candidates[..]);
for MergeCandidate(segment_metas) in merge_candidates {
match self.start_merge_impl(&segment_metas) {
let current_opstamp = self.0.stamper.stamp();
let mut merge_candidates = merge_policy
.compute_merge_candidates(&uncommitted_segments)
.into_iter()
.map(|merge_candidate| MergeOperation {
target_opstamp: current_opstamp,
segment_ids: merge_candidate.0,
})
.collect::<Vec<_>>();
let commit_opstamp = self.load_metas().opstamp;
let committed_merge_candidates = merge_policy
.compute_merge_candidates(&committed_segments)
.into_iter()
.map(|merge_candidate| MergeOperation {
target_opstamp: commit_opstamp,
segment_ids: merge_candidate.0,
})
.collect::<Vec<_>>();
merge_candidates.extend(committed_merge_candidates.into_iter());
for MergeOperation {
target_opstamp,
segment_ids,
} in merge_candidates
{
match self.start_merge_impl(&segment_ids, target_opstamp) {
Ok(merge_future) => {
if let Err(e) = merge_future.fuse().poll() {
error!("The merge task failed quickly after starting: {:?}", e);
@@ -412,12 +463,7 @@ impl SegmentUpdater {
info!("End merge {:?}", after_merge_segment_entry.meta());
let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
if let Some(delete_operation) = delete_cursor.get() {
let committed_opstamp = segment_updater
.0
.index
.load_metas()
.expect("Failed to read opstamp")
.opstamp;
let committed_opstamp = segment_updater.load_metas().opstamp;
if delete_operation.opstamp < committed_opstamp {
let index = &segment_updater.0.index;
let segment = index.segment(after_merge_segment_entry.meta().clone());
@@ -446,8 +492,8 @@ impl SegmentUpdater {
.end_merge(&before_merge_segment_ids, after_merge_segment_entry);
segment_updater.consider_merge_options();
info!("save metas");
let previous_metas = segment_updater.0.index.load_metas().unwrap();
segment_updater.save_metas(previous_metas.opstamp, previous_metas.payload);
let previous_metas = segment_updater.load_metas();
segment_updater.save_metas(previous_metas.opstamp, previous_metas.payload.clone());
segment_updater.garbage_collect_files_exec();
})
.wait()

View File

@@ -1,50 +1,68 @@
use std::sync::Arc;
use std::sync::atomic::Ordering;
// AtomicU64 has not landed in stable yet.
// For the moment, let's just use AtomicUsize on
// x86_64 platforms, and a lock on other platforms.
#[cfg(target = "x86_64")]
#[cfg(target_arch = "x86_64")]
mod archicture_impl {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
#[derive(Clone, Default)]
pub struct Stamper(Arc<AtomicU64>);
#[derive(Default)]
pub struct AtomicU64Ersatz(AtomicUsize);
impl Stamper {
pub fn new(first_opstamp: u64) -> Stamper {
Stamper(Arc::new(AtomicU64::new(first_opstamp)))
impl AtomicU64Ersatz {
pub fn new(first_opstamp: u64) -> AtomicU64Ersatz {
AtomicU64Ersatz(AtomicUsize::new(first_opstamp as usize))
}
pub fn stamp(&self) -> u64 {
self.0.fetch_add(1u64, Ordering::SeqCst) as u64
pub fn fetch_add(&self, val: u64, order: Ordering) -> u64 {
self.0.fetch_add(val as usize, order) as u64
}
}
}
#[cfg(not(target = "x86_64"))]
#[cfg(not(target_arch = "x86_64"))]
mod archicture_impl {
use std::sync::{Arc, Mutex};
use std::sync::atomic::Ordering;
/// Under other architecture, we rely on a mutex.
use std::sync::RwLock;
#[derive(Clone, Default)]
pub struct Stamper(Arc<Mutex<u64>>);
#[derive(Default)]
pub struct AtomicU64Ersatz(RwLock<u64>);
impl Stamper {
pub fn new(first_opstamp: u64) -> Stamper {
Stamper(Arc::new(Mutex::new(first_opstamp)))
impl AtomicU64Ersatz {
pub fn new(first_opstamp: u64) -> AtomicU64Ersatz {
AtomicU64Ersatz(RwLock::new(first_opstamp))
}
pub fn stamp(&self) -> u64 {
let mut guard = self.0.lock().expect("Failed to lock the stamper");
let previous_val = *guard;
*guard = previous_val + 1;
pub fn fetch_add(&self, incr: u64, _order: Ordering) -> u64 {
let mut lock = self.0.write().unwrap();
let previous_val = *lock;
*lock = previous_val + incr;
previous_val
}
}
}
pub use self::archicture_impl::Stamper;
use self::archicture_impl::AtomicU64Ersatz;
/// Hands out opstamps from a counter that is shared between all of the
/// clones of a given `Stamper`.
#[derive(Clone, Default)]
pub struct Stamper(Arc<AtomicU64Ersatz>);

impl Stamper {
    /// Creates a `Stamper` whose counter starts at `first_opstamp`.
    pub fn new(first_opstamp: u64) -> Stamper {
        let counter = AtomicU64Ersatz::new(first_opstamp);
        Stamper(Arc::new(counter))
    }

    /// Returns the current value of the counter, and increments the
    /// counter by one.
    pub fn stamp(&self) -> u64 {
        let counter = &self.0;
        counter.fetch_add(1u64, Ordering::SeqCst) as u64
    }
}
#[cfg(test)]
mod test {

View File

@@ -123,6 +123,8 @@ extern crate log;
#[macro_use]
extern crate failure;
#[cfg(feature = "mmap")]
extern crate memmap;
#[cfg(feature = "mmap")]
extern crate atomicwrites;
extern crate base64;
@@ -135,8 +137,7 @@ extern crate combine;
extern crate crossbeam;
extern crate fnv;
extern crate fst;
extern crate fst_regex;
extern crate tantivy_fst;
extern crate futures;
extern crate futures_cpupool;
extern crate htmlescape;

View File

@@ -2,7 +2,7 @@ use common::BitSet;
use common::HasLen;
use common::{BinarySerializable, VInt};
use docset::{DocSet, SkipResult};
use fst::Streamer;
use tantivy_fst::Streamer;
use owned_read::OwnedRead;
use positions::PositionReader;
use postings::compression::compressed_block_size;
@@ -628,7 +628,7 @@ mod tests {
use common::HasLen;
use core::Index;
use docset::DocSet;
use fst::Streamer;
use tantivy_fst::Streamer;
use schema::IndexRecordOption;
use schema::Schema;
use schema::Term;

View File

@@ -1,6 +1,6 @@
use common::BitSet;
use core::SegmentReader;
use fst::Automaton;
use tantivy_fst::Automaton;
use query::BitSetDocSet;
use query::ConstScorer;
use query::{Scorer, Weight};

View File

@@ -1,5 +1,5 @@
use error::TantivyError;
use fst_regex::Regex;
use tantivy_fst::Regex;
use query::{AutomatonWeight, Query, Weight};
use schema::Field;
use std::clone::Clone;

View File

@@ -1,8 +1,8 @@
use super::TermDictionary;
use fst::automaton::AlwaysMatch;
use fst::map::{Stream, StreamBuilder};
use fst::Automaton;
use fst::{IntoStreamer, Streamer};
use tantivy_fst::automaton::AlwaysMatch;
use tantivy_fst::map::{Stream, StreamBuilder};
use tantivy_fst::Automaton;
use tantivy_fst::{IntoStreamer, Streamer};
use postings::TermInfo;
use termdict::TermOrdinal;

View File

@@ -3,15 +3,15 @@ use super::{TermStreamer, TermStreamerBuilder};
use common::BinarySerializable;
use common::CountingWriter;
use directory::ReadOnlySource;
use fst;
use fst::raw::Fst;
use fst::Automaton;
use tantivy_fst;
use tantivy_fst::raw::Fst;
use tantivy_fst::Automaton;
use postings::TermInfo;
use schema::FieldType;
use std::io::{self, Write};
use termdict::TermOrdinal;
fn convert_fst_error(e: fst::Error) -> io::Error {
fn convert_fst_error(e: tantivy_fst::Error) -> io::Error {
io::Error::new(io::ErrorKind::Other, e)
}
@@ -19,7 +19,7 @@ fn convert_fst_error(e: fst::Error) -> io::Error {
///
/// Inserting must be done in the order of the `keys`.
pub struct TermDictionaryBuilder<W> {
fst_builder: fst::MapBuilder<W>,
fst_builder: tantivy_fst::MapBuilder<W>,
term_info_store_writer: TermInfoStoreWriter,
term_ord: u64,
}
@@ -30,7 +30,7 @@ where
{
/// Creates a new `TermDictionaryBuilder`
pub fn create(w: W, _field_type: &FieldType) -> io::Result<Self> {
let fst_builder = fst::MapBuilder::new(w).map_err(convert_fst_error)?;
let fst_builder = tantivy_fst::MapBuilder::new(w).map_err(convert_fst_error)?;
Ok(TermDictionaryBuilder {
fst_builder,
term_info_store_writer: TermInfoStoreWriter::new(),
@@ -87,17 +87,9 @@ where
}
}
fn open_fst_index(source: ReadOnlySource) -> fst::Map {
let fst = match source {
ReadOnlySource::Anonymous(data) => {
Fst::from_shared_bytes(data.data, data.start, data.len).expect("FST data is corrupted")
}
#[cfg(feature = "mmap")]
ReadOnlySource::Mmap(mmap_readonly) => {
Fst::from_mmap(mmap_readonly).expect("FST data is corrupted")
}
};
fst::Map::from(fst)
/// Opens the FST stored in `source` and wraps it into a `tantivy_fst::Map`.
///
/// Panics with "FST data is corrupted" if `Fst::new` returns an error.
fn open_fst_index(source: ReadOnlySource) -> tantivy_fst::Map<ReadOnlySource> {
    Fst::new(source)
        .map(tantivy_fst::Map::from)
        .expect("FST data is corrupted")
}
/// The term dictionary contains all of the terms in
@@ -107,7 +99,7 @@ fn open_fst_index(source: ReadOnlySource) -> fst::Map {
/// respective `TermOrdinal`. The `TermInfoStore` then makes it
/// possible to fetch the associated `TermInfo`.
pub struct TermDictionary {
fst_index: fst::Map,
fst_index: tantivy_fst::Map<ReadOnlySource>,
term_info_store: TermInfoStore,
}
@@ -136,7 +128,7 @@ impl TermDictionary {
.expect("Creating a TermDictionaryBuilder in a Vec<u8> should never fail")
.finish()
.expect("Writing in a Vec<u8> should never fail");
let source = ReadOnlySource::from(term_dictionary_data);
let source = ReadOnlySource::new(term_dictionary_data);
Self::from_source(&source)
}