mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2025-12-31 14:32:54 +00:00
Compare commits
16 Commits
common-cra
...
0.4.3
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4f5ce12a77 | ||
|
|
813efa4ab3 | ||
|
|
c3b6c1dc0b | ||
|
|
6f5e0ef6f4 | ||
|
|
7224f58895 | ||
|
|
49519c3f61 | ||
|
|
cb11b92505 | ||
|
|
7b2dcfbd91 | ||
|
|
d2e30e6681 | ||
|
|
ef109927b3 | ||
|
|
44e5c4dfd3 | ||
|
|
6f223253ea | ||
|
|
f7b0392bd5 | ||
|
|
442bc9a1b8 | ||
|
|
db7d784573 | ||
|
|
74d32e522a |
17
.travis.yml
17
.travis.yml
@@ -1,4 +1,5 @@
|
||||
language: rust
|
||||
cache: cargo
|
||||
rust:
|
||||
- nightly
|
||||
env:
|
||||
@@ -11,6 +12,7 @@ addons:
|
||||
apt:
|
||||
sources:
|
||||
- ubuntu-toolchain-r-test
|
||||
- kalakris-cmake
|
||||
packages:
|
||||
- gcc-4.8
|
||||
- g++-4.8
|
||||
@@ -18,18 +20,15 @@ addons:
|
||||
- libelf-dev
|
||||
- libdw-dev
|
||||
- binutils-dev
|
||||
- cmake
|
||||
before_script:
|
||||
- |
|
||||
pip install 'travis-cargo<0.2' --user &&
|
||||
export PATH=$HOME/.local/bin:$PATH
|
||||
cargo install cargo-travis || echo "cargo-travis already installed"
|
||||
export PATH=$HOME/.cargo/bin:$PATH
|
||||
script:
|
||||
- |
|
||||
travis-cargo build &&
|
||||
travis-cargo test &&
|
||||
travis-cargo bench
|
||||
- cargo build
|
||||
- cargo test
|
||||
- cargo run --example simple_search
|
||||
after_success:
|
||||
- bash ./script/build-doc.sh
|
||||
- cargo coveralls --exclude-pattern cpp/
|
||||
- travis-cargo doc-upload
|
||||
- if [[ "$TRAVIS_OS_NAME" == "linux" ]]; then travis-cargo coveralls --no-sudo --verify; fi
|
||||
- if [[ "$TRAVIS_OS_NAME" == "linux" ]]; then ./kcov/build/src/kcov --verify --coveralls-id=$TRAVIS_JOB_ID --include-path=`pwd`/src --exclude-path=`pwd`/cpp --exclude-pattern=/.cargo target/kcov target/debug/tantivy-*; fi
|
||||
|
||||
@@ -14,11 +14,10 @@ keywords = ["search", "information", "retrieval"]
|
||||
|
||||
[dependencies]
|
||||
byteorder = "1.0"
|
||||
memmap = "0.4"
|
||||
lazy_static = "0.2.1"
|
||||
tinysegmenter = "0.1.0"
|
||||
regex = "0.2"
|
||||
fst = "0.1.37"
|
||||
fst = "0.2"
|
||||
atomicwrites = "0.1.3"
|
||||
tempfile = "2.1"
|
||||
log = "0.3.6"
|
||||
|
||||
@@ -20,10 +20,7 @@ fn main() {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
fn run_example(index_path: &Path) -> tantivy::Result<()> {
|
||||
|
||||
|
||||
// # Defining the schema
|
||||
//
|
||||
// The Tantivy index requires a very strict schema.
|
||||
@@ -31,7 +28,6 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
|
||||
// and for each field, its type and "the way it should
|
||||
// be indexed".
|
||||
|
||||
|
||||
// first we need to define a schema ...
|
||||
let mut schema_builder = SchemaBuilder::default();
|
||||
|
||||
@@ -62,8 +58,6 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
|
||||
|
||||
let schema = schema_builder.build();
|
||||
|
||||
|
||||
|
||||
// # Indexing documents
|
||||
//
|
||||
// Let's create a brand new index.
|
||||
@@ -72,7 +66,6 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
|
||||
// with our schema in the directory.
|
||||
let index = Index::create(index_path, schema.clone())?;
|
||||
|
||||
|
||||
// To insert document we need an index writer.
|
||||
// There must be only one writer at a time.
|
||||
// This single `IndexWriter` is already
|
||||
@@ -85,7 +78,6 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
|
||||
// Let's index our documents!
|
||||
// We first need a handle on the title and the body field.
|
||||
|
||||
|
||||
// ### Create a document "manually".
|
||||
//
|
||||
// We can create a document manually, by setting the fields
|
||||
@@ -98,7 +90,7 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
|
||||
old_man_doc.add_text(
|
||||
body,
|
||||
"He was an old man who fished alone in a skiff in the Gulf Stream and \
|
||||
he had gone eighty-four days now without taking a fish.",
|
||||
he had gone eighty-four days now without taking a fish.",
|
||||
);
|
||||
|
||||
// ... and add it to the `IndexWriter`.
|
||||
@@ -145,7 +137,6 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
|
||||
// Indexing 5 million articles of the English wikipedia takes
|
||||
// around 4 minutes on my computer!
|
||||
|
||||
|
||||
// ### Committing
|
||||
//
|
||||
// At this point our documents are not searchable.
|
||||
@@ -167,7 +158,6 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
|
||||
// tantivy behaves as if has rolled back to its last
|
||||
// commit.
|
||||
|
||||
|
||||
// # Searching
|
||||
//
|
||||
// Let's search our index. Start by reloading
|
||||
@@ -192,7 +182,6 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
|
||||
// A ticket has been opened regarding this problem.
|
||||
let query = query_parser.parse_query("sea whale")?;
|
||||
|
||||
|
||||
// A query defines a set of documents, as
|
||||
// well as the way they should be scored.
|
||||
//
|
||||
|
||||
@@ -133,8 +133,7 @@ where
|
||||
addr + 8 <= data.len(),
|
||||
"The fast field field should have been padded with 7 bytes."
|
||||
);
|
||||
let val_unshifted_unmasked: u64 =
|
||||
unsafe { *(data[addr..].as_ptr() as *const u64) };
|
||||
let val_unshifted_unmasked: u64 = unsafe { *(data[addr..].as_ptr() as *const u64) };
|
||||
let val_shifted = (val_unshifted_unmasked >> bit_shift) as u64;
|
||||
(val_shifted & mask)
|
||||
} else {
|
||||
@@ -165,8 +164,7 @@ where
|
||||
for output_val in output.iter_mut() {
|
||||
let addr = addr_in_bits >> 3;
|
||||
let bit_shift = addr_in_bits & 7;
|
||||
let val_unshifted_unmasked: u64 =
|
||||
unsafe { *(data[addr..].as_ptr() as *const u64) };
|
||||
let val_unshifted_unmasked: u64 = unsafe { *(data[addr..].as_ptr() as *const u64) };
|
||||
let val_shifted = (val_unshifted_unmasked >> bit_shift) as u64;
|
||||
*output_val = val_shifted & mask;
|
||||
addr_in_bits += num_bits;
|
||||
|
||||
@@ -25,9 +25,7 @@ fn compress_sorted(vals: &[u32], output: &mut [u8], offset: u32) -> usize {
|
||||
}
|
||||
|
||||
fn uncompress_sorted(compressed_data: &[u8], output: &mut [u32], offset: u32) -> usize {
|
||||
unsafe {
|
||||
simdcomp::uncompress_sorted(compressed_data.as_ptr(), output.as_mut_ptr(), offset)
|
||||
}
|
||||
unsafe { simdcomp::uncompress_sorted(compressed_data.as_ptr(), output.as_mut_ptr(), offset) }
|
||||
}
|
||||
|
||||
fn compress_unsorted(vals: &[u32], output: &mut [u8]) -> usize {
|
||||
|
||||
@@ -18,6 +18,7 @@ use core::SegmentMeta;
|
||||
use super::pool::LeasedItem;
|
||||
use std::path::Path;
|
||||
use core::IndexMeta;
|
||||
use indexer::DirectoryLock;
|
||||
use IndexWriter;
|
||||
use directory::ManagedDirectory;
|
||||
use core::META_FILEPATH;
|
||||
@@ -113,12 +114,9 @@ impl Index {
|
||||
Index::create_from_metas(directory, &metas)
|
||||
}
|
||||
|
||||
/// Returns the index opstamp.
|
||||
///
|
||||
/// The opstamp is the number of documents that have been added
|
||||
/// from the beginning of time, and until the moment of the last commit.
|
||||
pub fn opstamp(&self) -> u64 {
|
||||
load_metas(self.directory()).unwrap().opstamp
|
||||
/// Reads the index meta file from the directory.
|
||||
pub fn load_metas(&self) -> Result<IndexMeta> {
|
||||
load_metas(self.directory())
|
||||
}
|
||||
|
||||
/// Open a new index writer. Attempts to acquire a lockfile.
|
||||
@@ -141,7 +139,8 @@ impl Index {
|
||||
num_threads: usize,
|
||||
heap_size_in_bytes: usize,
|
||||
) -> Result<IndexWriter> {
|
||||
open_index_writer(self, num_threads, heap_size_in_bytes)
|
||||
let directory_lock = DirectoryLock::lock(self.directory().box_clone())?;
|
||||
open_index_writer(self, num_threads, heap_size_in_bytes, directory_lock)
|
||||
}
|
||||
|
||||
/// Creates a multithreaded writer
|
||||
@@ -194,7 +193,7 @@ impl Index {
|
||||
/// Reads the meta.json and returns the list of
|
||||
/// `SegmentMeta` from the last commit.
|
||||
pub fn searchable_segment_metas(&self) -> Result<Vec<SegmentMeta>> {
|
||||
Ok(load_metas(self.directory())?.segments)
|
||||
Ok(self.load_metas()?.segments)
|
||||
}
|
||||
|
||||
/// Returns the list of segment ids that are searchable.
|
||||
|
||||
@@ -14,6 +14,7 @@ pub struct IndexMeta {
|
||||
pub segments: Vec<SegmentMeta>,
|
||||
pub schema: Schema,
|
||||
pub opstamp: u64,
|
||||
#[serde(skip_serializing_if = "Option::is_none")] pub payload: Option<String>,
|
||||
}
|
||||
|
||||
impl IndexMeta {
|
||||
@@ -22,6 +23,32 @@ impl IndexMeta {
|
||||
segments: vec![],
|
||||
schema,
|
||||
opstamp: 0u64,
|
||||
payload: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use serde_json;
|
||||
use super::IndexMeta;
|
||||
use schema::{SchemaBuilder, TEXT};
|
||||
|
||||
#[test]
|
||||
fn test_serialize_metas() {
|
||||
let schema = {
|
||||
let mut schema_builder = SchemaBuilder::new();
|
||||
schema_builder.add_text_field("text", TEXT);
|
||||
schema_builder.build()
|
||||
};
|
||||
let index_metas = IndexMeta {
|
||||
segments: Vec::new(),
|
||||
schema: schema,
|
||||
opstamp: 0u64,
|
||||
payload: None,
|
||||
};
|
||||
let json = serde_json::ser::to_string(&index_metas).expect("serialization failed");
|
||||
assert_eq!(json, r#"{"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","tokenizer":"default"},"stored":false}}],"opstamp":0}"#);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -59,10 +59,7 @@ mod murmurhash2 {
|
||||
/// Returns (the heap size in bytes, the hash table size in number of bits)
|
||||
pub(crate) fn split_memory(per_thread_memory_budget: usize) -> (usize, usize) {
|
||||
let table_size_limit: usize = per_thread_memory_budget / 3;
|
||||
let compute_table_size = |num_bits: usize| {
|
||||
let table_size: usize = (1 << num_bits) * mem::size_of::<KeyValue>();
|
||||
table_size * mem::size_of::<KeyValue>()
|
||||
};
|
||||
let compute_table_size = |num_bits: usize| (1 << num_bits) * mem::size_of::<KeyValue>();
|
||||
let table_num_bits: usize = (1..)
|
||||
.into_iter()
|
||||
.take_while(|num_bits: &usize| compute_table_size(*num_bits) < table_size_limit)
|
||||
@@ -220,9 +217,9 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_hashmap_size() {
|
||||
assert_eq!(split_memory(100_000), (67232, 9));
|
||||
assert_eq!(split_memory(1_000_000), (737856, 12));
|
||||
assert_eq!(split_memory(10_000_000), (7902848, 15));
|
||||
assert_eq!(split_memory(100_000), (67232, 12));
|
||||
assert_eq!(split_memory(1_000_000), (737856, 15));
|
||||
assert_eq!(split_memory(10_000_000), (7902848, 18));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -6,7 +6,6 @@ use directory::ReadOnlySource;
|
||||
use directory::shared_vec_slice::SharedVecSlice;
|
||||
use directory::WritePtr;
|
||||
use fst::raw::MmapReadOnly;
|
||||
use memmap::{Mmap, Protection};
|
||||
use std::collections::hash_map::Entry as HashMapEntry;
|
||||
use std::collections::HashMap;
|
||||
use std::convert::From;
|
||||
@@ -15,35 +14,39 @@ use std::fs::{self, File};
|
||||
use std::fs::OpenOptions;
|
||||
use std::io::{self, Seek, SeekFrom};
|
||||
use std::io::{BufWriter, Read, Write};
|
||||
use std::mem;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::result;
|
||||
use std::sync::Arc;
|
||||
use std::sync::RwLock;
|
||||
use std::sync::Weak;
|
||||
use tempdir::TempDir;
|
||||
|
||||
fn open_mmap(full_path: &Path) -> result::Result<Option<Arc<Mmap>>, OpenReadError> {
|
||||
let file = File::open(&full_path).map_err(|e| {
|
||||
if e.kind() == io::ErrorKind::NotFound {
|
||||
OpenReadError::FileDoesNotExist(full_path.to_owned())
|
||||
} else {
|
||||
OpenReadError::IOError(IOError::with_path(full_path.to_owned(), e))
|
||||
}
|
||||
})?;
|
||||
|
||||
/// Returns None iff the file exists, can be read, but is empty (and hence
|
||||
/// cannot be mmapped).
|
||||
///
|
||||
fn open_mmap(full_path: &PathBuf) -> result::Result<Option<MmapReadOnly>, OpenReadError> {
|
||||
let file = File::open(&full_path)
|
||||
.map_err(|e| {
|
||||
if e.kind() == io::ErrorKind::NotFound {
|
||||
OpenReadError::FileDoesNotExist(full_path.clone())
|
||||
} else {
|
||||
OpenReadError::IOError(IOError::with_path(full_path.to_owned(), e))
|
||||
}
|
||||
})?;
|
||||
|
||||
let meta_data = file.metadata()
|
||||
.map_err(|e| IOError::with_path(full_path.to_owned(), e))?;
|
||||
if meta_data.len() == 0 {
|
||||
// if the file size is 0, it will not be possible
|
||||
// to mmap the file, so we return an anonymous mmap_cache
|
||||
// to mmap the file, so we return None
|
||||
// instead.
|
||||
return Ok(None);
|
||||
}
|
||||
match Mmap::open(&file, Protection::Read) {
|
||||
Ok(mmap) => Ok(Some(Arc::new(mmap))),
|
||||
Err(e) => Err(IOError::with_path(full_path.to_owned(), e))?,
|
||||
}
|
||||
MmapReadOnly::open(&file)
|
||||
.map(Some)
|
||||
.map_err(|e| {
|
||||
From::from(IOError::with_path(full_path.to_owned(), e))
|
||||
})
|
||||
}
|
||||
|
||||
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
|
||||
@@ -52,10 +55,7 @@ pub struct CacheCounters {
|
||||
pub hit: usize,
|
||||
// Number of time tantivy had to call `mmap`
|
||||
// as no entry was in the cache.
|
||||
pub miss_empty: usize,
|
||||
// Number of time tantivy had to call `mmap`
|
||||
// as the entry in the cache was evinced.
|
||||
pub miss_weak: usize,
|
||||
pub miss: usize,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
@@ -66,38 +66,26 @@ pub struct CacheInfo {
|
||||
|
||||
struct MmapCache {
|
||||
counters: CacheCounters,
|
||||
cache: HashMap<PathBuf, Weak<Mmap>>,
|
||||
purge_weak_limit: usize,
|
||||
cache: HashMap<PathBuf, MmapReadOnly>,
|
||||
}
|
||||
|
||||
const STARTING_PURGE_WEAK_LIMIT: usize = 1_000;
|
||||
|
||||
impl Default for MmapCache {
|
||||
fn default() -> MmapCache {
|
||||
MmapCache {
|
||||
counters: CacheCounters::default(),
|
||||
cache: HashMap::new(),
|
||||
purge_weak_limit: STARTING_PURGE_WEAK_LIMIT,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl MmapCache {
|
||||
fn cleanup(&mut self) {
|
||||
let previous_cache_size = self.cache.len();
|
||||
let mut new_cache = HashMap::new();
|
||||
mem::swap(&mut new_cache, &mut self.cache);
|
||||
self.cache = new_cache
|
||||
.into_iter()
|
||||
.filter(|&(_, ref weak_ref)| weak_ref.upgrade().is_some())
|
||||
.collect();
|
||||
if self.cache.len() == previous_cache_size {
|
||||
self.purge_weak_limit *= 2;
|
||||
}
|
||||
|
||||
/// Removes a `MmapReadOnly` entry from the mmap cache.
|
||||
fn discard_from_cache(&mut self, full_path: &Path) -> bool {
|
||||
self.cache.remove(full_path).is_some()
|
||||
}
|
||||
|
||||
fn get_info(&mut self) -> CacheInfo {
|
||||
self.cleanup();
|
||||
let paths: Vec<PathBuf> = self.cache.keys().cloned().collect();
|
||||
CacheInfo {
|
||||
counters: self.counters.clone(),
|
||||
@@ -105,38 +93,23 @@ impl MmapCache {
|
||||
}
|
||||
}
|
||||
|
||||
fn get_mmap(&mut self, full_path: &PathBuf) -> Result<Option<Arc<Mmap>>, OpenReadError> {
|
||||
// if we exceed this limit, then we go through the weak
|
||||
// and remove those that are obsolete.
|
||||
if self.cache.len() > self.purge_weak_limit {
|
||||
self.cleanup();
|
||||
}
|
||||
fn get_mmap(&mut self, full_path: PathBuf) -> Result<Option<MmapReadOnly>, OpenReadError> {
|
||||
Ok(match self.cache.entry(full_path.clone()) {
|
||||
HashMapEntry::Occupied(mut occupied_entry) => {
|
||||
if let Some(mmap_arc) = occupied_entry.get().upgrade() {
|
||||
self.counters.hit += 1;
|
||||
Some(Arc::clone(&mmap_arc))
|
||||
} else {
|
||||
// The entry exists but the weak ref has been destroyed.
|
||||
self.counters.miss_weak += 1;
|
||||
if let Some(mmap_arc) = open_mmap(full_path)? {
|
||||
occupied_entry.insert(Arc::downgrade(&mmap_arc));
|
||||
Some(mmap_arc)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
HashMapEntry::Vacant(vacant_entry) => {
|
||||
self.counters.miss_empty += 1;
|
||||
if let Some(mmap_arc) = open_mmap(full_path)? {
|
||||
vacant_entry.insert(Arc::downgrade(&mmap_arc));
|
||||
Some(mmap_arc)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
})
|
||||
HashMapEntry::Occupied(occupied_entry) => {
|
||||
let mmap = occupied_entry.get();
|
||||
self.counters.hit += 1;
|
||||
Some(mmap.clone())
|
||||
}
|
||||
HashMapEntry::Vacant(vacant_entry) => {
|
||||
self.counters.miss += 1;
|
||||
if let Some(mmap) = open_mmap(&full_path)? {
|
||||
vacant_entry.insert(mmap.clone());
|
||||
Some(mmap)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -228,6 +201,7 @@ impl MmapDirectory {
|
||||
fd.sync_all()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns some statistical information
|
||||
/// about the Mmap cache.
|
||||
///
|
||||
@@ -283,10 +257,9 @@ impl Directory for MmapDirectory {
|
||||
})?;
|
||||
|
||||
Ok(mmap_cache
|
||||
.get_mmap(&full_path)?
|
||||
.map(MmapReadOnly::from)
|
||||
.map(ReadOnlySource::Mmap)
|
||||
.unwrap_or_else(|| ReadOnlySource::Anonymous(SharedVecSlice::empty())))
|
||||
.get_mmap(full_path)?
|
||||
.map(ReadOnlySource::Mmap)
|
||||
.unwrap_or_else(|| ReadOnlySource::Anonymous(SharedVecSlice::empty())))
|
||||
}
|
||||
|
||||
fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
|
||||
@@ -319,17 +292,22 @@ impl Directory for MmapDirectory {
|
||||
Ok(BufWriter::new(Box::new(writer)))
|
||||
}
|
||||
|
||||
|
||||
/// Any entry associated to the path in the mmap will be
|
||||
/// removed before the file is deleted.
|
||||
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
|
||||
debug!("Deleting file {:?}", path);
|
||||
let full_path = self.resolve_path(path);
|
||||
let mut mmap_cache = self.mmap_cache.write().map_err(|_| {
|
||||
let msg = format!(
|
||||
"Failed to acquired write lock \
|
||||
on mmap cache while deleting {:?}",
|
||||
path
|
||||
);
|
||||
IOError::with_path(path.to_owned(), make_io_err(msg))
|
||||
})?;
|
||||
let mut mmap_cache = self.mmap_cache
|
||||
.write()
|
||||
.map_err(|_| {
|
||||
let msg = format!("Failed to acquired write lock \
|
||||
on mmap cache while deleting {:?}",
|
||||
path);
|
||||
IOError::with_path(path.to_owned(), make_io_err(msg))
|
||||
})?;
|
||||
mmap_cache.discard_from_cache(path);
|
||||
|
||||
// Removing the entry in the MMap cache.
|
||||
// The munmap will appear on Drop,
|
||||
// when the last reference is gone.
|
||||
@@ -415,7 +393,8 @@ mod tests {
|
||||
// here we test if the cache releases
|
||||
// mmaps correctly.
|
||||
let mut mmap_directory = MmapDirectory::create_from_tempdir().unwrap();
|
||||
let paths: Vec<PathBuf> = (0..10)
|
||||
let num_paths = 10;
|
||||
let paths: Vec<PathBuf> = (0..num_paths)
|
||||
.map(|i| PathBuf::from(&*format!("file_{}", i)))
|
||||
.collect();
|
||||
{
|
||||
@@ -426,49 +405,22 @@ mod tests {
|
||||
}
|
||||
}
|
||||
{
|
||||
for path in &paths {
|
||||
{
|
||||
let _r = mmap_directory.open_read(path).unwrap();
|
||||
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 1);
|
||||
}
|
||||
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0);
|
||||
}
|
||||
}
|
||||
assert_eq!(mmap_directory.get_cache_info().counters.miss_empty, 10);
|
||||
|
||||
{
|
||||
// test weak miss
|
||||
// the first pass create the weak refs.
|
||||
for path in &paths {
|
||||
let _r = mmap_directory.open_read(path).unwrap();
|
||||
}
|
||||
// ... the second hits the weak refs.
|
||||
for path in &paths {
|
||||
let _r = mmap_directory.open_read(path).unwrap();
|
||||
}
|
||||
let cache_info = mmap_directory.get_cache_info();
|
||||
assert_eq!(cache_info.counters.miss_empty, 20);
|
||||
assert_eq!(cache_info.counters.miss_weak, 10);
|
||||
}
|
||||
|
||||
{
|
||||
let mut saved_readmmaps = vec![];
|
||||
// Keeps reference alive
|
||||
for (i, path) in paths.iter().enumerate() {
|
||||
let r = mmap_directory.open_read(path).unwrap();
|
||||
saved_readmmaps.push(r);
|
||||
let _r = mmap_directory.open_read(path).unwrap();
|
||||
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), i + 1);
|
||||
}
|
||||
let cache_info = mmap_directory.get_cache_info();
|
||||
assert_eq!(cache_info.counters.miss_empty, 30);
|
||||
assert_eq!(cache_info.counters.miss_weak, 10);
|
||||
assert_eq!(cache_info.mmapped.len(), 10);
|
||||
|
||||
for saved_readmmap in saved_readmmaps {
|
||||
assert_eq!(saved_readmmap.as_slice(), content);
|
||||
for path in paths.iter() {
|
||||
let _r = mmap_directory.open_read(path).unwrap();
|
||||
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), num_paths);
|
||||
}
|
||||
for (i, path) in paths.iter().enumerate() {
|
||||
println!("delete paths {:?}", path);
|
||||
mmap_directory.delete(path).unwrap();
|
||||
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), num_paths - i - 1);
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(mmap_directory.get_cache_info().counters.hit, 10);
|
||||
assert_eq!(mmap_directory.get_cache_info().counters.miss, 10);
|
||||
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0);
|
||||
}
|
||||
|
||||
|
||||
@@ -10,7 +10,6 @@ use indexer::stamper::Stamper;
|
||||
use datastruct::stacker::Heap;
|
||||
use directory::FileProtection;
|
||||
use error::{Error, ErrorKind, Result, ResultExt};
|
||||
use Directory;
|
||||
use fastfield::write_delete_bitset;
|
||||
use indexer::delete_queue::{DeleteCursor, DeleteQueue};
|
||||
use futures::Canceled;
|
||||
@@ -29,9 +28,10 @@ use schema::Term;
|
||||
use std::mem;
|
||||
use std::mem::swap;
|
||||
use std::thread::JoinHandle;
|
||||
use super::directory_lock::DirectoryLock;
|
||||
use indexer::DirectoryLock;
|
||||
use super::operation::AddOperation;
|
||||
use super::segment_updater::SegmentUpdater;
|
||||
use super::PreparedCommit;
|
||||
use std::thread;
|
||||
|
||||
// Size of the margin for the heap. A segment is closed when the remaining memory
|
||||
@@ -57,7 +57,7 @@ type DocumentReceiver = chan::Receiver<AddOperation>;
|
||||
pub struct IndexWriter {
|
||||
// the lock is just used to bind the
|
||||
// lifetime of the lock with that of the IndexWriter.
|
||||
_directory_lock: DirectoryLock,
|
||||
_directory_lock: Option<DirectoryLock>,
|
||||
|
||||
index: Index,
|
||||
|
||||
@@ -104,6 +104,7 @@ pub fn open_index_writer(
|
||||
index: &Index,
|
||||
num_threads: usize,
|
||||
heap_size_in_bytes_per_thread: usize,
|
||||
directory_lock: DirectoryLock,
|
||||
) -> Result<IndexWriter> {
|
||||
if heap_size_in_bytes_per_thread <= HEAP_SIZE_LIMIT as usize {
|
||||
panic!(format!(
|
||||
@@ -111,15 +112,12 @@ pub fn open_index_writer(
|
||||
HEAP_SIZE_LIMIT
|
||||
));
|
||||
}
|
||||
|
||||
let directory_lock = DirectoryLock::lock(index.directory().box_clone())?;
|
||||
|
||||
let (document_sender, document_receiver): (DocumentSender, DocumentReceiver) =
|
||||
chan::sync(PIPELINE_MAX_SIZE_IN_DOCS);
|
||||
|
||||
let delete_queue = DeleteQueue::new();
|
||||
|
||||
let current_opstamp = index.opstamp();
|
||||
let current_opstamp = index.load_metas()?.opstamp;
|
||||
|
||||
let stamper = Stamper::new(current_opstamp);
|
||||
|
||||
@@ -127,7 +125,7 @@ pub fn open_index_writer(
|
||||
SegmentUpdater::new(index.clone(), stamper.clone(), &delete_queue.cursor())?;
|
||||
|
||||
let mut index_writer = IndexWriter {
|
||||
_directory_lock: directory_lock,
|
||||
_directory_lock: Some(directory_lock),
|
||||
|
||||
heap_size_in_bytes_per_thread,
|
||||
index: index.clone(),
|
||||
@@ -286,6 +284,11 @@ fn index_documents(
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if !segment_updater.is_alive() {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let num_docs = segment_writer.max_doc();
|
||||
|
||||
// this is ensured by the call to peek before starting
|
||||
@@ -476,41 +479,66 @@ impl IndexWriter {
|
||||
/// state as it was after the last commit.
|
||||
///
|
||||
/// The opstamp at the last commit is returned.
|
||||
pub fn rollback(mut self) -> Result<IndexWriter> {
|
||||
pub fn rollback(&mut self) -> Result<()> {
|
||||
info!("Rolling back to opstamp {}", self.committed_opstamp);
|
||||
|
||||
// marks the segment updater as killed. From now on, all
|
||||
// segment updates will be ignored.
|
||||
self.segment_updater.kill();
|
||||
|
||||
let document_receiver = self.document_receiver.clone();
|
||||
|
||||
// take the directory lock to create a new index_writer.
|
||||
let directory_lock = self._directory_lock
|
||||
.take()
|
||||
.expect("The IndexWriter does not have any lock. This is a bug, please report.");
|
||||
|
||||
let new_index_writer: IndexWriter = open_index_writer(
|
||||
&self.index,
|
||||
self.num_threads,
|
||||
self.heap_size_in_bytes_per_thread,
|
||||
directory_lock,
|
||||
)?;
|
||||
|
||||
// the current `self` is dropped right away because of this call.
|
||||
//
|
||||
// This will drop the document queue, and the thread
|
||||
// should terminate.
|
||||
mem::replace(self, new_index_writer);
|
||||
|
||||
// Drains the document receiver pipeline :
|
||||
// Workers don't need to index the pending documents.
|
||||
let receiver_clone = self.document_receiver.clone();
|
||||
let index = self.index.clone();
|
||||
let num_threads = self.num_threads;
|
||||
let heap_size_in_bytes_per_thread = self.heap_size_in_bytes_per_thread;
|
||||
drop(self);
|
||||
for _ in receiver_clone {}
|
||||
//
|
||||
// This will reach an end as the only document_sender
|
||||
// was dropped with the index_writer.
|
||||
for _ in document_receiver.clone() {}
|
||||
|
||||
let index_writer = open_index_writer(&index, num_threads, heap_size_in_bytes_per_thread)?;
|
||||
|
||||
Ok(index_writer)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Commits all of the pending changes
|
||||
/// Prepares a commit.
|
||||
///
|
||||
/// A call to commit blocks.
|
||||
/// After it returns, all of the document that
|
||||
/// were added since the last commit are published
|
||||
/// and persisted.
|
||||
/// Calling `prepare_commit()` will cut the indexing
|
||||
/// queue. All pending documents will be sent to the
|
||||
/// indexing workers. They will then terminate, regardless
|
||||
/// of the size of their current segment and flush their
|
||||
/// work on disk.
|
||||
///
|
||||
/// In case of a crash or an hardware failure (as
|
||||
/// long as the hard disk is spared), it will be possible
|
||||
/// to resume indexing from this point.
|
||||
/// Once a commit is "prepared", you can either
|
||||
/// call
|
||||
/// * `.commit()`: to accept this commit
|
||||
/// * `.abort()`: to cancel this commit.
|
||||
///
|
||||
/// Commit returns the `opstamp` of the last document
|
||||
/// that made it in the commit.
|
||||
/// In the current implementation, `PreparedCommit` borrows
|
||||
/// the `IndexWriter` mutably so we are guaranteed that no new
|
||||
/// document can be added as long as it is committed or is
|
||||
/// dropped.
|
||||
///
|
||||
pub fn commit(&mut self) -> Result<u64> {
|
||||
// here, because we join all of the worker threads,
|
||||
/// It is also possible to add a payload to the `commit`
|
||||
/// using this API.
|
||||
/// See [`PreparedCommit::set_payload()`](PreparedCommit.html)
|
||||
pub fn prepare_commit(&mut self) -> Result<PreparedCommit> {
|
||||
// Here, because we join all of the worker threads,
|
||||
// all of the segment update for this commit have been
|
||||
// sent.
|
||||
//
|
||||
@@ -520,8 +548,7 @@ impl IndexWriter {
|
||||
|
||||
// This will move uncommitted segments to the state of
|
||||
// committed segments.
|
||||
self.committed_opstamp = self.stamper.stamp();
|
||||
info!("committing {}", self.committed_opstamp);
|
||||
info!("Preparing commit");
|
||||
|
||||
// this will drop the current document channel
|
||||
// and recreate a new one channels.
|
||||
@@ -543,10 +570,32 @@ impl IndexWriter {
|
||||
self.add_indexing_worker()?;
|
||||
}
|
||||
|
||||
// wait for the segment update thread to have processed the info
|
||||
self.segment_updater.commit(self.committed_opstamp)?;
|
||||
let commit_opstamp = self.stamper.stamp();
|
||||
let prepared_commit = PreparedCommit::new(self, commit_opstamp);
|
||||
info!("Prepared commit {}", commit_opstamp);
|
||||
Ok(prepared_commit)
|
||||
}
|
||||
|
||||
Ok(self.committed_opstamp)
|
||||
/// Commits all of the pending changes
|
||||
///
|
||||
/// A call to commit blocks.
|
||||
/// After it returns, all of the document that
|
||||
/// were added since the last commit are published
|
||||
/// and persisted.
|
||||
///
|
||||
/// In case of a crash or an hardware failure (as
|
||||
/// long as the hard disk is spared), it will be possible
|
||||
/// to resume indexing from this point.
|
||||
///
|
||||
/// Commit returns the `opstamp` of the last document
|
||||
/// that made it in the commit.
|
||||
///
|
||||
pub fn commit(&mut self) -> Result<u64> {
|
||||
self.prepare_commit()?.commit()
|
||||
}
|
||||
|
||||
pub(crate) fn segment_updater(&self) -> &SegmentUpdater {
|
||||
&self.segment_updater
|
||||
}
|
||||
|
||||
/// Delete all documents containing a given term.
|
||||
@@ -664,8 +713,7 @@ mod tests {
|
||||
doc.add_text(text_field, "a");
|
||||
index_writer.add_document(doc);
|
||||
}
|
||||
|
||||
index_writer = index_writer.rollback().unwrap();
|
||||
index_writer.rollback().unwrap();
|
||||
|
||||
assert_eq!(index_writer.commit_opstamp(), 0u64);
|
||||
assert_eq!(num_docs_containing("a"), 0);
|
||||
@@ -728,4 +776,78 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prepare_with_commit_message() {
|
||||
let _ = env_logger::init();
|
||||
let mut schema_builder = schema::SchemaBuilder::default();
|
||||
let text_field = schema_builder.add_text_field("text", schema::TEXT);
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
|
||||
{
|
||||
// writing the segment
|
||||
let mut index_writer = index.writer_with_num_threads(4, 4 * 30_000_000).unwrap();
|
||||
// create 8 segments with 100 tiny docs
|
||||
for _doc in 0..100 {
|
||||
index_writer.add_document(doc!(text_field => "a"));
|
||||
}
|
||||
{
|
||||
let mut prepared_commit = index_writer.prepare_commit().expect("commit failed");
|
||||
prepared_commit.set_payload("first commit");
|
||||
assert_eq!(prepared_commit.opstamp(), 100);
|
||||
prepared_commit.commit().expect("commit failed");
|
||||
}
|
||||
{
|
||||
let metas = index.load_metas().unwrap();
|
||||
assert_eq!(metas.payload.unwrap(), "first commit");
|
||||
}
|
||||
for _doc in 0..100 {
|
||||
index_writer.add_document(doc!(text_field => "a"));
|
||||
}
|
||||
index_writer.commit().unwrap();
|
||||
{
|
||||
let metas = index.load_metas().unwrap();
|
||||
assert!(metas.payload.is_none());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prepare_but_rollback() {
|
||||
let _ = env_logger::init();
|
||||
let mut schema_builder = schema::SchemaBuilder::default();
|
||||
let text_field = schema_builder.add_text_field("text", schema::TEXT);
|
||||
let index = Index::create_in_ram(schema_builder.build());
|
||||
|
||||
{
|
||||
// writing the segment
|
||||
let mut index_writer = index.writer_with_num_threads(4, 4 * 30_000_000).unwrap();
|
||||
// create 8 segments with 100 tiny docs
|
||||
for _doc in 0..100 {
|
||||
index_writer.add_document(doc!(text_field => "a"));
|
||||
}
|
||||
{
|
||||
let mut prepared_commit = index_writer.prepare_commit().expect("commit failed");
|
||||
prepared_commit.set_payload("first commit");
|
||||
assert_eq!(prepared_commit.opstamp(), 100);
|
||||
prepared_commit.abort().expect("commit failed");
|
||||
}
|
||||
{
|
||||
let metas = index.load_metas().unwrap();
|
||||
assert!(metas.payload.is_none());
|
||||
}
|
||||
for _doc in 0..100 {
|
||||
index_writer.add_document(doc!(text_field => "b"));
|
||||
}
|
||||
index_writer.commit().unwrap();
|
||||
}
|
||||
index.load_searchers().unwrap();
|
||||
let num_docs_containing = |s: &str| {
|
||||
let searcher = index.searcher();
|
||||
let term_a = Term::from_field_text(text_field, s);
|
||||
searcher.doc_freq(&term_a)
|
||||
};
|
||||
assert_eq!(num_docs_containing("a"), 0);
|
||||
assert_eq!(num_docs_containing("b"), 100);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -335,12 +335,16 @@ impl IndexMerger {
|
||||
fn write_storable_fields(&self, store_writer: &mut StoreWriter) -> Result<()> {
|
||||
for reader in &self.readers {
|
||||
let store_reader = reader.get_store_reader();
|
||||
for doc_id in 0..reader.max_doc() {
|
||||
if !reader.is_deleted(doc_id) {
|
||||
let doc = store_reader.get(doc_id)?;
|
||||
let field_values: Vec<&FieldValue> = doc.field_values().iter().collect();
|
||||
store_writer.store(&field_values)?;
|
||||
if reader.num_deleted_docs() > 0 {
|
||||
for doc_id in 0..reader.max_doc() {
|
||||
if !reader.is_deleted(doc_id) {
|
||||
let doc = store_reader.get(doc_id)?;
|
||||
let field_values: Vec<&FieldValue> = doc.field_values().iter().collect();
|
||||
store_writer.store(&field_values)?;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
store_writer.stack(store_reader)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
||||
@@ -13,7 +13,9 @@ mod segment_entry;
|
||||
mod doc_opstamp_mapping;
|
||||
pub mod operation;
|
||||
mod stamper;
|
||||
mod prepared_commit;
|
||||
|
||||
pub use self::prepared_commit::PreparedCommit;
|
||||
pub use self::segment_entry::{SegmentEntry, SegmentState};
|
||||
pub use self::segment_serializer::SegmentSerializer;
|
||||
pub use self::segment_writer::SegmentWriter;
|
||||
@@ -21,6 +23,7 @@ pub use self::index_writer::IndexWriter;
|
||||
pub use self::log_merge_policy::LogMergePolicy;
|
||||
pub use self::merge_policy::{MergeCandidate, MergePolicy, NoMergePolicy};
|
||||
pub use self::segment_manager::SegmentManager;
|
||||
pub(crate) use self::directory_lock::DirectoryLock;
|
||||
|
||||
/// Alias for the default merge policy, which is the `LogMergePolicy`.
|
||||
pub type DefaultMergePolicy = LogMergePolicy;
|
||||
|
||||
39
src/indexer/prepared_commit.rs
Normal file
39
src/indexer/prepared_commit.rs
Normal file
@@ -0,0 +1,39 @@
|
||||
use Result;
|
||||
use super::IndexWriter;
|
||||
|
||||
/// A prepared commit
|
||||
pub struct PreparedCommit<'a> {
|
||||
index_writer: &'a mut IndexWriter,
|
||||
payload: Option<String>,
|
||||
opstamp: u64,
|
||||
}
|
||||
|
||||
impl<'a> PreparedCommit<'a> {
|
||||
pub(crate) fn new(index_writer: &'a mut IndexWriter, opstamp: u64) -> PreparedCommit {
|
||||
PreparedCommit {
|
||||
index_writer: index_writer,
|
||||
payload: None,
|
||||
opstamp: opstamp,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn opstamp(&self) -> u64 {
|
||||
self.opstamp
|
||||
}
|
||||
|
||||
pub fn set_payload(&mut self, payload: &str) {
|
||||
self.payload = Some(payload.to_string())
|
||||
}
|
||||
|
||||
pub fn abort(self) -> Result<()> {
|
||||
self.index_writer.rollback()
|
||||
}
|
||||
|
||||
pub fn commit(self) -> Result<u64> {
|
||||
info!("committing {}", self.opstamp);
|
||||
self.index_writer
|
||||
.segment_updater()
|
||||
.commit(self.opstamp, self.payload)?;
|
||||
Ok(self.opstamp)
|
||||
}
|
||||
}
|
||||
@@ -46,7 +46,7 @@ use super::segment_manager::{get_mergeable_segments, SegmentManager};
|
||||
///
|
||||
/// This method is not part of tantivy's public API
|
||||
pub fn save_new_metas(schema: Schema, opstamp: u64, directory: &mut Directory) -> Result<()> {
|
||||
save_metas(vec![], schema, opstamp, directory)
|
||||
save_metas(vec![], schema, opstamp, None, directory)
|
||||
}
|
||||
|
||||
/// Save the index meta file.
|
||||
@@ -62,12 +62,14 @@ pub fn save_metas(
|
||||
segment_metas: Vec<SegmentMeta>,
|
||||
schema: Schema,
|
||||
opstamp: u64,
|
||||
payload: Option<String>,
|
||||
directory: &mut Directory,
|
||||
) -> Result<()> {
|
||||
let metas = IndexMeta {
|
||||
segments: segment_metas,
|
||||
schema,
|
||||
opstamp,
|
||||
payload: payload.clone(),
|
||||
};
|
||||
let mut buffer = serde_json::to_vec_pretty(&metas)?;
|
||||
write!(&mut buffer, "\n")?;
|
||||
@@ -222,7 +224,7 @@ impl SegmentUpdater {
|
||||
self.0.killed.store(true, Ordering::Release);
|
||||
}
|
||||
|
||||
fn is_alive(&self) -> bool {
|
||||
pub fn is_alive(&self) -> bool {
|
||||
!self.0.killed.load(Ordering::Acquire)
|
||||
}
|
||||
|
||||
@@ -239,7 +241,7 @@ impl SegmentUpdater {
|
||||
Ok(segment_entries)
|
||||
}
|
||||
|
||||
pub fn save_metas(&self, opstamp: u64) {
|
||||
pub fn save_metas(&self, opstamp: u64, commit_message: Option<String>) {
|
||||
if self.is_alive() {
|
||||
let index = &self.0.index;
|
||||
let directory = index.directory();
|
||||
@@ -247,6 +249,7 @@ impl SegmentUpdater {
|
||||
self.0.segment_manager.committed_segment_metas(),
|
||||
index.schema(),
|
||||
opstamp,
|
||||
commit_message,
|
||||
directory.box_clone().borrow_mut(),
|
||||
).expect("Could not save metas.");
|
||||
}
|
||||
@@ -266,14 +269,14 @@ impl SegmentUpdater {
|
||||
.garbage_collect(|| self.0.segment_manager.list_files());
|
||||
}
|
||||
|
||||
pub fn commit(&self, opstamp: u64) -> Result<()> {
|
||||
pub fn commit(&self, opstamp: u64, payload: Option<String>) -> Result<()> {
|
||||
self.run_async(move |segment_updater| {
|
||||
if segment_updater.is_alive() {
|
||||
let segment_entries = segment_updater
|
||||
.purge_deletes(opstamp)
|
||||
.expect("Failed purge deletes");
|
||||
segment_updater.0.segment_manager.commit(segment_entries);
|
||||
segment_updater.save_metas(opstamp);
|
||||
segment_updater.save_metas(opstamp, payload);
|
||||
segment_updater.garbage_collect_files_exec();
|
||||
segment_updater.consider_merge_options();
|
||||
}
|
||||
@@ -382,7 +385,12 @@ impl SegmentUpdater {
|
||||
let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
|
||||
let mut _file_protection_opt = None;
|
||||
if let Some(delete_operation) = delete_cursor.get() {
|
||||
let committed_opstamp = segment_updater.0.index.opstamp();
|
||||
let committed_opstamp = segment_updater
|
||||
.0
|
||||
.index
|
||||
.load_metas()
|
||||
.expect("Failed to read opstamp")
|
||||
.opstamp;
|
||||
if delete_operation.opstamp < committed_opstamp {
|
||||
let index = &segment_updater.0.index;
|
||||
let segment = index.segment(after_merge_segment_entry.meta().clone());
|
||||
@@ -418,7 +426,8 @@ impl SegmentUpdater {
|
||||
.end_merge(&before_merge_segment_ids, after_merge_segment_entry);
|
||||
segment_updater.consider_merge_options();
|
||||
info!("save metas");
|
||||
segment_updater.save_metas(segment_updater.0.index.opstamp());
|
||||
let previous_metas = segment_updater.0.index.load_metas().unwrap();
|
||||
segment_updater.save_metas(previous_metas.opstamp, previous_metas.payload);
|
||||
segment_updater.garbage_collect_files_exec();
|
||||
}).wait()
|
||||
}
|
||||
|
||||
@@ -34,6 +34,8 @@ extern crate log;
|
||||
#[macro_use]
|
||||
extern crate error_chain;
|
||||
|
||||
extern crate regex;
|
||||
extern crate tempfile;
|
||||
extern crate atomicwrites;
|
||||
extern crate bit_set;
|
||||
extern crate byteorder;
|
||||
@@ -45,10 +47,8 @@ extern crate futures;
|
||||
extern crate futures_cpupool;
|
||||
extern crate itertools;
|
||||
extern crate lz4;
|
||||
extern crate memmap;
|
||||
extern crate num_cpus;
|
||||
extern crate owning_ref;
|
||||
extern crate regex;
|
||||
extern crate rust_stemmers;
|
||||
extern crate serde;
|
||||
extern crate serde_json;
|
||||
@@ -447,7 +447,7 @@ mod tests {
|
||||
{
|
||||
index_writer.delete_term(Term::from_field_text(text_field, "c"));
|
||||
}
|
||||
index_writer = index_writer.rollback().unwrap();
|
||||
index_writer.rollback().unwrap();
|
||||
index_writer.delete_term(Term::from_field_text(text_field, "a"));
|
||||
index_writer.commit().unwrap();
|
||||
}
|
||||
|
||||
@@ -373,7 +373,7 @@ mod test {
|
||||
|
||||
#[test]
|
||||
pub fn test_parse_nonindexed_field_yields_error() {
|
||||
let query_parser = make_query_parser();
|
||||
let query_parser = make_query_parser();
|
||||
|
||||
let is_not_indexed_err = |query: &str| {
|
||||
let result: Result<Box<Query>, QueryParserError> = query_parser.parse_query(query);
|
||||
|
||||
@@ -42,6 +42,11 @@ impl Default for TextOptions {
|
||||
}
|
||||
}
|
||||
|
||||
/// Configuration defining indexing for a text field.
|
||||
/// It wraps:
|
||||
///
|
||||
/// * record (See [`IndexRecordOption`](./enum.IndexRecordOption.html))
|
||||
/// * tokenizer
|
||||
#[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)]
|
||||
pub struct TextFieldIndexing {
|
||||
record: IndexRecordOption,
|
||||
|
||||
@@ -34,22 +34,33 @@ impl StoreReader {
|
||||
}
|
||||
}
|
||||
|
||||
fn block_offset(&self, doc_id: DocId) -> (DocId, u64) {
|
||||
pub(crate) fn block_index(&self) -> SkipList<u64> {
|
||||
SkipList::from(self.offset_index_source.as_slice())
|
||||
}
|
||||
|
||||
fn block_offset(&self, doc_id: DocId) -> (DocId, u64) {
|
||||
self.block_index()
|
||||
.seek(doc_id + 1)
|
||||
.unwrap_or((0u32, 0u64))
|
||||
}
|
||||
|
||||
pub(crate) fn block_data(&self) -> &[u8] {
|
||||
self.data.as_slice()
|
||||
}
|
||||
|
||||
fn compressed_block(&self, addr: usize) -> &[u8] {
|
||||
let total_buffer = self.data.as_slice();
|
||||
let mut buffer = &total_buffer[addr..];
|
||||
let block_len = u32::deserialize(&mut buffer).expect("") as usize;
|
||||
&buffer[..block_len]
|
||||
}
|
||||
|
||||
fn read_block(&self, block_offset: usize) -> io::Result<()> {
|
||||
if block_offset != *self.current_block_offset.borrow() {
|
||||
let mut current_block_mut = self.current_block.borrow_mut();
|
||||
current_block_mut.clear();
|
||||
let total_buffer = self.data.as_slice();
|
||||
let mut cursor = &total_buffer[block_offset..];
|
||||
let block_length = u32::deserialize(&mut cursor).unwrap();
|
||||
let block_array: &[u8] = &total_buffer
|
||||
[(block_offset + 4 as usize)..(block_offset + 4 + block_length as usize)];
|
||||
let mut lz4_decoder = lz4::Decoder::new(block_array)?;
|
||||
let compressed_block = self.compressed_block(block_offset);
|
||||
let mut lz4_decoder = lz4::Decoder::new(compressed_block)?;
|
||||
*self.current_block_offset.borrow_mut() = usize::max_value();
|
||||
lz4_decoder.read_to_end(&mut current_block_mut).map(|_| ())?;
|
||||
*self.current_block_offset.borrow_mut() = block_offset;
|
||||
|
||||
@@ -3,6 +3,7 @@ use DocId;
|
||||
use schema::FieldValue;
|
||||
use common::BinarySerializable;
|
||||
use std::io::{self, Write};
|
||||
use super::StoreReader;
|
||||
use lz4;
|
||||
use datastruct::SkipListBuilder;
|
||||
use common::CountingWriter;
|
||||
@@ -60,6 +61,35 @@ impl StoreWriter {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Stacks a store reader on top of the documents written so far.
|
||||
/// This method is an optimization compared to iterating over the documents
|
||||
/// in the store and adding them one by one, as the store's data will
|
||||
/// not be decompressed and then recompressed.
|
||||
pub fn stack(&mut self, store_reader: &StoreReader) -> io::Result<()> {
|
||||
if !self.current_block.is_empty() {
|
||||
self.write_and_compress_block()?;
|
||||
self.offset_index_writer.insert(
|
||||
self.doc,
|
||||
&(self.writer.written_bytes() as u64),
|
||||
)?;
|
||||
}
|
||||
let doc_offset = self.doc;
|
||||
let start_offset = self.writer.written_bytes() as u64;
|
||||
|
||||
// just bulk write all of the block of the given reader.
|
||||
self.writer.write_all(store_reader.block_data())?;
|
||||
|
||||
// concatenate the index of the `store_reader`, after translating
|
||||
// its start doc id and its start file offset.
|
||||
for (next_doc_id, block_addr) in store_reader.block_index() {
|
||||
self.doc = doc_offset + next_doc_id;
|
||||
self.offset_index_writer.insert(
|
||||
self.doc,
|
||||
&(start_offset + block_addr))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_and_compress_block(&mut self) -> io::Result<()> {
|
||||
self.intermediary_buffer.clear();
|
||||
{
|
||||
|
||||
@@ -49,19 +49,26 @@ impl TermDeltaDecoder {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// code
|
||||
// first bit represents whether the prefix / suffix len can be encoded
|
||||
// on the same byte. (the next one)
|
||||
//
|
||||
|
||||
#[inline(always)]
|
||||
pub fn decode<'a>(&mut self, code: u8, mut cursor: &'a [u8]) -> &'a [u8] {
|
||||
let (prefix_len, suffix_len): (usize, usize) = if (code & 1u8) == 1u8 {
|
||||
let b = cursor[0];
|
||||
cursor = &cursor[1..];
|
||||
let prefix_len = (b & 15u8) as usize;
|
||||
let suffix_len = (b >> 4u8) as usize;
|
||||
(prefix_len, suffix_len)
|
||||
} else {
|
||||
let prefix_len = u32::deserialize(&mut cursor).unwrap();
|
||||
let suffix_len = u32::deserialize(&mut cursor).unwrap();
|
||||
(prefix_len as usize, suffix_len as usize)
|
||||
};
|
||||
let (prefix_len, suffix_len): (usize, usize) =
|
||||
if (code & 1u8) == 1u8 {
|
||||
let b = cursor[0];
|
||||
cursor = &cursor[1..];
|
||||
let prefix_len = (b & 15u8) as usize;
|
||||
let suffix_len = (b >> 4u8) as usize;
|
||||
(prefix_len, suffix_len)
|
||||
} else {
|
||||
let prefix_len = u32::deserialize(&mut cursor).unwrap();
|
||||
let suffix_len = u32::deserialize(&mut cursor).unwrap();
|
||||
(prefix_len as usize, suffix_len as usize)
|
||||
};
|
||||
unsafe { self.term.set_len(prefix_len) };
|
||||
self.term.extend_from_slice(&(*cursor)[..suffix_len]);
|
||||
&cursor[suffix_len..]
|
||||
@@ -75,8 +82,8 @@ impl TermDeltaDecoder {
|
||||
#[derive(Default)]
|
||||
pub struct DeltaTermInfo {
|
||||
pub doc_freq: u32,
|
||||
pub delta_postings_offset: u32,
|
||||
pub delta_positions_offset: u32,
|
||||
pub delta_postings_offset: u64,
|
||||
pub delta_positions_offset: u64,
|
||||
pub positions_inner_offset: u8,
|
||||
}
|
||||
|
||||
@@ -101,7 +108,7 @@ impl TermInfoDeltaEncoder {
|
||||
let mut delta_term_info = DeltaTermInfo {
|
||||
doc_freq: term_info.doc_freq,
|
||||
delta_postings_offset: term_info.postings_offset - self.term_info.postings_offset,
|
||||
delta_positions_offset: 0,
|
||||
delta_positions_offset: 0u64,
|
||||
positions_inner_offset: 0,
|
||||
};
|
||||
if self.has_positions {
|
||||
@@ -152,7 +159,7 @@ impl TermInfoDeltaDecoder {
|
||||
let mut v: u64 = unsafe { *(cursor.as_ptr() as *const u64) };
|
||||
let doc_freq: u32 = (v as u32) & make_mask(num_bytes_docfreq);
|
||||
v >>= (num_bytes_docfreq as u64) * 8u64;
|
||||
let delta_postings_offset: u32 = (v as u32) & make_mask(num_bytes_postings_offset);
|
||||
let delta_postings_offset: u64 = v & make_mask(num_bytes_postings_offset);
|
||||
cursor = &cursor[num_bytes_docfreq + num_bytes_postings_offset..];
|
||||
self.term_info.doc_freq = doc_freq;
|
||||
self.term_info.postings_offset += delta_postings_offset;
|
||||
|
||||
66
src/tokenizer/alphanum_only.rs
Normal file
66
src/tokenizer/alphanum_only.rs
Normal file
@@ -0,0 +1,66 @@
|
||||
use super::{Token, TokenFilter, TokenStream};
|
||||
|
||||
/// `TokenFilter` that removes all tokens that contain non
|
||||
/// ascii alphanumeric characters.
|
||||
#[derive(Clone)]
|
||||
pub struct AlphaNumOnlyFilter;
|
||||
|
||||
pub struct AlphaNumOnlyFilterStream<TailTokenStream>
|
||||
where TailTokenStream: TokenStream
|
||||
{
|
||||
tail: TailTokenStream,
|
||||
}
|
||||
|
||||
|
||||
impl<TailTokenStream> AlphaNumOnlyFilterStream<TailTokenStream>
|
||||
where TailTokenStream: TokenStream
|
||||
{
|
||||
fn predicate(&self, token: &Token) -> bool {
|
||||
token.text.chars().all(|c| c.is_ascii_alphanumeric())
|
||||
}
|
||||
|
||||
fn wrap(
|
||||
tail: TailTokenStream,
|
||||
) -> AlphaNumOnlyFilterStream<TailTokenStream> {
|
||||
AlphaNumOnlyFilterStream {
|
||||
tail
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
impl<TailTokenStream> TokenFilter<TailTokenStream> for AlphaNumOnlyFilter
|
||||
where
|
||||
TailTokenStream: TokenStream,
|
||||
{
|
||||
type ResultTokenStream = AlphaNumOnlyFilterStream<TailTokenStream>;
|
||||
|
||||
fn transform(&self, token_stream: TailTokenStream) -> Self::ResultTokenStream {
|
||||
AlphaNumOnlyFilterStream::wrap(token_stream)
|
||||
}
|
||||
}
|
||||
|
||||
impl<TailTokenStream> TokenStream for AlphaNumOnlyFilterStream<TailTokenStream>
|
||||
where
|
||||
TailTokenStream: TokenStream
|
||||
{
|
||||
fn token(&self) -> &Token {
|
||||
self.tail.token()
|
||||
}
|
||||
|
||||
fn token_mut(&mut self) -> &mut Token {
|
||||
self.tail.token_mut()
|
||||
}
|
||||
|
||||
fn advance(&mut self) -> bool {
|
||||
loop {
|
||||
if self.tail.advance() {
|
||||
if self.predicate(self.tail.token()) {
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -137,7 +137,9 @@ mod tokenizer_manager;
|
||||
mod japanese_tokenizer;
|
||||
mod token_stream_chain;
|
||||
mod raw_tokenizer;
|
||||
mod alphanum_only;
|
||||
|
||||
pub use self::alphanum_only::AlphaNumOnlyFilter;
|
||||
pub use self::tokenizer::{Token, TokenFilter, TokenStream, Tokenizer};
|
||||
pub use self::tokenizer::BoxedTokenizer;
|
||||
pub use self::tokenizer_manager::TokenizerManager;
|
||||
|
||||
@@ -11,7 +11,7 @@ pub struct RemoveLongFilter {
|
||||
}
|
||||
|
||||
impl RemoveLongFilter {
|
||||
// the limit is in bytes of the UTF-8 representation.
|
||||
/// Creates a `RemoveLongFilter` given a limit in bytes of the UTF-8 representation.
|
||||
pub fn limit(length_limit: usize) -> RemoveLongFilter {
|
||||
RemoveLongFilter { length_limit }
|
||||
}
|
||||
|
||||
@@ -2,12 +2,15 @@ use std::sync::Arc;
|
||||
use super::{Token, TokenFilter, TokenStream};
|
||||
use rust_stemmers::{self, Algorithm};
|
||||
|
||||
/// `Stemmer` token filter. Currently only English is supported.
|
||||
/// Tokens are expected to be lowercased beforehands.
|
||||
#[derive(Clone)]
|
||||
pub struct Stemmer {
|
||||
stemmer_algorithm: Arc<Algorithm>,
|
||||
}
|
||||
|
||||
impl Stemmer {
|
||||
/// Creates a new Stemmer `TokenFilter`.
|
||||
pub fn new() -> Stemmer {
|
||||
Stemmer {
|
||||
stemmer_algorithm: Arc::new(Algorithm::English),
|
||||
|
||||
@@ -195,6 +195,10 @@ pub trait TokenStream {
|
||||
/// Returns a mutable reference to the current token.
|
||||
fn token_mut(&mut self) -> &mut Token;
|
||||
|
||||
/// Helper to iterate over tokens. It
|
||||
/// simply combines a call to `.advance()`
|
||||
/// and `.token()`.
|
||||
///
|
||||
/// ```
|
||||
/// # extern crate tantivy;
|
||||
/// # use tantivy::tokenizer::*;
|
||||
@@ -217,6 +221,8 @@ pub trait TokenStream {
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper function to consume the entire `TokenStream`
|
||||
/// and push the tokens to a sink function.
|
||||
fn process(&mut self, sink: &mut FnMut(&Token)) -> u32 {
|
||||
let mut num_tokens_pushed = 0u32;
|
||||
while self.advance() {
|
||||
@@ -247,7 +253,10 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// Trait for the pluggable components of `Tokenizer`s.
|
||||
pub trait TokenFilter<TailTokenStream: TokenStream>: Clone {
|
||||
/// The resulting `TokenStream` type.
|
||||
type ResultTokenStream: TokenStream;
|
||||
|
||||
/// Wraps a token stream and returns the modified one.
|
||||
|
||||
@@ -28,6 +28,7 @@ pub struct TokenizerManager {
|
||||
}
|
||||
|
||||
impl TokenizerManager {
|
||||
/// Registers a new tokenizer associated with a given name.
|
||||
pub fn register<A>(&self, tokenizer_name: &str, tokenizer: A)
|
||||
where
|
||||
A: 'static + Send + Sync + for<'a> Tokenizer<'a>,
|
||||
@@ -39,6 +40,7 @@ impl TokenizerManager {
|
||||
.insert(tokenizer_name.to_string(), boxed_tokenizer);
|
||||
}
|
||||
|
||||
/// Accessing a tokenizer given its name.
|
||||
pub fn get(&self, tokenizer_name: &str) -> Option<Box<BoxedTokenizer>> {
|
||||
self.tokenizers
|
||||
.read()
|
||||
|
||||
Reference in New Issue
Block a user