Compare commits

..

2 Commits

Author SHA1 Message Date
Paul Masurel
1658be3792 Various changes. Need to cherrypick some of them and put them into master 2017-12-25 10:35:10 +09:00
Paul Masurel
23fad88b35 NOBUG common crawl, streamdict works with 64 bits (hopefully) 2017-12-21 22:44:50 +09:00
29 changed files with 273 additions and 469 deletions

View File

@@ -1,5 +1,4 @@
language: rust
cache: cargo
rust:
- nightly
env:
@@ -12,7 +11,6 @@ addons:
apt:
sources:
- ubuntu-toolchain-r-test
- kalakris-cmake
packages:
- gcc-4.8
- g++-4.8
@@ -20,15 +18,18 @@ addons:
- libelf-dev
- libdw-dev
- binutils-dev
- cmake
before_script:
- |
cargo install cargo-travis || echo "cargo-travis already installed"
export PATH=$HOME/.cargo/bin:$PATH
pip install 'travis-cargo<0.2' --user &&
export PATH=$HOME/.local/bin:$PATH
script:
- cargo build
- cargo test
- |
travis-cargo build &&
travis-cargo test &&
travis-cargo bench
- cargo run --example simple_search
after_success:
- cargo coveralls --exclude-pattern cpp/
- bash ./script/build-doc.sh
- travis-cargo doc-upload
- if [[ "$TRAVIS_OS_NAME" == "linux" ]]; then travis-cargo coveralls --no-sudo --verify; fi
- if [[ "$TRAVIS_OS_NAME" == "linux" ]]; then ./kcov/build/src/kcov --verify --coveralls-id=$TRAVIS_JOB_ID --include-path=`pwd`/src --exclude-path=`pwd`/cpp --exclude-pattern=/.cargo target/kcov target/debug/tantivy-*; fi

View File

@@ -14,10 +14,11 @@ keywords = ["search", "information", "retrieval"]
[dependencies]
byteorder = "1.0"
memmap = "0.4"
lazy_static = "0.2.1"
tinysegmenter = "0.1.0"
regex = "0.2"
fst = "0.2"
fst = "0.1.37"
atomicwrites = "0.1.3"
tempfile = "2.1"
log = "0.3.6"

View File

@@ -20,7 +20,10 @@ fn main() {
}
}
fn run_example(index_path: &Path) -> tantivy::Result<()> {
// # Defining the schema
//
// The Tantivy index requires a very strict schema.
@@ -28,6 +31,7 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
// and for each field, its type and "the way it should
// be indexed".
// first we need to define a schema ...
let mut schema_builder = SchemaBuilder::default();
@@ -58,6 +62,8 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
let schema = schema_builder.build();
// # Indexing documents
//
// Let's create a brand new index.
@@ -66,6 +72,7 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
// with our schema in the directory.
let index = Index::create(index_path, schema.clone())?;
// To insert document we need an index writer.
// There must be only one writer at a time.
// This single `IndexWriter` is already
@@ -78,6 +85,7 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
// Let's index our documents!
// We first need a handle on the title and the body field.
// ### Create a document "manually".
//
// We can create a document manually, by setting the fields
@@ -90,7 +98,7 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
old_man_doc.add_text(
body,
"He was an old man who fished alone in a skiff in the Gulf Stream and \
he had gone eighty-four days now without taking a fish.",
he had gone eighty-four days now without taking a fish.",
);
// ... and add it to the `IndexWriter`.
@@ -137,6 +145,7 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
// Indexing 5 million articles of the English wikipedia takes
// around 4 minutes on my computer!
// ### Committing
//
// At this point our documents are not searchable.
@@ -158,6 +167,7 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
// tantivy behaves as if has rolled back to its last
// commit.
// # Searching
//
// Let's search our index. Start by reloading
@@ -182,6 +192,7 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
// A ticket has been opened regarding this problem.
let query = query_parser.parse_query("sea whale")?;
// A query defines a set of documents, as
// well as the way they should be scored.
//

View File

@@ -133,7 +133,8 @@ where
addr + 8 <= data.len(),
"The fast field field should have been padded with 7 bytes."
);
let val_unshifted_unmasked: u64 = unsafe { *(data[addr..].as_ptr() as *const u64) };
let val_unshifted_unmasked: u64 =
unsafe { *(data[addr..].as_ptr() as *const u64) };
let val_shifted = (val_unshifted_unmasked >> bit_shift) as u64;
(val_shifted & mask)
} else {
@@ -164,7 +165,8 @@ where
for output_val in output.iter_mut() {
let addr = addr_in_bits >> 3;
let bit_shift = addr_in_bits & 7;
let val_unshifted_unmasked: u64 = unsafe { *(data[addr..].as_ptr() as *const u64) };
let val_unshifted_unmasked: u64 =
unsafe { *(data[addr..].as_ptr() as *const u64) };
let val_shifted = (val_unshifted_unmasked >> bit_shift) as u64;
*output_val = val_shifted & mask;
addr_in_bits += num_bits;

View File

@@ -25,7 +25,9 @@ fn compress_sorted(vals: &[u32], output: &mut [u8], offset: u32) -> usize {
}
fn uncompress_sorted(compressed_data: &[u8], output: &mut [u32], offset: u32) -> usize {
unsafe { simdcomp::uncompress_sorted(compressed_data.as_ptr(), output.as_mut_ptr(), offset) }
unsafe {
simdcomp::uncompress_sorted(compressed_data.as_ptr(), output.as_mut_ptr(), offset)
}
}
fn compress_unsorted(vals: &[u32], output: &mut [u8]) -> usize {

View File

@@ -18,7 +18,6 @@ use core::SegmentMeta;
use super::pool::LeasedItem;
use std::path::Path;
use core::IndexMeta;
use indexer::DirectoryLock;
use IndexWriter;
use directory::ManagedDirectory;
use core::META_FILEPATH;
@@ -114,9 +113,12 @@ impl Index {
Index::create_from_metas(directory, &metas)
}
/// Reads the index meta file from the directory.
pub fn load_metas(&self) -> Result<IndexMeta> {
load_metas(self.directory())
/// Returns the index opstamp.
///
/// The opstamp is the number of documents that have been added
/// from the beginning of time, and until the moment of the last commit.
pub fn opstamp(&self) -> u64 {
load_metas(self.directory()).unwrap().opstamp
}
/// Open a new index writer. Attempts to acquire a lockfile.
@@ -139,8 +141,7 @@ impl Index {
num_threads: usize,
heap_size_in_bytes: usize,
) -> Result<IndexWriter> {
let directory_lock = DirectoryLock::lock(self.directory().box_clone())?;
open_index_writer(self, num_threads, heap_size_in_bytes, directory_lock)
open_index_writer(self, num_threads, heap_size_in_bytes)
}
/// Creates a multithreaded writer
@@ -193,7 +194,7 @@ impl Index {
/// Reads the meta.json and returns the list of
/// `SegmentMeta` from the last commit.
pub fn searchable_segment_metas(&self) -> Result<Vec<SegmentMeta>> {
Ok(self.load_metas()?.segments)
Ok(load_metas(self.directory())?.segments)
}
/// Returns the list of segment ids that are searchable.

View File

@@ -14,7 +14,6 @@ pub struct IndexMeta {
pub segments: Vec<SegmentMeta>,
pub schema: Schema,
pub opstamp: u64,
#[serde(skip_serializing_if = "Option::is_none")] pub payload: Option<String>,
}
impl IndexMeta {
@@ -23,32 +22,6 @@ impl IndexMeta {
segments: vec![],
schema,
opstamp: 0u64,
payload: None,
}
}
}
#[cfg(test)]
mod tests {
use serde_json;
use super::IndexMeta;
use schema::{SchemaBuilder, TEXT};
#[test]
fn test_serialize_metas() {
let schema = {
let mut schema_builder = SchemaBuilder::new();
schema_builder.add_text_field("text", TEXT);
schema_builder.build()
};
let index_metas = IndexMeta {
segments: Vec::new(),
schema: schema,
opstamp: 0u64,
payload: None,
};
let json = serde_json::ser::to_string(&index_metas).expect("serialization failed");
assert_eq!(json, r#"{"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","tokenizer":"default"},"stored":false}}],"opstamp":0}"#);
}
}

View File

@@ -58,8 +58,10 @@ mod murmurhash2 {
///
/// Returns (the heap size in bytes, the hash table size in number of bits)
pub(crate) fn split_memory(per_thread_memory_budget: usize) -> (usize, usize) {
let table_size_limit: usize = per_thread_memory_budget / 3;
let compute_table_size = |num_bits: usize| (1 << num_bits) * mem::size_of::<KeyValue>();
let table_size_limit: usize = per_thread_memory_budget / 5;
let compute_table_size = |num_bits: usize| {
(1 << num_bits) * mem::size_of::<KeyValue>()
};
let table_num_bits: usize = (1..)
.into_iter()
.take_while(|num_bits: &usize| compute_table_size(*num_bits) < table_size_limit)
@@ -217,9 +219,9 @@ mod tests {
#[test]
fn test_hashmap_size() {
assert_eq!(split_memory(100_000), (67232, 12));
assert_eq!(split_memory(1_000_000), (737856, 15));
assert_eq!(split_memory(10_000_000), (7902848, 18));
assert_eq!(split_memory(100_000), (67232, 9));
assert_eq!(split_memory(1_000_000), (737856, 12));
assert_eq!(split_memory(10_000_000), (7902848, 15));
}
#[test]

View File

@@ -6,6 +6,7 @@ use directory::ReadOnlySource;
use directory::shared_vec_slice::SharedVecSlice;
use directory::WritePtr;
use fst::raw::MmapReadOnly;
use memmap::{Mmap, Protection};
use std::collections::hash_map::Entry as HashMapEntry;
use std::collections::HashMap;
use std::convert::From;
@@ -14,39 +15,35 @@ use std::fs::{self, File};
use std::fs::OpenOptions;
use std::io::{self, Seek, SeekFrom};
use std::io::{BufWriter, Read, Write};
use std::mem;
use std::path::{Path, PathBuf};
use std::result;
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::Weak;
use tempdir::TempDir;
/// Returns None iff the file exists, can be read, but is empty (and hence
/// cannot be mmapped).
///
fn open_mmap(full_path: &PathBuf) -> result::Result<Option<MmapReadOnly>, OpenReadError> {
let file = File::open(&full_path)
.map_err(|e| {
if e.kind() == io::ErrorKind::NotFound {
OpenReadError::FileDoesNotExist(full_path.clone())
} else {
OpenReadError::IOError(IOError::with_path(full_path.to_owned(), e))
}
})?;
fn open_mmap(full_path: &Path) -> result::Result<Option<Arc<Mmap>>, OpenReadError> {
let file = File::open(&full_path).map_err(|e| {
if e.kind() == io::ErrorKind::NotFound {
OpenReadError::FileDoesNotExist(full_path.to_owned())
} else {
OpenReadError::IOError(IOError::with_path(full_path.to_owned(), e))
}
})?;
let meta_data = file.metadata()
.map_err(|e| IOError::with_path(full_path.to_owned(), e))?;
if meta_data.len() == 0 {
// if the file size is 0, it will not be possible
// to mmap the file, so we return None
// to mmap the file, so we return an anonymous mmap_cache
// instead.
return Ok(None);
}
MmapReadOnly::open(&file)
.map(Some)
.map_err(|e| {
From::from(IOError::with_path(full_path.to_owned(), e))
})
match Mmap::open(&file, Protection::Read) {
Ok(mmap) => Ok(Some(Arc::new(mmap))),
Err(e) => Err(IOError::with_path(full_path.to_owned(), e))?,
}
}
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
@@ -55,7 +52,10 @@ pub struct CacheCounters {
pub hit: usize,
// Number of time tantivy had to call `mmap`
// as no entry was in the cache.
pub miss: usize,
pub miss_empty: usize,
// Number of time tantivy had to call `mmap`
// as the entry in the cache was evinced.
pub miss_weak: usize,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
@@ -66,26 +66,38 @@ pub struct CacheInfo {
struct MmapCache {
counters: CacheCounters,
cache: HashMap<PathBuf, MmapReadOnly>,
cache: HashMap<PathBuf, Weak<Mmap>>,
purge_weak_limit: usize,
}
const STARTING_PURGE_WEAK_LIMIT: usize = 1_000;
impl Default for MmapCache {
fn default() -> MmapCache {
MmapCache {
counters: CacheCounters::default(),
cache: HashMap::new(),
purge_weak_limit: STARTING_PURGE_WEAK_LIMIT,
}
}
}
impl MmapCache {
/// Removes a `MmapReadOnly` entry from the mmap cache.
fn discard_from_cache(&mut self, full_path: &Path) -> bool {
self.cache.remove(full_path).is_some()
fn cleanup(&mut self) {
let previous_cache_size = self.cache.len();
let mut new_cache = HashMap::new();
mem::swap(&mut new_cache, &mut self.cache);
self.cache = new_cache
.into_iter()
.filter(|&(_, ref weak_ref)| weak_ref.upgrade().is_some())
.collect();
if self.cache.len() == previous_cache_size {
self.purge_weak_limit *= 2;
}
}
fn get_info(&mut self) -> CacheInfo {
self.cleanup();
let paths: Vec<PathBuf> = self.cache.keys().cloned().collect();
CacheInfo {
counters: self.counters.clone(),
@@ -93,23 +105,38 @@ impl MmapCache {
}
}
fn get_mmap(&mut self, full_path: PathBuf) -> Result<Option<MmapReadOnly>, OpenReadError> {
fn get_mmap(&mut self, full_path: &PathBuf) -> Result<Option<Arc<Mmap>>, OpenReadError> {
// if we exceed this limit, then we go through the weak
// and remove those that are obsolete.
if self.cache.len() > self.purge_weak_limit {
self.cleanup();
}
Ok(match self.cache.entry(full_path.clone()) {
HashMapEntry::Occupied(occupied_entry) => {
let mmap = occupied_entry.get();
self.counters.hit += 1;
Some(mmap.clone())
}
HashMapEntry::Vacant(vacant_entry) => {
self.counters.miss += 1;
if let Some(mmap) = open_mmap(&full_path)? {
vacant_entry.insert(mmap.clone());
Some(mmap)
} else {
None
}
}
})
HashMapEntry::Occupied(mut occupied_entry) => {
if let Some(mmap_arc) = occupied_entry.get().upgrade() {
self.counters.hit += 1;
Some(Arc::clone(&mmap_arc))
} else {
// The entry exists but the weak ref has been destroyed.
self.counters.miss_weak += 1;
if let Some(mmap_arc) = open_mmap(full_path)? {
occupied_entry.insert(Arc::downgrade(&mmap_arc));
Some(mmap_arc)
} else {
None
}
}
}
HashMapEntry::Vacant(vacant_entry) => {
self.counters.miss_empty += 1;
if let Some(mmap_arc) = open_mmap(full_path)? {
vacant_entry.insert(Arc::downgrade(&mmap_arc));
Some(mmap_arc)
} else {
None
}
}
})
}
}
@@ -201,7 +228,6 @@ impl MmapDirectory {
fd.sync_all()?;
Ok(())
}
/// Returns some statistical information
/// about the Mmap cache.
///
@@ -257,9 +283,10 @@ impl Directory for MmapDirectory {
})?;
Ok(mmap_cache
.get_mmap(full_path)?
.map(ReadOnlySource::Mmap)
.unwrap_or_else(|| ReadOnlySource::Anonymous(SharedVecSlice::empty())))
.get_mmap(&full_path)?
.map(MmapReadOnly::from)
.map(ReadOnlySource::Mmap)
.unwrap_or_else(|| ReadOnlySource::Anonymous(SharedVecSlice::empty())))
}
fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
@@ -292,22 +319,17 @@ impl Directory for MmapDirectory {
Ok(BufWriter::new(Box::new(writer)))
}
/// Any entry associated to the path in the mmap will be
/// removed before the file is deleted.
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
debug!("Deleting file {:?}", path);
let full_path = self.resolve_path(path);
let mut mmap_cache = self.mmap_cache
.write()
.map_err(|_| {
let msg = format!("Failed to acquired write lock \
on mmap cache while deleting {:?}",
path);
IOError::with_path(path.to_owned(), make_io_err(msg))
})?;
mmap_cache.discard_from_cache(path);
let mut mmap_cache = self.mmap_cache.write().map_err(|_| {
let msg = format!(
"Failed to acquired write lock \
on mmap cache while deleting {:?}",
path
);
IOError::with_path(path.to_owned(), make_io_err(msg))
})?;
// Removing the entry in the MMap cache.
// The munmap will appear on Drop,
// when the last reference is gone.
@@ -393,8 +415,7 @@ mod tests {
// here we test if the cache releases
// mmaps correctly.
let mut mmap_directory = MmapDirectory::create_from_tempdir().unwrap();
let num_paths = 10;
let paths: Vec<PathBuf> = (0..num_paths)
let paths: Vec<PathBuf> = (0..10)
.map(|i| PathBuf::from(&*format!("file_{}", i)))
.collect();
{
@@ -405,22 +426,49 @@ mod tests {
}
}
{
for (i, path) in paths.iter().enumerate() {
let _r = mmap_directory.open_read(path).unwrap();
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), i + 1);
}
for path in paths.iter() {
let _r = mmap_directory.open_read(path).unwrap();
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), num_paths);
}
for (i, path) in paths.iter().enumerate() {
println!("delete paths {:?}", path);
mmap_directory.delete(path).unwrap();
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), num_paths - i - 1);
for path in &paths {
{
let _r = mmap_directory.open_read(path).unwrap();
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 1);
}
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0);
}
}
assert_eq!(mmap_directory.get_cache_info().counters.hit, 10);
assert_eq!(mmap_directory.get_cache_info().counters.miss, 10);
assert_eq!(mmap_directory.get_cache_info().counters.miss_empty, 10);
{
// test weak miss
// the first pass create the weak refs.
for path in &paths {
let _r = mmap_directory.open_read(path).unwrap();
}
// ... the second hits the weak refs.
for path in &paths {
let _r = mmap_directory.open_read(path).unwrap();
}
let cache_info = mmap_directory.get_cache_info();
assert_eq!(cache_info.counters.miss_empty, 20);
assert_eq!(cache_info.counters.miss_weak, 10);
}
{
let mut saved_readmmaps = vec![];
// Keeps reference alive
for (i, path) in paths.iter().enumerate() {
let r = mmap_directory.open_read(path).unwrap();
saved_readmmaps.push(r);
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), i + 1);
}
let cache_info = mmap_directory.get_cache_info();
assert_eq!(cache_info.counters.miss_empty, 30);
assert_eq!(cache_info.counters.miss_weak, 10);
assert_eq!(cache_info.mmapped.len(), 10);
for saved_readmmap in saved_readmmaps {
assert_eq!(saved_readmmap.as_slice(), content);
}
}
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0);
}

View File

@@ -10,6 +10,7 @@ use indexer::stamper::Stamper;
use datastruct::stacker::Heap;
use directory::FileProtection;
use error::{Error, ErrorKind, Result, ResultExt};
use Directory;
use fastfield::write_delete_bitset;
use indexer::delete_queue::{DeleteCursor, DeleteQueue};
use futures::Canceled;
@@ -28,10 +29,9 @@ use schema::Term;
use std::mem;
use std::mem::swap;
use std::thread::JoinHandle;
use indexer::DirectoryLock;
use super::directory_lock::DirectoryLock;
use super::operation::AddOperation;
use super::segment_updater::SegmentUpdater;
use super::PreparedCommit;
use std::thread;
// Size of the margin for the heap. A segment is closed when the remaining memory
@@ -57,7 +57,7 @@ type DocumentReceiver = chan::Receiver<AddOperation>;
pub struct IndexWriter {
// the lock is just used to bind the
// lifetime of the lock with that of the IndexWriter.
_directory_lock: Option<DirectoryLock>,
_directory_lock: DirectoryLock,
index: Index,
@@ -104,7 +104,6 @@ pub fn open_index_writer(
index: &Index,
num_threads: usize,
heap_size_in_bytes_per_thread: usize,
directory_lock: DirectoryLock,
) -> Result<IndexWriter> {
if heap_size_in_bytes_per_thread <= HEAP_SIZE_LIMIT as usize {
panic!(format!(
@@ -112,12 +111,15 @@ pub fn open_index_writer(
HEAP_SIZE_LIMIT
));
}
let directory_lock = DirectoryLock::lock(index.directory().box_clone())?;
let (document_sender, document_receiver): (DocumentSender, DocumentReceiver) =
chan::sync(PIPELINE_MAX_SIZE_IN_DOCS);
let delete_queue = DeleteQueue::new();
let current_opstamp = index.load_metas()?.opstamp;
let current_opstamp = index.opstamp();
let stamper = Stamper::new(current_opstamp);
@@ -125,7 +127,7 @@ pub fn open_index_writer(
SegmentUpdater::new(index.clone(), stamper.clone(), &delete_queue.cursor())?;
let mut index_writer = IndexWriter {
_directory_lock: Some(directory_lock),
_directory_lock: directory_lock,
heap_size_in_bytes_per_thread,
index: index.clone(),
@@ -284,11 +286,6 @@ fn index_documents(
break;
}
}
if !segment_updater.is_alive() {
return Ok(false);
}
let num_docs = segment_writer.max_doc();
// this is ensured by the call to peek before starting
@@ -479,66 +476,41 @@ impl IndexWriter {
/// state as it was after the last commit.
///
/// The opstamp at the last commit is returned.
pub fn rollback(&mut self) -> Result<()> {
pub fn rollback(mut self) -> Result<IndexWriter> {
info!("Rolling back to opstamp {}", self.committed_opstamp);
// marks the segment updater as killed. From now on, all
// segment updates will be ignored.
self.segment_updater.kill();
let document_receiver = self.document_receiver.clone();
// take the directory lock to create a new index_writer.
let directory_lock = self._directory_lock
.take()
.expect("The IndexWriter does not have any lock. This is a bug, please report.");
let new_index_writer: IndexWriter = open_index_writer(
&self.index,
self.num_threads,
self.heap_size_in_bytes_per_thread,
directory_lock,
)?;
// the current `self` is dropped right away because of this call.
//
// This will drop the document queue, and the thread
// should terminate.
mem::replace(self, new_index_writer);
// Drains the document receiver pipeline :
// Workers don't need to index the pending documents.
//
// This will reach an end as the only document_sender
// was dropped with the index_writer.
for _ in document_receiver.clone() {}
let receiver_clone = self.document_receiver.clone();
let index = self.index.clone();
let num_threads = self.num_threads;
let heap_size_in_bytes_per_thread = self.heap_size_in_bytes_per_thread;
drop(self);
for _ in receiver_clone {}
Ok(())
let index_writer = open_index_writer(&index, num_threads, heap_size_in_bytes_per_thread)?;
Ok(index_writer)
}
/// Prepares a commit.
/// Commits all of the pending changes
///
/// Calling `prepare_commit()` will cut the indexing
/// queue. All pending documents will be sent to the
/// indexing workers. They will then terminate, regardless
/// of the size of their current segment and flush their
/// work on disk.
/// A call to commit blocks.
/// After it returns, all of the document that
/// were added since the last commit are published
/// and persisted.
///
/// Once a commit is "prepared", you can either
/// call
/// * `.commit()`: to accept this commit
/// * `.abort()`: to cancel this commit.
/// In case of a crash or an hardware failure (as
/// long as the hard disk is spared), it will be possible
/// to resume indexing from this point.
///
/// In the current implementation, `PreparedCommit` borrows
/// the `IndexWriter` mutably so we are guaranteed that no new
/// document can be added as long as it is committed or is
/// dropped.
/// Commit returns the `opstamp` of the last document
/// that made it in the commit.
///
/// It is also possible to add a payload to the `commit`
/// using this API.
/// See [`PreparedCommit::set_payload()`](PreparedCommit.html)
pub fn prepare_commit(&mut self) -> Result<PreparedCommit> {
// Here, because we join all of the worker threads,
pub fn commit(&mut self) -> Result<u64> {
// here, because we join all of the worker threads,
// all of the segment update for this commit have been
// sent.
//
@@ -548,7 +520,8 @@ impl IndexWriter {
// This will move uncommitted segments to the state of
// committed segments.
info!("Preparing commit");
self.committed_opstamp = self.stamper.stamp();
info!("committing {}", self.committed_opstamp);
// this will drop the current document channel
// and recreate a new one channels.
@@ -570,32 +543,10 @@ impl IndexWriter {
self.add_indexing_worker()?;
}
let commit_opstamp = self.stamper.stamp();
let prepared_commit = PreparedCommit::new(self, commit_opstamp);
info!("Prepared commit {}", commit_opstamp);
Ok(prepared_commit)
}
// wait for the segment update thread to have processed the info
self.segment_updater.commit(self.committed_opstamp)?;
/// Commits all of the pending changes
///
/// A call to commit blocks.
/// After it returns, all of the document that
/// were added since the last commit are published
/// and persisted.
///
/// In case of a crash or an hardware failure (as
/// long as the hard disk is spared), it will be possible
/// to resume indexing from this point.
///
/// Commit returns the `opstamp` of the last document
/// that made it in the commit.
///
pub fn commit(&mut self) -> Result<u64> {
self.prepare_commit()?.commit()
}
pub(crate) fn segment_updater(&self) -> &SegmentUpdater {
&self.segment_updater
Ok(self.committed_opstamp)
}
/// Delete all documents containing a given term.
@@ -713,7 +664,8 @@ mod tests {
doc.add_text(text_field, "a");
index_writer.add_document(doc);
}
index_writer.rollback().unwrap();
index_writer = index_writer.rollback().unwrap();
assert_eq!(index_writer.commit_opstamp(), 0u64);
assert_eq!(num_docs_containing("a"), 0);
@@ -776,78 +728,4 @@ mod tests {
}
}
#[test]
fn test_prepare_with_commit_message() {
let _ = env_logger::init();
let mut schema_builder = schema::SchemaBuilder::default();
let text_field = schema_builder.add_text_field("text", schema::TEXT);
let index = Index::create_in_ram(schema_builder.build());
{
// writing the segment
let mut index_writer = index.writer_with_num_threads(4, 4 * 30_000_000).unwrap();
// create 8 segments with 100 tiny docs
for _doc in 0..100 {
index_writer.add_document(doc!(text_field => "a"));
}
{
let mut prepared_commit = index_writer.prepare_commit().expect("commit failed");
prepared_commit.set_payload("first commit");
assert_eq!(prepared_commit.opstamp(), 100);
prepared_commit.commit().expect("commit failed");
}
{
let metas = index.load_metas().unwrap();
assert_eq!(metas.payload.unwrap(), "first commit");
}
for _doc in 0..100 {
index_writer.add_document(doc!(text_field => "a"));
}
index_writer.commit().unwrap();
{
let metas = index.load_metas().unwrap();
assert!(metas.payload.is_none());
}
}
}
#[test]
fn test_prepare_but_rollback() {
let _ = env_logger::init();
let mut schema_builder = schema::SchemaBuilder::default();
let text_field = schema_builder.add_text_field("text", schema::TEXT);
let index = Index::create_in_ram(schema_builder.build());
{
// writing the segment
let mut index_writer = index.writer_with_num_threads(4, 4 * 30_000_000).unwrap();
// create 8 segments with 100 tiny docs
for _doc in 0..100 {
index_writer.add_document(doc!(text_field => "a"));
}
{
let mut prepared_commit = index_writer.prepare_commit().expect("commit failed");
prepared_commit.set_payload("first commit");
assert_eq!(prepared_commit.opstamp(), 100);
prepared_commit.abort().expect("commit failed");
}
{
let metas = index.load_metas().unwrap();
assert!(metas.payload.is_none());
}
for _doc in 0..100 {
index_writer.add_document(doc!(text_field => "b"));
}
index_writer.commit().unwrap();
}
index.load_searchers().unwrap();
let num_docs_containing = |s: &str| {
let searcher = index.searcher();
let term_a = Term::from_field_text(text_field, s);
searcher.doc_freq(&term_a)
};
assert_eq!(num_docs_containing("a"), 0);
assert_eq!(num_docs_containing("b"), 100);
}
}

View File

@@ -105,6 +105,7 @@ impl IndexMerger {
})
}
#[inline(never)]
fn write_fieldnorms(&self, fast_field_serializer: &mut FastFieldSerializer) -> Result<()> {
let fieldnorm_fastfields: Vec<Field> = self.schema
.fields()
@@ -120,6 +121,7 @@ impl IndexMerger {
)
}
#[inline(never)]
fn write_fast_fields(&self, fast_field_serializer: &mut FastFieldSerializer) -> Result<()> {
let fast_fields: Vec<Field> = self.schema
.fields()
@@ -198,6 +200,7 @@ impl IndexMerger {
Ok(())
}
#[inline(never)]
fn write_postings(&self, serializer: &mut InvertedIndexSerializer) -> Result<()> {
let mut delta_computer = DeltaComputer::new();
@@ -332,19 +335,16 @@ impl IndexMerger {
Ok(())
}
#[inline(never)]
fn write_storable_fields(&self, store_writer: &mut StoreWriter) -> Result<()> {
for reader in &self.readers {
let store_reader = reader.get_store_reader();
if reader.num_deleted_docs() > 0 {
for doc_id in 0..reader.max_doc() {
if !reader.is_deleted(doc_id) {
let doc = store_reader.get(doc_id)?;
let field_values: Vec<&FieldValue> = doc.field_values().iter().collect();
store_writer.store(&field_values)?;
}
for doc_id in 0..reader.max_doc() {
if !reader.is_deleted(doc_id) {
let doc = store_reader.get(doc_id)?;
let field_values: Vec<&FieldValue> = doc.field_values().iter().collect();
store_writer.store(&field_values)?;
}
} else {
store_writer.stack(store_reader)?;
}
}
Ok(())

View File

@@ -13,9 +13,7 @@ mod segment_entry;
mod doc_opstamp_mapping;
pub mod operation;
mod stamper;
mod prepared_commit;
pub use self::prepared_commit::PreparedCommit;
pub use self::segment_entry::{SegmentEntry, SegmentState};
pub use self::segment_serializer::SegmentSerializer;
pub use self::segment_writer::SegmentWriter;
@@ -23,7 +21,6 @@ pub use self::index_writer::IndexWriter;
pub use self::log_merge_policy::LogMergePolicy;
pub use self::merge_policy::{MergeCandidate, MergePolicy, NoMergePolicy};
pub use self::segment_manager::SegmentManager;
pub(crate) use self::directory_lock::DirectoryLock;
/// Alias for the default merge policy, which is the `LogMergePolicy`.
pub type DefaultMergePolicy = LogMergePolicy;

View File

@@ -1,39 +0,0 @@
use Result;
use super::IndexWriter;
/// A prepared commit
pub struct PreparedCommit<'a> {
index_writer: &'a mut IndexWriter,
payload: Option<String>,
opstamp: u64,
}
impl<'a> PreparedCommit<'a> {
pub(crate) fn new(index_writer: &'a mut IndexWriter, opstamp: u64) -> PreparedCommit {
PreparedCommit {
index_writer: index_writer,
payload: None,
opstamp: opstamp,
}
}
pub fn opstamp(&self) -> u64 {
self.opstamp
}
pub fn set_payload(&mut self, payload: &str) {
self.payload = Some(payload.to_string())
}
pub fn abort(self) -> Result<()> {
self.index_writer.rollback()
}
pub fn commit(self) -> Result<u64> {
info!("committing {}", self.opstamp);
self.index_writer
.segment_updater()
.commit(self.opstamp, self.payload)?;
Ok(self.opstamp)
}
}

View File

@@ -56,6 +56,7 @@ impl SegmentSerializer {
}
/// Finalize the segment serialization.
#[inline(never)]
pub fn close(self) -> Result<()> {
self.fast_field_serializer.close()?;
self.postings_serializer.close()?;

View File

@@ -46,7 +46,7 @@ use super::segment_manager::{get_mergeable_segments, SegmentManager};
///
/// This method is not part of tantivy's public API
pub fn save_new_metas(schema: Schema, opstamp: u64, directory: &mut Directory) -> Result<()> {
save_metas(vec![], schema, opstamp, None, directory)
save_metas(vec![], schema, opstamp, directory)
}
/// Save the index meta file.
@@ -62,14 +62,12 @@ pub fn save_metas(
segment_metas: Vec<SegmentMeta>,
schema: Schema,
opstamp: u64,
payload: Option<String>,
directory: &mut Directory,
) -> Result<()> {
let metas = IndexMeta {
segments: segment_metas,
schema,
opstamp,
payload: payload.clone(),
};
let mut buffer = serde_json::to_vec_pretty(&metas)?;
write!(&mut buffer, "\n")?;
@@ -224,7 +222,7 @@ impl SegmentUpdater {
self.0.killed.store(true, Ordering::Release);
}
pub fn is_alive(&self) -> bool {
fn is_alive(&self) -> bool {
!self.0.killed.load(Ordering::Acquire)
}
@@ -241,7 +239,7 @@ impl SegmentUpdater {
Ok(segment_entries)
}
pub fn save_metas(&self, opstamp: u64, commit_message: Option<String>) {
pub fn save_metas(&self, opstamp: u64) {
if self.is_alive() {
let index = &self.0.index;
let directory = index.directory();
@@ -249,7 +247,6 @@ impl SegmentUpdater {
self.0.segment_manager.committed_segment_metas(),
index.schema(),
opstamp,
commit_message,
directory.box_clone().borrow_mut(),
).expect("Could not save metas.");
}
@@ -269,14 +266,14 @@ impl SegmentUpdater {
.garbage_collect(|| self.0.segment_manager.list_files());
}
pub fn commit(&self, opstamp: u64, payload: Option<String>) -> Result<()> {
pub fn commit(&self, opstamp: u64) -> Result<()> {
self.run_async(move |segment_updater| {
if segment_updater.is_alive() {
let segment_entries = segment_updater
.purge_deletes(opstamp)
.expect("Failed purge deletes");
segment_updater.0.segment_manager.commit(segment_entries);
segment_updater.save_metas(opstamp, payload);
segment_updater.save_metas(opstamp);
segment_updater.garbage_collect_files_exec();
segment_updater.consider_merge_options();
}
@@ -385,12 +382,7 @@ impl SegmentUpdater {
let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
let mut _file_protection_opt = None;
if let Some(delete_operation) = delete_cursor.get() {
let committed_opstamp = segment_updater
.0
.index
.load_metas()
.expect("Failed to read opstamp")
.opstamp;
let committed_opstamp = segment_updater.0.index.opstamp();
if delete_operation.opstamp < committed_opstamp {
let index = &segment_updater.0.index;
let segment = index.segment(after_merge_segment_entry.meta().clone());
@@ -426,8 +418,7 @@ impl SegmentUpdater {
.end_merge(&before_merge_segment_ids, after_merge_segment_entry);
segment_updater.consider_merge_options();
info!("save metas");
let previous_metas = segment_updater.0.index.load_metas().unwrap();
segment_updater.save_metas(previous_metas.opstamp, previous_metas.payload);
segment_updater.save_metas(segment_updater.0.index.opstamp());
segment_updater.garbage_collect_files_exec();
}).wait()
}

View File

@@ -34,8 +34,6 @@ extern crate log;
#[macro_use]
extern crate error_chain;
extern crate regex;
extern crate tempfile;
extern crate atomicwrites;
extern crate bit_set;
extern crate byteorder;
@@ -47,8 +45,10 @@ extern crate futures;
extern crate futures_cpupool;
extern crate itertools;
extern crate lz4;
extern crate memmap;
extern crate num_cpus;
extern crate owning_ref;
extern crate regex;
extern crate rust_stemmers;
extern crate serde;
extern crate serde_json;
@@ -447,7 +447,7 @@ mod tests {
{
index_writer.delete_term(Term::from_field_text(text_field, "c"));
}
index_writer.rollback().unwrap();
index_writer = index_writer.rollback().unwrap();
index_writer.delete_term(Term::from_field_text(text_field, "a"));
index_writer.commit().unwrap();
}

View File

@@ -373,7 +373,7 @@ mod test {
#[test]
pub fn test_parse_nonindexed_field_yields_error() {
let query_parser = make_query_parser();
let query_parser = make_query_parser();
let is_not_indexed_err = |query: &str| {
let result: Result<Box<Query>, QueryParserError> = query_parser.parse_query(query);

View File

@@ -42,11 +42,6 @@ impl Default for TextOptions {
}
}
/// Configuration defining indexing for a text field.
/// It wraps:
///
/// * record (See [`IndexRecordOption`](./enum.IndexRecordOption.html))
/// * tokenizer
#[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)]
pub struct TextFieldIndexing {
record: IndexRecordOption,

View File

@@ -34,33 +34,22 @@ impl StoreReader {
}
}
pub(crate) fn block_index(&self) -> SkipList<u64> {
SkipList::from(self.offset_index_source.as_slice())
}
fn block_offset(&self, doc_id: DocId) -> (DocId, u64) {
self.block_index()
SkipList::from(self.offset_index_source.as_slice())
.seek(doc_id + 1)
.unwrap_or((0u32, 0u64))
}
pub(crate) fn block_data(&self) -> &[u8] {
self.data.as_slice()
}
fn compressed_block(&self, addr: usize) -> &[u8] {
let total_buffer = self.data.as_slice();
let mut buffer = &total_buffer[addr..];
let block_len = u32::deserialize(&mut buffer).expect("") as usize;
&buffer[..block_len]
}
fn read_block(&self, block_offset: usize) -> io::Result<()> {
if block_offset != *self.current_block_offset.borrow() {
let mut current_block_mut = self.current_block.borrow_mut();
current_block_mut.clear();
let compressed_block = self.compressed_block(block_offset);
let mut lz4_decoder = lz4::Decoder::new(compressed_block)?;
let total_buffer = self.data.as_slice();
let mut cursor = &total_buffer[block_offset..];
let block_length = u32::deserialize(&mut cursor).unwrap();
let block_array: &[u8] = &total_buffer
[(block_offset + 4 as usize)..(block_offset + 4 + block_length as usize)];
let mut lz4_decoder = lz4::Decoder::new(block_array)?;
*self.current_block_offset.borrow_mut() = usize::max_value();
lz4_decoder.read_to_end(&mut current_block_mut).map(|_| ())?;
*self.current_block_offset.borrow_mut() = block_offset;

View File

@@ -3,7 +3,6 @@ use DocId;
use schema::FieldValue;
use common::BinarySerializable;
use std::io::{self, Write};
use super::StoreReader;
use lz4;
use datastruct::SkipListBuilder;
use common::CountingWriter;
@@ -61,35 +60,6 @@ impl StoreWriter {
Ok(())
}
/// Stacks a store reader on top of the documents written so far.
/// This method is an optimization compared to iterating over the documents
/// in the store and adding them one by one, as the store's data will
/// not be decompressed and then recompressed.
pub fn stack(&mut self, store_reader: &StoreReader) -> io::Result<()> {
if !self.current_block.is_empty() {
self.write_and_compress_block()?;
self.offset_index_writer.insert(
self.doc,
&(self.writer.written_bytes() as u64),
)?;
}
let doc_offset = self.doc;
let start_offset = self.writer.written_bytes() as u64;
// just bulk write all of the block of the given reader.
self.writer.write_all(store_reader.block_data())?;
// concatenate the index of the `store_reader`, after translating
// its start doc id and its start file offset.
for (next_doc_id, block_addr) in store_reader.block_index() {
self.doc = doc_offset + next_doc_id;
self.offset_index_writer.insert(
self.doc,
&(start_offset + block_addr))?;
}
Ok(())
}
fn write_and_compress_block(&mut self) -> io::Result<()> {
self.intermediary_buffer.clear();
{

View File

@@ -49,26 +49,19 @@ impl TermDeltaDecoder {
}
}
// code
// first bit represents whether the prefix / suffix len can be encoded
// on the same byte. (the next one)
//
#[inline(always)]
pub fn decode<'a>(&mut self, code: u8, mut cursor: &'a [u8]) -> &'a [u8] {
let (prefix_len, suffix_len): (usize, usize) =
if (code & 1u8) == 1u8 {
let b = cursor[0];
cursor = &cursor[1..];
let prefix_len = (b & 15u8) as usize;
let suffix_len = (b >> 4u8) as usize;
(prefix_len, suffix_len)
} else {
let prefix_len = u32::deserialize(&mut cursor).unwrap();
let suffix_len = u32::deserialize(&mut cursor).unwrap();
(prefix_len as usize, suffix_len as usize)
};
let (prefix_len, suffix_len): (usize, usize) = if (code & 1u8) == 1u8 {
let b = cursor[0];
cursor = &cursor[1..];
let prefix_len = (b & 15u8) as usize;
let suffix_len = (b >> 4u8) as usize;
(prefix_len, suffix_len)
} else {
let prefix_len = u32::deserialize(&mut cursor).unwrap();
let suffix_len = u32::deserialize(&mut cursor).unwrap();
(prefix_len as usize, suffix_len as usize)
};
unsafe { self.term.set_len(prefix_len) };
self.term.extend_from_slice(&(*cursor)[..suffix_len]);
&cursor[suffix_len..]
@@ -82,8 +75,8 @@ impl TermDeltaDecoder {
#[derive(Default)]
pub struct DeltaTermInfo {
pub doc_freq: u32,
pub delta_postings_offset: u64,
pub delta_positions_offset: u64,
pub delta_postings_offset: u32,
pub delta_positions_offset: u32,
pub positions_inner_offset: u8,
}
@@ -107,13 +100,13 @@ impl TermInfoDeltaEncoder {
pub fn encode(&mut self, term_info: TermInfo) -> DeltaTermInfo {
let mut delta_term_info = DeltaTermInfo {
doc_freq: term_info.doc_freq,
delta_postings_offset: term_info.postings_offset - self.term_info.postings_offset,
delta_positions_offset: 0u64,
delta_postings_offset: (term_info.postings_offset - self.term_info.postings_offset) as u32,
delta_positions_offset: 0,
positions_inner_offset: 0,
};
if self.has_positions {
delta_term_info.delta_positions_offset =
term_info.positions_offset - self.term_info.positions_offset;
(term_info.positions_offset - self.term_info.positions_offset) as u32;
delta_term_info.positions_inner_offset = term_info.positions_inner_offset;
}
mem::replace(&mut self.term_info, term_info);
@@ -159,15 +152,15 @@ impl TermInfoDeltaDecoder {
let mut v: u64 = unsafe { *(cursor.as_ptr() as *const u64) };
let doc_freq: u32 = (v as u32) & make_mask(num_bytes_docfreq);
v >>= (num_bytes_docfreq as u64) * 8u64;
let delta_postings_offset: u64 = v & make_mask(num_bytes_postings_offset);
let delta_postings_offset: u32 = (v as u32) & make_mask(num_bytes_postings_offset);
cursor = &cursor[num_bytes_docfreq + num_bytes_postings_offset..];
self.term_info.doc_freq = doc_freq;
self.term_info.postings_offset += delta_postings_offset;
self.term_info.postings_offset += delta_postings_offset as u64;
if self.has_positions {
let num_bytes_positions_offset = ((code >> 5) & 3) as usize + 1;
let delta_positions_offset: u32 =
unsafe { *(cursor.as_ptr() as *const u32) } & make_mask(num_bytes_positions_offset);
self.term_info.positions_offset += delta_positions_offset;
self.term_info.positions_offset += delta_positions_offset as u64;
self.term_info.positions_inner_offset = cursor[num_bytes_positions_offset];
&cursor[num_bytes_positions_offset + 1..]
} else {

View File

@@ -1,10 +1,11 @@
use std::io::{self, Read, Write};
use common::BinarySerializable;
use common::{VInt, BinarySerializable};
mod termdict;
mod streamer;
mod delta_encoder;
pub use self::delta_encoder::{TermDeltaDecoder, TermDeltaEncoder};
pub use self::delta_encoder::{DeltaTermInfo, TermInfoDeltaDecoder, TermInfoDeltaEncoder};
@@ -15,23 +16,23 @@ pub use self::streamer::TermStreamerBuilderImpl;
#[derive(Debug)]
pub struct CheckPoint {
pub stream_offset: u32,
pub postings_offset: u32,
pub positions_offset: u32,
pub stream_offset: u64,
pub postings_offset: u64,
pub positions_offset: u64,
}
impl BinarySerializable for CheckPoint {
fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
self.stream_offset.serialize(writer)?;
self.postings_offset.serialize(writer)?;
self.positions_offset.serialize(writer)?;
VInt(self.stream_offset).serialize(writer)?;
VInt(self.postings_offset).serialize(writer)?;
VInt(self.positions_offset).serialize(writer)?;
Ok(())
}
fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
let stream_offset = u32::deserialize(reader)?;
let postings_offset = u32::deserialize(reader)?;
let positions_offset = u32::deserialize(reader)?;
let stream_offset = VInt::deserialize(reader)?.0;
let postings_offset = VInt::deserialize(reader)?.0;
let positions_offset = VInt::deserialize(reader)?.0;
Ok(CheckPoint {
stream_offset,
postings_offset,

View File

@@ -28,11 +28,10 @@ fn has_positions(field_type: &FieldType) -> bool {
match *field_type {
FieldType::Str(ref text_options) => {
let indexing_options = text_options.get_indexing_options();
if indexing_options.is_position_enabled() {
true
} else {
false
if let Some(text_field_indexing) = indexing_options {
return text_field_indexing.index_option().has_positions()
}
return false;
}
_ => false,
}
@@ -60,10 +59,10 @@ where
W: Write,
{
fn add_index_entry(&mut self) {
let stream_offset = self.write.written_bytes() as u32;
let stream_offset: u64 = self.write.written_bytes() as u64;
let term_info = self.term_info_encoder.term_info();
let postings_offset = term_info.postings_offset as u32;
let positions_offset = term_info.positions_offset as u32;
let postings_offset: u64 = term_info.postings_offset;
let positions_offset: u64 = term_info.positions_offset;
let checkpoint = CheckPoint {
stream_offset,
postings_offset,

View File

@@ -1,10 +1,9 @@
use super::{Token, TokenFilter, TokenStream};
/// `TokenFilter` that removes all tokens that contain non
/// ascii alphanumeric characters.
#[derive(Clone)]
pub struct AlphaNumOnlyFilter;
pub struct AlphaNumOnlyFilterStream<TailTokenStream>
where TailTokenStream: TokenStream
{

View File

@@ -139,6 +139,7 @@ mod token_stream_chain;
mod raw_tokenizer;
mod alphanum_only;
pub use self::alphanum_only::AlphaNumOnlyFilter;
pub use self::tokenizer::{Token, TokenFilter, TokenStream, Tokenizer};
pub use self::tokenizer::BoxedTokenizer;

View File

@@ -11,7 +11,7 @@ pub struct RemoveLongFilter {
}
impl RemoveLongFilter {
/// Creates a `RemoveLongFilter` given a limit in bytes of the UTF-8 representation.
// the limit is in bytes of the UTF-8 representation.
pub fn limit(length_limit: usize) -> RemoveLongFilter {
RemoveLongFilter { length_limit }
}

View File

@@ -2,15 +2,12 @@ use std::sync::Arc;
use super::{Token, TokenFilter, TokenStream};
use rust_stemmers::{self, Algorithm};
/// `Stemmer` token filter. Currently only English is supported.
/// Tokens are expected to be lowercased beforehands.
#[derive(Clone)]
pub struct Stemmer {
stemmer_algorithm: Arc<Algorithm>,
}
impl Stemmer {
/// Creates a new Stemmer `TokenFilter`.
pub fn new() -> Stemmer {
Stemmer {
stemmer_algorithm: Arc::new(Algorithm::English),

View File

@@ -195,10 +195,6 @@ pub trait TokenStream {
/// Returns a mutable reference to the current token.
fn token_mut(&mut self) -> &mut Token;
/// Helper to iterate over tokens. It
/// simply combines a call to `.advance()`
/// and `.token()`.
///
/// ```
/// # extern crate tantivy;
/// # use tantivy::tokenizer::*;
@@ -221,8 +217,6 @@ pub trait TokenStream {
}
}
/// Helper function to consume the entire `TokenStream`
/// and push the tokens to a sink function.
fn process(&mut self, sink: &mut FnMut(&Token)) -> u32 {
let mut num_tokens_pushed = 0u32;
while self.advance() {
@@ -253,10 +247,7 @@ where
}
}
/// Trait for the pluggable components of `Tokenizer`s.
pub trait TokenFilter<TailTokenStream: TokenStream>: Clone {
/// The resulting `TokenStream` type.
type ResultTokenStream: TokenStream;
/// Wraps a token stream and returns the modified one.

View File

@@ -7,6 +7,7 @@ use tokenizer::RawTokenizer;
use tokenizer::SimpleTokenizer;
use tokenizer::JapaneseTokenizer;
use tokenizer::RemoveLongFilter;
use tokenizer::AlphaNumOnlyFilter;
use tokenizer::LowerCaser;
use tokenizer::Stemmer;
@@ -28,7 +29,6 @@ pub struct TokenizerManager {
}
impl TokenizerManager {
/// Registers a new tokenizer associated with a given name.
pub fn register<A>(&self, tokenizer_name: &str, tokenizer: A)
where
A: 'static + Send + Sync + for<'a> Tokenizer<'a>,
@@ -40,7 +40,6 @@ impl TokenizerManager {
.insert(tokenizer_name.to_string(), boxed_tokenizer);
}
/// Accessing a tokenizer given its name.
pub fn get(&self, tokenizer_name: &str) -> Option<Box<BoxedTokenizer>> {
self.tokenizers
.read()
@@ -72,6 +71,7 @@ impl Default for TokenizerManager {
SimpleTokenizer
.filter(RemoveLongFilter::limit(40))
.filter(LowerCaser)
.filter(AlphaNumOnlyFilter)
.filter(Stemmer::new()),
);
manager.register("ja", JapaneseTokenizer.filter(RemoveLongFilter::limit(40)));