Mirror of https://github.com/quickwit-oss/tantivy.git (synced 2026-01-08 01:52:54 +00:00)

Compare commits: common-cra ... 0.4.3 (16 commits)
| SHA1 |
|---|
| 4f5ce12a77 |
| 813efa4ab3 |
| c3b6c1dc0b |
| 6f5e0ef6f4 |
| 7224f58895 |
| 49519c3f61 |
| cb11b92505 |
| 7b2dcfbd91 |
| d2e30e6681 |
| ef109927b3 |
| 44e5c4dfd3 |
| 6f223253ea |
| f7b0392bd5 |
| 442bc9a1b8 |
| db7d784573 |
| 74d32e522a |
.travis.yml (17 changed lines)
@@ -1,4 +1,5 @@
 language: rust
+cache: cargo
 rust:
 - nightly
 env:
@@ -11,6 +12,7 @@ addons:
   apt:
     sources:
     - ubuntu-toolchain-r-test
+    - kalakris-cmake
     packages:
     - gcc-4.8
    - g++-4.8
@@ -18,18 +20,15 @@ addons:
    - libelf-dev
    - libdw-dev
    - binutils-dev
+    - cmake
 before_script:
 - |
-  pip install 'travis-cargo<0.2' --user &&
-  export PATH=$HOME/.local/bin:$PATH
+  cargo install cargo-travis || echo "cargo-travis already installed"
+  export PATH=$HOME/.cargo/bin:$PATH
 script:
-- |
-  travis-cargo build &&
-  travis-cargo test &&
-  travis-cargo bench
+- cargo build
+- cargo test
 - cargo run --example simple_search
 after_success:
-- bash ./script/build-doc.sh
+- cargo coveralls --exclude-pattern cpp/
 - travis-cargo doc-upload
-- if [[ "$TRAVIS_OS_NAME" == "linux" ]]; then travis-cargo coveralls --no-sudo --verify; fi
-- if [[ "$TRAVIS_OS_NAME" == "linux" ]]; then ./kcov/build/src/kcov --verify --coveralls-id=$TRAVIS_JOB_ID --include-path=`pwd`/src --exclude-path=`pwd`/cpp --exclude-pattern=/.cargo target/kcov target/debug/tantivy-*; fi
@@ -14,11 +14,10 @@ keywords = ["search", "information", "retrieval"]

 [dependencies]
 byteorder = "1.0"
-memmap = "0.4"
 lazy_static = "0.2.1"
 tinysegmenter = "0.1.0"
 regex = "0.2"
-fst = "0.1.37"
+fst = "0.2"
 atomicwrites = "0.1.3"
 tempfile = "2.1"
 log = "0.3.6"
@@ -20,10 +20,7 @@ fn main() {
     }
 }
-
-
 fn run_example(index_path: &Path) -> tantivy::Result<()> {
-

 // # Defining the schema
 //
 // The Tantivy index requires a very strict schema.
@@ -31,7 +28,6 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
 // and for each field, its type and "the way it should
 // be indexed".
-

 // first we need to define a schema ...
 let mut schema_builder = SchemaBuilder::default();

@@ -62,8 +58,6 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {

 let schema = schema_builder.build();
-
-

 // # Indexing documents
 //
 // Let's create a brand new index.
@@ -72,7 +66,6 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
 // with our schema in the directory.
 let index = Index::create(index_path, schema.clone())?;
-

 // To insert document we need an index writer.
 // There must be only one writer at a time.
 // This single `IndexWriter` is already
@@ -85,7 +78,6 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
 // Let's index our documents!
 // We first need a handle on the title and the body field.
-

 // ### Create a document "manually".
 //
 // We can create a document manually, by setting the fields
@@ -98,7 +90,7 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
 old_man_doc.add_text(
     body,
     "He was an old man who fished alone in a skiff in the Gulf Stream and \
      he had gone eighty-four days now without taking a fish.",
 );

 // ... and add it to the `IndexWriter`.
@@ -145,7 +137,6 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
 // Indexing 5 million articles of the English wikipedia takes
 // around 4 minutes on my computer!
-

 // ### Committing
 //
 // At this point our documents are not searchable.
@@ -167,7 +158,6 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
 // tantivy behaves as if has rolled back to its last
 // commit.
-

 // # Searching
 //
 // Let's search our index. Start by reloading
@@ -192,7 +182,6 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
 // A ticket has been opened regarding this problem.
 let query = query_parser.parse_query("sea whale")?;
-

 // A query defines a set of documents, as
 // well as the way they should be scored.
 //
@@ -133,8 +133,7 @@ where
     addr + 8 <= data.len(),
     "The fast field field should have been padded with 7 bytes."
 );
-let val_unshifted_unmasked: u64 =
-    unsafe { *(data[addr..].as_ptr() as *const u64) };
+let val_unshifted_unmasked: u64 = unsafe { *(data[addr..].as_ptr() as *const u64) };
 let val_shifted = (val_unshifted_unmasked >> bit_shift) as u64;
 (val_shifted & mask)
 } else {
@@ -165,8 +164,7 @@ where
 for output_val in output.iter_mut() {
     let addr = addr_in_bits >> 3;
     let bit_shift = addr_in_bits & 7;
-    let val_unshifted_unmasked: u64 =
-        unsafe { *(data[addr..].as_ptr() as *const u64) };
+    let val_unshifted_unmasked: u64 = unsafe { *(data[addr..].as_ptr() as *const u64) };
     let val_shifted = (val_unshifted_unmasked >> bit_shift) as u64;
     *output_val = val_shifted & mask;
     addr_in_bits += num_bits;
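The two hunks above only collapse the bit-unpacked read onto one line; the addressing itself is unchanged: the byte address is the bit offset shifted right by 3, the in-byte shift is the low 3 bits, and the result is masked down to `num_bits`. The following standalone sketch (not tantivy code) reproduces that arithmetic with a safe `u64::from_le_bytes` read in place of the unsafe pointer cast; the 7-byte padding requirement mirrors the assertion in the diff.

```rust
/// Reads the `num_bits`-wide value stored at bit offset `addr_in_bits`
/// from a bit-packed buffer. `data` must be padded with at least 7 extra
/// bytes past the last value, like the fast field data in the diff.
fn read_bitpacked(data: &[u8], addr_in_bits: usize, num_bits: u64) -> u64 {
    let addr = addr_in_bits >> 3; // byte containing the first bit
    let bit_shift = (addr_in_bits & 7) as u64; // bit position inside that byte
    assert!(
        addr + 8 <= data.len(),
        "The fast field should have been padded with 7 bytes."
    );
    // Safe replacement for `*(data[addr..].as_ptr() as *const u64)`.
    let mut bytes = [0u8; 8];
    bytes.copy_from_slice(&data[addr..addr + 8]);
    let val_unshifted_unmasked = u64::from_le_bytes(bytes);
    let mask = if num_bits == 64 {
        u64::MAX
    } else {
        (1u64 << num_bits) - 1
    };
    (val_unshifted_unmasked >> bit_shift) & mask
}

fn main() {
    // Two 5-bit values (21 and 9) packed back to back, then 7 bytes of padding.
    let mut data = vec![0u8; 9];
    data[0] = (21 | (9 << 5)) as u8;
    data[1] = (9 >> 3) as u8;
    assert_eq!(read_bitpacked(&data, 0, 5), 21);
    assert_eq!(read_bitpacked(&data, 5, 5), 9);
    println!("ok");
}
```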
@@ -25,9 +25,7 @@ fn compress_sorted(vals: &[u32], output: &mut [u8], offset: u32) -> usize {
 }

 fn uncompress_sorted(compressed_data: &[u8], output: &mut [u32], offset: u32) -> usize {
-    unsafe {
-        simdcomp::uncompress_sorted(compressed_data.as_ptr(), output.as_mut_ptr(), offset)
-    }
+    unsafe { simdcomp::uncompress_sorted(compressed_data.as_ptr(), output.as_mut_ptr(), offset) }
 }

 fn compress_unsorted(vals: &[u32], output: &mut [u8]) -> usize {
@@ -18,6 +18,7 @@ use core::SegmentMeta;
 use super::pool::LeasedItem;
 use std::path::Path;
 use core::IndexMeta;
+use indexer::DirectoryLock;
 use IndexWriter;
 use directory::ManagedDirectory;
 use core::META_FILEPATH;
@@ -113,12 +114,9 @@ impl Index {
     Index::create_from_metas(directory, &metas)
 }

-/// Returns the index opstamp.
-///
-/// The opstamp is the number of documents that have been added
-/// from the beginning of time, and until the moment of the last commit.
-pub fn opstamp(&self) -> u64 {
-    load_metas(self.directory()).unwrap().opstamp
+/// Reads the index meta file from the directory.
+pub fn load_metas(&self) -> Result<IndexMeta> {
+    load_metas(self.directory())
 }

 /// Open a new index writer. Attempts to acquire a lockfile.
@@ -141,7 +139,8 @@ impl Index {
     num_threads: usize,
     heap_size_in_bytes: usize,
 ) -> Result<IndexWriter> {
-    open_index_writer(self, num_threads, heap_size_in_bytes)
+    let directory_lock = DirectoryLock::lock(self.directory().box_clone())?;
+    open_index_writer(self, num_threads, heap_size_in_bytes, directory_lock)
 }

 /// Creates a multithreaded writer
@@ -194,7 +193,7 @@ impl Index {
 /// Reads the meta.json and returns the list of
 /// `SegmentMeta` from the last commit.
 pub fn searchable_segment_metas(&self) -> Result<Vec<SegmentMeta>> {
-    Ok(load_metas(self.directory())?.segments)
+    Ok(self.load_metas()?.segments)
 }

 /// Returns the list of segment ids that are searchable.
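With `opstamp()` removed, callers now go through the new public `load_metas()` and read the field they need, which is exactly what the index-writer change further down does. A minimal hedged sketch of the call site, assuming the crate at this revision (where `load_metas` is public and `IndexMeta::opstamp` is reachable):

```rust
extern crate tantivy;

use tantivy::Index;

// Hedged sketch: replaces the removed `index.opstamp()` accessor.
fn current_opstamp(index: &Index) -> tantivy::Result<u64> {
    Ok(index.load_metas()?.opstamp)
}
```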
@@ -14,6 +14,7 @@ pub struct IndexMeta {
     pub segments: Vec<SegmentMeta>,
     pub schema: Schema,
     pub opstamp: u64,
+    #[serde(skip_serializing_if = "Option::is_none")] pub payload: Option<String>,
 }

 impl IndexMeta {
@@ -22,6 +23,32 @@ impl IndexMeta {
             segments: vec![],
             schema,
             opstamp: 0u64,
+            payload: None,
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+
+    use serde_json;
+    use super::IndexMeta;
+    use schema::{SchemaBuilder, TEXT};
+
+    #[test]
+    fn test_serialize_metas() {
+        let schema = {
+            let mut schema_builder = SchemaBuilder::new();
+            schema_builder.add_text_field("text", TEXT);
+            schema_builder.build()
+        };
+        let index_metas = IndexMeta {
+            segments: Vec::new(),
+            schema: schema,
+            opstamp: 0u64,
+            payload: None,
+        };
+        let json = serde_json::ser::to_string(&index_metas).expect("serialization failed");
+        assert_eq!(json, r#"{"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","tokenizer":"default"},"stored":false}}],"opstamp":0}"#);
+    }
+}
@@ -58,10 +58,8 @@ mod murmurhash2 {
 ///
 /// Returns (the heap size in bytes, the hash table size in number of bits)
 pub(crate) fn split_memory(per_thread_memory_budget: usize) -> (usize, usize) {
-    let table_size_limit: usize = per_thread_memory_budget / 5;
-    let compute_table_size = |num_bits: usize| {
-        (1 << num_bits) * mem::size_of::<KeyValue>()
-    };
+    let table_size_limit: usize = per_thread_memory_budget / 3;
+    let compute_table_size = |num_bits: usize| (1 << num_bits) * mem::size_of::<KeyValue>();
     let table_num_bits: usize = (1..)
         .into_iter()
         .take_while(|num_bits: &usize| compute_table_size(*num_bits) < table_size_limit)
@@ -219,9 +217,9 @@ mod tests {

     #[test]
     fn test_hashmap_size() {
-        assert_eq!(split_memory(100_000), (67232, 9));
-        assert_eq!(split_memory(1_000_000), (737856, 12));
-        assert_eq!(split_memory(10_000_000), (7902848, 15));
+        assert_eq!(split_memory(100_000), (67232, 12));
+        assert_eq!(split_memory(1_000_000), (737856, 15));
+        assert_eq!(split_memory(10_000_000), (7902848, 18));
     }

     #[test]
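The `/5` to `/3` change above gives the hash table a larger share of the per-thread memory budget, which is why the expected `table_num_bits` values in the tests go up by 3 while the remaining heap sizes follow. Below is a self-contained sketch of the same computation; `KeyValue` here is a hypothetical 8-byte stand-in for the real entry type (8 bytes is what the updated test values imply), not the actual struct from the stacker module.

```rust
use std::mem;

// Hypothetical stand-in for the hash table entry; assumed to be 8 bytes,
// which is consistent with the updated test expectations in the diff.
#[derive(Clone, Copy)]
struct KeyValue {
    key_addr: u32,
    value_addr: u32,
}

/// Mirrors the new `split_memory` logic: reserve roughly a third of the
/// per-thread budget for a power-of-two hash table and leave the rest
/// as heap space. Returns (heap size in bytes, table size in bits).
fn split_memory(per_thread_memory_budget: usize) -> (usize, usize) {
    let table_size_limit = per_thread_memory_budget / 3;
    let compute_table_size = |num_bits: usize| (1 << num_bits) * mem::size_of::<KeyValue>();
    let table_num_bits = (1..)
        .take_while(|&num_bits| compute_table_size(num_bits) < table_size_limit)
        .last()
        .expect("budget too small for even the smallest table");
    let heap_size = per_thread_memory_budget - compute_table_size(table_num_bits);
    (heap_size, table_num_bits)
}

fn main() {
    // Matches the updated unit tests in the diff.
    assert_eq!(split_memory(100_000), (67_232, 12));
    assert_eq!(split_memory(1_000_000), (737_856, 15));
    assert_eq!(split_memory(10_000_000), (7_902_848, 18));
    println!("ok");
}
```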
@@ -6,7 +6,6 @@ use directory::ReadOnlySource;
 use directory::shared_vec_slice::SharedVecSlice;
 use directory::WritePtr;
 use fst::raw::MmapReadOnly;
-use memmap::{Mmap, Protection};
 use std::collections::hash_map::Entry as HashMapEntry;
 use std::collections::HashMap;
 use std::convert::From;
@@ -15,35 +14,39 @@ use std::fs::{self, File};
 use std::fs::OpenOptions;
 use std::io::{self, Seek, SeekFrom};
 use std::io::{BufWriter, Read, Write};
-use std::mem;
 use std::path::{Path, PathBuf};
 use std::result;
 use std::sync::Arc;
 use std::sync::RwLock;
-use std::sync::Weak;
 use tempdir::TempDir;

-fn open_mmap(full_path: &Path) -> result::Result<Option<Arc<Mmap>>, OpenReadError> {
-    let file = File::open(&full_path).map_err(|e| {
-        if e.kind() == io::ErrorKind::NotFound {
-            OpenReadError::FileDoesNotExist(full_path.to_owned())
-        } else {
-            OpenReadError::IOError(IOError::with_path(full_path.to_owned(), e))
-        }
-    })?;
+/// Returns None iff the file exists, can be read, but is empty (and hence
+/// cannot be mmapped).
+///
+fn open_mmap(full_path: &PathBuf) -> result::Result<Option<MmapReadOnly>, OpenReadError> {
+    let file = File::open(&full_path)
+        .map_err(|e| {
+            if e.kind() == io::ErrorKind::NotFound {
+                OpenReadError::FileDoesNotExist(full_path.clone())
+            } else {
+                OpenReadError::IOError(IOError::with_path(full_path.to_owned(), e))
+            }
+        })?;

     let meta_data = file.metadata()
         .map_err(|e| IOError::with_path(full_path.to_owned(), e))?;
     if meta_data.len() == 0 {
         // if the file size is 0, it will not be possible
-        // to mmap the file, so we return an anonymous mmap_cache
+        // to mmap the file, so we return None
         // instead.
         return Ok(None);
     }
-    match Mmap::open(&file, Protection::Read) {
-        Ok(mmap) => Ok(Some(Arc::new(mmap))),
-        Err(e) => Err(IOError::with_path(full_path.to_owned(), e))?,
-    }
+    MmapReadOnly::open(&file)
+        .map(Some)
+        .map_err(|e| {
+            From::from(IOError::with_path(full_path.to_owned(), e))
+        })
 }

 #[derive(Default, Clone, Debug, Serialize, Deserialize)]
@@ -52,10 +55,7 @@ pub struct CacheCounters {
     pub hit: usize,
     // Number of time tantivy had to call `mmap`
     // as no entry was in the cache.
-    pub miss_empty: usize,
-    // Number of time tantivy had to call `mmap`
-    // as the entry in the cache was evinced.
-    pub miss_weak: usize,
+    pub miss: usize,
 }

 #[derive(Clone, Debug, Serialize, Deserialize)]
@@ -66,38 +66,26 @@ pub struct CacheInfo {

 struct MmapCache {
     counters: CacheCounters,
-    cache: HashMap<PathBuf, Weak<Mmap>>,
-    purge_weak_limit: usize,
+    cache: HashMap<PathBuf, MmapReadOnly>,
 }

-const STARTING_PURGE_WEAK_LIMIT: usize = 1_000;
-
 impl Default for MmapCache {
     fn default() -> MmapCache {
         MmapCache {
             counters: CacheCounters::default(),
             cache: HashMap::new(),
-            purge_weak_limit: STARTING_PURGE_WEAK_LIMIT,
         }
     }
 }

 impl MmapCache {
-    fn cleanup(&mut self) {
-        let previous_cache_size = self.cache.len();
-        let mut new_cache = HashMap::new();
-        mem::swap(&mut new_cache, &mut self.cache);
-        self.cache = new_cache
-            .into_iter()
-            .filter(|&(_, ref weak_ref)| weak_ref.upgrade().is_some())
-            .collect();
-        if self.cache.len() == previous_cache_size {
-            self.purge_weak_limit *= 2;
-        }
+    /// Removes a `MmapReadOnly` entry from the mmap cache.
+    fn discard_from_cache(&mut self, full_path: &Path) -> bool {
+        self.cache.remove(full_path).is_some()
     }

     fn get_info(&mut self) -> CacheInfo {
-        self.cleanup();
         let paths: Vec<PathBuf> = self.cache.keys().cloned().collect();
         CacheInfo {
             counters: self.counters.clone(),
@@ -105,38 +93,23 @@ impl MmapCache {
         }
     }

-    fn get_mmap(&mut self, full_path: &PathBuf) -> Result<Option<Arc<Mmap>>, OpenReadError> {
-        // if we exceed this limit, then we go through the weak
-        // and remove those that are obsolete.
-        if self.cache.len() > self.purge_weak_limit {
-            self.cleanup();
-        }
+    fn get_mmap(&mut self, full_path: PathBuf) -> Result<Option<MmapReadOnly>, OpenReadError> {
         Ok(match self.cache.entry(full_path.clone()) {
-            HashMapEntry::Occupied(mut occupied_entry) => {
-                if let Some(mmap_arc) = occupied_entry.get().upgrade() {
-                    self.counters.hit += 1;
-                    Some(Arc::clone(&mmap_arc))
-                } else {
-                    // The entry exists but the weak ref has been destroyed.
-                    self.counters.miss_weak += 1;
-                    if let Some(mmap_arc) = open_mmap(full_path)? {
-                        occupied_entry.insert(Arc::downgrade(&mmap_arc));
-                        Some(mmap_arc)
-                    } else {
-                        None
-                    }
-                }
-            }
-            HashMapEntry::Vacant(vacant_entry) => {
-                self.counters.miss_empty += 1;
-                if let Some(mmap_arc) = open_mmap(full_path)? {
-                    vacant_entry.insert(Arc::downgrade(&mmap_arc));
-                    Some(mmap_arc)
-                } else {
-                    None
-                }
-            }
-        })
+            HashMapEntry::Occupied(occupied_entry) => {
+                let mmap = occupied_entry.get();
+                self.counters.hit += 1;
+                Some(mmap.clone())
+            }
+            HashMapEntry::Vacant(vacant_entry) => {
+                self.counters.miss += 1;
+                if let Some(mmap) = open_mmap(&full_path)? {
+                    vacant_entry.insert(mmap.clone());
+                    Some(mmap)
+                } else {
+                    None
+                }
+            }
+        })
     }
 }

@@ -228,6 +201,7 @@ impl MmapDirectory {
         fd.sync_all()?;
         Ok(())
     }
+
     /// Returns some statistical information
     /// about the Mmap cache.
     ///
@@ -283,10 +257,9 @@ impl Directory for MmapDirectory {
         })?;

         Ok(mmap_cache
-            .get_mmap(&full_path)?
-            .map(MmapReadOnly::from)
+            .get_mmap(full_path)?
             .map(ReadOnlySource::Mmap)
             .unwrap_or_else(|| ReadOnlySource::Anonymous(SharedVecSlice::empty())))
     }

     fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
@@ -319,17 +292,22 @@ impl Directory for MmapDirectory {
         Ok(BufWriter::new(Box::new(writer)))
     }

+    /// Any entry associated to the path in the mmap will be
+    /// removed before the file is deleted.
     fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
         debug!("Deleting file {:?}", path);
         let full_path = self.resolve_path(path);
-        let mut mmap_cache = self.mmap_cache.write().map_err(|_| {
-            let msg = format!(
-                "Failed to acquired write lock \
-                 on mmap cache while deleting {:?}",
-                path
-            );
-            IOError::with_path(path.to_owned(), make_io_err(msg))
-        })?;
+        let mut mmap_cache = self.mmap_cache
+            .write()
+            .map_err(|_| {
+                let msg = format!("Failed to acquired write lock \
+                                   on mmap cache while deleting {:?}",
+                                  path);
+                IOError::with_path(path.to_owned(), make_io_err(msg))
+            })?;
+        mmap_cache.discard_from_cache(path);

         // Removing the entry in the MMap cache.
         // The munmap will appear on Drop,
         // when the last reference is gone.
@@ -415,7 +393,8 @@ mod tests {
         // here we test if the cache releases
         // mmaps correctly.
         let mut mmap_directory = MmapDirectory::create_from_tempdir().unwrap();
-        let paths: Vec<PathBuf> = (0..10)
+        let num_paths = 10;
+        let paths: Vec<PathBuf> = (0..num_paths)
             .map(|i| PathBuf::from(&*format!("file_{}", i)))
             .collect();
         {
@@ -426,49 +405,22 @@ mod tests {
             }
         }
         {
-            for path in &paths {
-                {
-                    let _r = mmap_directory.open_read(path).unwrap();
-                    assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 1);
-                }
-                assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0);
-            }
-        }
-        assert_eq!(mmap_directory.get_cache_info().counters.miss_empty, 10);
-
-        {
-            // test weak miss
-            // the first pass create the weak refs.
-            for path in &paths {
-                let _r = mmap_directory.open_read(path).unwrap();
-            }
-            // ... the second hits the weak refs.
-            for path in &paths {
-                let _r = mmap_directory.open_read(path).unwrap();
-            }
-            let cache_info = mmap_directory.get_cache_info();
-            assert_eq!(cache_info.counters.miss_empty, 20);
-            assert_eq!(cache_info.counters.miss_weak, 10);
-        }
-
-        {
-            let mut saved_readmmaps = vec![];
-            // Keeps reference alive
             for (i, path) in paths.iter().enumerate() {
-                let r = mmap_directory.open_read(path).unwrap();
-                saved_readmmaps.push(r);
+                let _r = mmap_directory.open_read(path).unwrap();
                 assert_eq!(mmap_directory.get_cache_info().mmapped.len(), i + 1);
             }
-            let cache_info = mmap_directory.get_cache_info();
-            assert_eq!(cache_info.counters.miss_empty, 30);
-            assert_eq!(cache_info.counters.miss_weak, 10);
-            assert_eq!(cache_info.mmapped.len(), 10);
-
-            for saved_readmmap in saved_readmmaps {
-                assert_eq!(saved_readmmap.as_slice(), content);
+            for path in paths.iter() {
+                let _r = mmap_directory.open_read(path).unwrap();
+                assert_eq!(mmap_directory.get_cache_info().mmapped.len(), num_paths);
+            }
+            for (i, path) in paths.iter().enumerate() {
+                println!("delete paths {:?}", path);
+                mmap_directory.delete(path).unwrap();
+                assert_eq!(mmap_directory.get_cache_info().mmapped.len(), num_paths - i - 1);
             }
         }
+        assert_eq!(mmap_directory.get_cache_info().counters.hit, 10);
+        assert_eq!(mmap_directory.get_cache_info().counters.miss, 10);
         assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0);
     }

@@ -10,7 +10,6 @@ use indexer::stamper::Stamper;
 use datastruct::stacker::Heap;
 use directory::FileProtection;
 use error::{Error, ErrorKind, Result, ResultExt};
-use Directory;
 use fastfield::write_delete_bitset;
 use indexer::delete_queue::{DeleteCursor, DeleteQueue};
 use futures::Canceled;
@@ -29,9 +28,10 @@ use schema::Term;
 use std::mem;
 use std::mem::swap;
 use std::thread::JoinHandle;
-use super::directory_lock::DirectoryLock;
+use indexer::DirectoryLock;
 use super::operation::AddOperation;
 use super::segment_updater::SegmentUpdater;
+use super::PreparedCommit;
 use std::thread;

 // Size of the margin for the heap. A segment is closed when the remaining memory
@@ -57,7 +57,7 @@ type DocumentReceiver = chan::Receiver<AddOperation>;
 pub struct IndexWriter {
     // the lock is just used to bind the
     // lifetime of the lock with that of the IndexWriter.
-    _directory_lock: DirectoryLock,
+    _directory_lock: Option<DirectoryLock>,

     index: Index,

@@ -104,6 +104,7 @@ pub fn open_index_writer(
     index: &Index,
     num_threads: usize,
     heap_size_in_bytes_per_thread: usize,
+    directory_lock: DirectoryLock,
 ) -> Result<IndexWriter> {
     if heap_size_in_bytes_per_thread <= HEAP_SIZE_LIMIT as usize {
         panic!(format!(
@@ -111,15 +112,12 @@ pub fn open_index_writer(
             HEAP_SIZE_LIMIT
         ));
     }
-
-    let directory_lock = DirectoryLock::lock(index.directory().box_clone())?;
-
     let (document_sender, document_receiver): (DocumentSender, DocumentReceiver) =
         chan::sync(PIPELINE_MAX_SIZE_IN_DOCS);

     let delete_queue = DeleteQueue::new();

-    let current_opstamp = index.opstamp();
+    let current_opstamp = index.load_metas()?.opstamp;

     let stamper = Stamper::new(current_opstamp);

@@ -127,7 +125,7 @@ pub fn open_index_writer(
         SegmentUpdater::new(index.clone(), stamper.clone(), &delete_queue.cursor())?;

     let mut index_writer = IndexWriter {
-        _directory_lock: directory_lock,
+        _directory_lock: Some(directory_lock),

         heap_size_in_bytes_per_thread,
         index: index.clone(),
@@ -286,6 +284,11 @@ fn index_documents(
             break;
         }
     }

+    if !segment_updater.is_alive() {
+        return Ok(false);
+    }
+
     let num_docs = segment_writer.max_doc();

     // this is ensured by the call to peek before starting
@@ -476,41 +479,66 @@ impl IndexWriter {
     /// state as it was after the last commit.
     ///
     /// The opstamp at the last commit is returned.
-    pub fn rollback(mut self) -> Result<IndexWriter> {
+    pub fn rollback(&mut self) -> Result<()> {
         info!("Rolling back to opstamp {}", self.committed_opstamp);

+        // marks the segment updater as killed. From now on, all
+        // segment updates will be ignored.
         self.segment_updater.kill();

+        let document_receiver = self.document_receiver.clone();
+
+        // take the directory lock to create a new index_writer.
+        let directory_lock = self._directory_lock
+            .take()
+            .expect("The IndexWriter does not have any lock. This is a bug, please report.");
+
+        let new_index_writer: IndexWriter = open_index_writer(
+            &self.index,
+            self.num_threads,
+            self.heap_size_in_bytes_per_thread,
+            directory_lock,
+        )?;
+
+        // the current `self` is dropped right away because of this call.
+        //
+        // This will drop the document queue, and the thread
+        // should terminate.
+        mem::replace(self, new_index_writer);
+
         // Drains the document receiver pipeline :
         // Workers don't need to index the pending documents.
-        let receiver_clone = self.document_receiver.clone();
-        let index = self.index.clone();
-        let num_threads = self.num_threads;
-        let heap_size_in_bytes_per_thread = self.heap_size_in_bytes_per_thread;
-        drop(self);
-        for _ in receiver_clone {}
-
-        let index_writer = open_index_writer(&index, num_threads, heap_size_in_bytes_per_thread)?;
-
-        Ok(index_writer)
+        //
+        // This will reach an end as the only document_sender
+        // was dropped with the index_writer.
+        for _ in document_receiver.clone() {}
+
+        Ok(())
     }

-    /// Commits all of the pending changes
+    /// Prepares a commit.
     ///
-    /// A call to commit blocks.
-    /// After it returns, all of the document that
-    /// were added since the last commit are published
-    /// and persisted.
+    /// Calling `prepare_commit()` will cut the indexing
+    /// queue. All pending documents will be sent to the
+    /// indexing workers. They will then terminate, regardless
+    /// of the size of their current segment and flush their
+    /// work on disk.
     ///
-    /// In case of a crash or an hardware failure (as
-    /// long as the hard disk is spared), it will be possible
-    /// to resume indexing from this point.
+    /// Once a commit is "prepared", you can either
+    /// call
+    /// * `.commit()`: to accept this commit
+    /// * `.abort()`: to cancel this commit.
     ///
-    /// Commit returns the `opstamp` of the last document
-    /// that made it in the commit.
+    /// In the current implementation, `PreparedCommit` borrows
+    /// the `IndexWriter` mutably so we are guaranteed that no new
+    /// document can be added as long as it is committed or is
+    /// dropped.
     ///
-    pub fn commit(&mut self) -> Result<u64> {
-        // here, because we join all of the worker threads,
+    /// It is also possible to add a payload to the `commit`
+    /// using this API.
+    /// See [`PreparedCommit::set_payload()`](PreparedCommit.html)
+    pub fn prepare_commit(&mut self) -> Result<PreparedCommit> {
+        // Here, because we join all of the worker threads,
         // all of the segment update for this commit have been
         // sent.
         //
@@ -520,8 +548,7 @@ impl IndexWriter {

         // This will move uncommitted segments to the state of
         // committed segments.
-        self.committed_opstamp = self.stamper.stamp();
-        info!("committing {}", self.committed_opstamp);
+        info!("Preparing commit");

         // this will drop the current document channel
         // and recreate a new one channels.
@@ -543,10 +570,32 @@ impl IndexWriter {
             self.add_indexing_worker()?;
         }

-        // wait for the segment update thread to have processed the info
-        self.segment_updater.commit(self.committed_opstamp)?;
-
-        Ok(self.committed_opstamp)
+        let commit_opstamp = self.stamper.stamp();
+        let prepared_commit = PreparedCommit::new(self, commit_opstamp);
+        info!("Prepared commit {}", commit_opstamp);
+        Ok(prepared_commit)
+    }
+
+    /// Commits all of the pending changes
+    ///
+    /// A call to commit blocks.
+    /// After it returns, all of the document that
+    /// were added since the last commit are published
+    /// and persisted.
+    ///
+    /// In case of a crash or an hardware failure (as
+    /// long as the hard disk is spared), it will be possible
+    /// to resume indexing from this point.
+    ///
+    /// Commit returns the `opstamp` of the last document
+    /// that made it in the commit.
+    ///
+    pub fn commit(&mut self) -> Result<u64> {
+        self.prepare_commit()?.commit()
+    }
+
+    pub(crate) fn segment_updater(&self) -> &SegmentUpdater {
+        &self.segment_updater
     }

     /// Delete all documents containing a given term.
@@ -664,8 +713,7 @@ mod tests {
             doc.add_text(text_field, "a");
             index_writer.add_document(doc);
         }
-        index_writer = index_writer.rollback().unwrap();
+        index_writer.rollback().unwrap();

         assert_eq!(index_writer.commit_opstamp(), 0u64);
         assert_eq!(num_docs_containing("a"), 0);
@@ -728,4 +776,78 @@ mod tests {
         }
     }

+    #[test]
+    fn test_prepare_with_commit_message() {
+        let _ = env_logger::init();
+        let mut schema_builder = schema::SchemaBuilder::default();
+        let text_field = schema_builder.add_text_field("text", schema::TEXT);
+        let index = Index::create_in_ram(schema_builder.build());
+
+        {
+            // writing the segment
+            let mut index_writer = index.writer_with_num_threads(4, 4 * 30_000_000).unwrap();
+            // create 8 segments with 100 tiny docs
+            for _doc in 0..100 {
+                index_writer.add_document(doc!(text_field => "a"));
+            }
+            {
+                let mut prepared_commit = index_writer.prepare_commit().expect("commit failed");
+                prepared_commit.set_payload("first commit");
+                assert_eq!(prepared_commit.opstamp(), 100);
+                prepared_commit.commit().expect("commit failed");
+            }
+            {
+                let metas = index.load_metas().unwrap();
+                assert_eq!(metas.payload.unwrap(), "first commit");
+            }
+            for _doc in 0..100 {
+                index_writer.add_document(doc!(text_field => "a"));
+            }
+            index_writer.commit().unwrap();
+            {
+                let metas = index.load_metas().unwrap();
+                assert!(metas.payload.is_none());
+            }
+        }
+    }
+
+    #[test]
+    fn test_prepare_but_rollback() {
+        let _ = env_logger::init();
+        let mut schema_builder = schema::SchemaBuilder::default();
+        let text_field = schema_builder.add_text_field("text", schema::TEXT);
+        let index = Index::create_in_ram(schema_builder.build());
+
+        {
+            // writing the segment
+            let mut index_writer = index.writer_with_num_threads(4, 4 * 30_000_000).unwrap();
+            // create 8 segments with 100 tiny docs
+            for _doc in 0..100 {
+                index_writer.add_document(doc!(text_field => "a"));
+            }
+            {
+                let mut prepared_commit = index_writer.prepare_commit().expect("commit failed");
+                prepared_commit.set_payload("first commit");
+                assert_eq!(prepared_commit.opstamp(), 100);
+                prepared_commit.abort().expect("commit failed");
+            }
+            {
+                let metas = index.load_metas().unwrap();
+                assert!(metas.payload.is_none());
+            }
+            for _doc in 0..100 {
+                index_writer.add_document(doc!(text_field => "b"));
+            }
+            index_writer.commit().unwrap();
+        }
+        index.load_searchers().unwrap();
+        let num_docs_containing = |s: &str| {
+            let searcher = index.searcher();
+            let term_a = Term::from_field_text(text_field, s);
+            searcher.doc_freq(&term_a)
+        };
+        assert_eq!(num_docs_containing("a"), 0);
+        assert_eq!(num_docs_containing("b"), 100);
+    }
 }
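The new doc comments above describe the two-step commit: `prepare_commit()` cuts the indexing queue and hands back a `PreparedCommit` that can carry a payload and then be accepted or aborted, while `rollback()` now takes `&mut self` instead of consuming the writer. Below is a hedged usage sketch assembled from the tests added in this diff; it assumes the crate at this revision, and the schema, document construction, and thread/heap sizes are illustrative only.

```rust
extern crate tantivy;

use tantivy::schema::{SchemaBuilder, TEXT};
use tantivy::{Document, Index};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = SchemaBuilder::default();
    let text_field = schema_builder.add_text_field("text", TEXT);
    let index = Index::create_in_ram(schema_builder.build());

    // One writer at a time; 4 threads with ~30 MB heap each, as in the tests.
    let mut index_writer = index.writer_with_num_threads(4, 4 * 30_000_000)?;

    let mut doc = Document::default();
    doc.add_text(text_field, "old man and the sea");
    index_writer.add_document(doc);

    // Two-step commit: cut the indexing queue, attach a payload, then accept it.
    {
        let mut prepared = index_writer.prepare_commit()?;
        prepared.set_payload("first commit");
        prepared.commit()?;
    }

    // `rollback()` now takes `&mut self`, so the same binding stays usable
    // afterwards (previously: `index_writer = index_writer.rollback()?;`).
    let mut discarded = Document::default();
    discarded.add_text(text_field, "never committed");
    index_writer.add_document(discarded);
    index_writer.rollback()?;

    Ok(())
}
```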
@@ -105,7 +105,6 @@ impl IndexMerger {
         })
     }

-    #[inline(never)]
     fn write_fieldnorms(&self, fast_field_serializer: &mut FastFieldSerializer) -> Result<()> {
         let fieldnorm_fastfields: Vec<Field> = self.schema
             .fields()
@@ -121,7 +120,6 @@ impl IndexMerger {
         )
     }

-    #[inline(never)]
     fn write_fast_fields(&self, fast_field_serializer: &mut FastFieldSerializer) -> Result<()> {
         let fast_fields: Vec<Field> = self.schema
             .fields()
@@ -200,7 +198,6 @@ impl IndexMerger {
         Ok(())
     }

-    #[inline(never)]
     fn write_postings(&self, serializer: &mut InvertedIndexSerializer) -> Result<()> {
         let mut delta_computer = DeltaComputer::new();

@@ -335,16 +332,19 @@ impl IndexMerger {
         Ok(())
     }

-    #[inline(never)]
     fn write_storable_fields(&self, store_writer: &mut StoreWriter) -> Result<()> {
         for reader in &self.readers {
             let store_reader = reader.get_store_reader();
-            for doc_id in 0..reader.max_doc() {
-                if !reader.is_deleted(doc_id) {
-                    let doc = store_reader.get(doc_id)?;
-                    let field_values: Vec<&FieldValue> = doc.field_values().iter().collect();
-                    store_writer.store(&field_values)?;
+            if reader.num_deleted_docs() > 0 {
+                for doc_id in 0..reader.max_doc() {
+                    if !reader.is_deleted(doc_id) {
+                        let doc = store_reader.get(doc_id)?;
+                        let field_values: Vec<&FieldValue> = doc.field_values().iter().collect();
+                        store_writer.store(&field_values)?;
+                    }
                 }
+            } else {
+                store_writer.stack(store_reader)?;
             }
         }
         Ok(())
@@ -13,7 +13,9 @@ mod segment_entry;
 mod doc_opstamp_mapping;
 pub mod operation;
 mod stamper;
+mod prepared_commit;

+pub use self::prepared_commit::PreparedCommit;
 pub use self::segment_entry::{SegmentEntry, SegmentState};
 pub use self::segment_serializer::SegmentSerializer;
 pub use self::segment_writer::SegmentWriter;
@@ -21,6 +23,7 @@ pub use self::index_writer::IndexWriter;
 pub use self::log_merge_policy::LogMergePolicy;
 pub use self::merge_policy::{MergeCandidate, MergePolicy, NoMergePolicy};
 pub use self::segment_manager::SegmentManager;
+pub(crate) use self::directory_lock::DirectoryLock;

 /// Alias for the default merge policy, which is the `LogMergePolicy`.
 pub type DefaultMergePolicy = LogMergePolicy;
src/indexer/prepared_commit.rs (new file, 39 lines)
@@ -0,0 +1,39 @@
+use Result;
+use super::IndexWriter;
+
+/// A prepared commit
+pub struct PreparedCommit<'a> {
+    index_writer: &'a mut IndexWriter,
+    payload: Option<String>,
+    opstamp: u64,
+}
+
+impl<'a> PreparedCommit<'a> {
+    pub(crate) fn new(index_writer: &'a mut IndexWriter, opstamp: u64) -> PreparedCommit {
+        PreparedCommit {
+            index_writer: index_writer,
+            payload: None,
+            opstamp: opstamp,
+        }
+    }
+
+    pub fn opstamp(&self) -> u64 {
+        self.opstamp
+    }
+
+    pub fn set_payload(&mut self, payload: &str) {
+        self.payload = Some(payload.to_string())
+    }
+
+    pub fn abort(self) -> Result<()> {
+        self.index_writer.rollback()
+    }
+
+    pub fn commit(self) -> Result<u64> {
+        info!("committing {}", self.opstamp);
+        self.index_writer
+            .segment_updater()
+            .commit(self.opstamp, self.payload)?;
+        Ok(self.opstamp)
+    }
+}
@@ -56,7 +56,6 @@ impl SegmentSerializer {
     }

     /// Finalize the segment serialization.
-    #[inline(never)]
     pub fn close(self) -> Result<()> {
         self.fast_field_serializer.close()?;
         self.postings_serializer.close()?;
@@ -46,7 +46,7 @@ use super::segment_manager::{get_mergeable_segments, SegmentManager};
 ///
 /// This method is not part of tantivy's public API
 pub fn save_new_metas(schema: Schema, opstamp: u64, directory: &mut Directory) -> Result<()> {
-    save_metas(vec![], schema, opstamp, directory)
+    save_metas(vec![], schema, opstamp, None, directory)
 }

 /// Save the index meta file.
@@ -62,12 +62,14 @@ pub fn save_metas(
     segment_metas: Vec<SegmentMeta>,
     schema: Schema,
     opstamp: u64,
+    payload: Option<String>,
     directory: &mut Directory,
 ) -> Result<()> {
     let metas = IndexMeta {
         segments: segment_metas,
         schema,
         opstamp,
+        payload: payload.clone(),
     };
     let mut buffer = serde_json::to_vec_pretty(&metas)?;
     write!(&mut buffer, "\n")?;
@@ -222,7 +224,7 @@ impl SegmentUpdater {
         self.0.killed.store(true, Ordering::Release);
     }

-    fn is_alive(&self) -> bool {
+    pub fn is_alive(&self) -> bool {
         !self.0.killed.load(Ordering::Acquire)
     }

@@ -239,7 +241,7 @@ impl SegmentUpdater {
         Ok(segment_entries)
     }

-    pub fn save_metas(&self, opstamp: u64) {
+    pub fn save_metas(&self, opstamp: u64, commit_message: Option<String>) {
         if self.is_alive() {
             let index = &self.0.index;
             let directory = index.directory();
@@ -247,6 +249,7 @@ impl SegmentUpdater {
                 self.0.segment_manager.committed_segment_metas(),
                 index.schema(),
                 opstamp,
+                commit_message,
                 directory.box_clone().borrow_mut(),
             ).expect("Could not save metas.");
         }
@@ -266,14 +269,14 @@ impl SegmentUpdater {
             .garbage_collect(|| self.0.segment_manager.list_files());
     }

-    pub fn commit(&self, opstamp: u64) -> Result<()> {
+    pub fn commit(&self, opstamp: u64, payload: Option<String>) -> Result<()> {
         self.run_async(move |segment_updater| {
             if segment_updater.is_alive() {
                 let segment_entries = segment_updater
                     .purge_deletes(opstamp)
                     .expect("Failed purge deletes");
                 segment_updater.0.segment_manager.commit(segment_entries);
-                segment_updater.save_metas(opstamp);
+                segment_updater.save_metas(opstamp, payload);
                 segment_updater.garbage_collect_files_exec();
                 segment_updater.consider_merge_options();
             }
@@ -382,7 +385,12 @@ impl SegmentUpdater {
         let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
         let mut _file_protection_opt = None;
         if let Some(delete_operation) = delete_cursor.get() {
-            let committed_opstamp = segment_updater.0.index.opstamp();
+            let committed_opstamp = segment_updater
+                .0
+                .index
+                .load_metas()
+                .expect("Failed to read opstamp")
+                .opstamp;
             if delete_operation.opstamp < committed_opstamp {
                 let index = &segment_updater.0.index;
                 let segment = index.segment(after_merge_segment_entry.meta().clone());
@@ -418,7 +426,8 @@ impl SegmentUpdater {
                 .end_merge(&before_merge_segment_ids, after_merge_segment_entry);
             segment_updater.consider_merge_options();
             info!("save metas");
-            segment_updater.save_metas(segment_updater.0.index.opstamp());
+            let previous_metas = segment_updater.0.index.load_metas().unwrap();
+            segment_updater.save_metas(previous_metas.opstamp, previous_metas.payload);
             segment_updater.garbage_collect_files_exec();
         }).wait()
     }
@@ -34,6 +34,8 @@ extern crate log;
 #[macro_use]
 extern crate error_chain;

+extern crate regex;
+extern crate tempfile;
 extern crate atomicwrites;
 extern crate bit_set;
 extern crate byteorder;
@@ -45,10 +47,8 @@ extern crate futures;
 extern crate futures_cpupool;
 extern crate itertools;
 extern crate lz4;
-extern crate memmap;
 extern crate num_cpus;
 extern crate owning_ref;
-extern crate regex;
 extern crate rust_stemmers;
 extern crate serde;
 extern crate serde_json;
@@ -447,7 +447,7 @@ mod tests {
         {
             index_writer.delete_term(Term::from_field_text(text_field, "c"));
         }
-        index_writer = index_writer.rollback().unwrap();
+        index_writer.rollback().unwrap();
         index_writer.delete_term(Term::from_field_text(text_field, "a"));
         index_writer.commit().unwrap();
     }
@@ -373,7 +373,7 @@ mod test {

     #[test]
     pub fn test_parse_nonindexed_field_yields_error() {
         let query_parser = make_query_parser();

         let is_not_indexed_err = |query: &str| {
             let result: Result<Box<Query>, QueryParserError> = query_parser.parse_query(query);
@@ -42,6 +42,11 @@ impl Default for TextOptions {
     }
 }

+/// Configuration defining indexing for a text field.
+/// It wraps:
+///
+/// * record (See [`IndexRecordOption`](./enum.IndexRecordOption.html))
+/// * tokenizer
 #[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)]
 pub struct TextFieldIndexing {
     record: IndexRecordOption,
@@ -34,22 +34,33 @@ impl StoreReader {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn block_offset(&self, doc_id: DocId) -> (DocId, u64) {
|
pub(crate) fn block_index(&self) -> SkipList<u64> {
|
||||||
SkipList::from(self.offset_index_source.as_slice())
|
SkipList::from(self.offset_index_source.as_slice())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn block_offset(&self, doc_id: DocId) -> (DocId, u64) {
|
||||||
|
self.block_index()
|
||||||
.seek(doc_id + 1)
|
.seek(doc_id + 1)
|
||||||
.unwrap_or((0u32, 0u64))
|
.unwrap_or((0u32, 0u64))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn block_data(&self) -> &[u8] {
|
||||||
|
self.data.as_slice()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn compressed_block(&self, addr: usize) -> &[u8] {
|
||||||
|
let total_buffer = self.data.as_slice();
|
||||||
|
let mut buffer = &total_buffer[addr..];
|
||||||
|
let block_len = u32::deserialize(&mut buffer).expect("") as usize;
|
||||||
|
&buffer[..block_len]
|
||||||
|
}
|
||||||
|
|
||||||
fn read_block(&self, block_offset: usize) -> io::Result<()> {
|
fn read_block(&self, block_offset: usize) -> io::Result<()> {
|
||||||
if block_offset != *self.current_block_offset.borrow() {
|
if block_offset != *self.current_block_offset.borrow() {
|
||||||
let mut current_block_mut = self.current_block.borrow_mut();
|
let mut current_block_mut = self.current_block.borrow_mut();
|
||||||
current_block_mut.clear();
|
current_block_mut.clear();
|
||||||
let total_buffer = self.data.as_slice();
|
let compressed_block = self.compressed_block(block_offset);
|
||||||
let mut cursor = &total_buffer[block_offset..];
|
let mut lz4_decoder = lz4::Decoder::new(compressed_block)?;
|
||||||
let block_length = u32::deserialize(&mut cursor).unwrap();
|
|
||||||
let block_array: &[u8] = &total_buffer
|
|
||||||
[(block_offset + 4 as usize)..(block_offset + 4 + block_length as usize)];
|
|
||||||
let mut lz4_decoder = lz4::Decoder::new(block_array)?;
|
|
||||||
*self.current_block_offset.borrow_mut() = usize::max_value();
|
*self.current_block_offset.borrow_mut() = usize::max_value();
|
||||||
lz4_decoder.read_to_end(&mut current_block_mut).map(|_| ())?;
|
lz4_decoder.read_to_end(&mut current_block_mut).map(|_| ())?;
|
||||||
*self.current_block_offset.borrow_mut() = block_offset;
|
*self.current_block_offset.borrow_mut() = block_offset;
|
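The store reader refactor above separates three concerns: the offset index (a skip list mapping a doc id to the address of its block), the raw block bytes exposed by `block_data`, and the length-prefixed framing of each compressed block (`compressed_block` reads a `u32` length and returns that many following bytes). Below is a minimal, self-contained sketch of that framing idea only; the skip-list index is replaced by a plain `Vec`, compression is left out, and the little-endian byte order is an assumption, so this is not byte-compatible with tantivy's store.

use std::convert::TryInto;

// Each block is stored as: [len: u32, little-endian][`len` payload bytes].
// `addr` is the byte offset of the length prefix inside `data`.
fn block_at(data: &[u8], addr: usize) -> &[u8] {
    let len_bytes: [u8; 4] = data[addr..addr + 4].try_into().unwrap();
    let block_len = u32::from_le_bytes(len_bytes) as usize;
    &data[addr + 4..addr + 4 + block_len]
}

fn main() {
    // Build a tiny "store" with two length-prefixed blocks and record the
    // address of each block; `block_addrs` plays the role of the offset index.
    let mut data = Vec::new();
    let mut block_addrs = Vec::new();
    for payload in [&b"first block"[..], &b"second block"[..]] {
        block_addrs.push(data.len());
        data.extend_from_slice(&(payload.len() as u32).to_le_bytes());
        data.extend_from_slice(payload);
    }
    assert_eq!(block_at(&data, block_addrs[1]), b"second block");
}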
@@ -3,6 +3,7 @@ use DocId;
 use schema::FieldValue;
 use common::BinarySerializable;
 use std::io::{self, Write};
+use super::StoreReader;
 use lz4;
 use datastruct::SkipListBuilder;
 use common::CountingWriter;
@@ -60,6 +61,35 @@ impl StoreWriter {
         Ok(())
     }

+    /// Stacks a store reader on top of the documents written so far.
+    /// This method is an optimization compared to iterating over the documents
+    /// in the store and adding them one by one, as the store's data will
+    /// not be decompressed and then recompressed.
+    pub fn stack(&mut self, store_reader: &StoreReader) -> io::Result<()> {
+        if !self.current_block.is_empty() {
+            self.write_and_compress_block()?;
+            self.offset_index_writer.insert(
+                self.doc,
+                &(self.writer.written_bytes() as u64),
+            )?;
+        }
+        let doc_offset = self.doc;
+        let start_offset = self.writer.written_bytes() as u64;
+
+        // just bulk write all of the block of the given reader.
+        self.writer.write_all(store_reader.block_data())?;
+
+        // concatenate the index of the `store_reader`, after translating
+        // its start doc id and its start file offset.
+        for (next_doc_id, block_addr) in store_reader.block_index() {
+            self.doc = doc_offset + next_doc_id;
+            self.offset_index_writer.insert(
+                self.doc,
+                &(start_offset + block_addr))?;
+        }
+        Ok(())
+    }
+
     fn write_and_compress_block(&mut self) -> io::Result<()> {
         self.intermediary_buffer.clear();
         {
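The key trick of the new `stack` method is visible in its loop: the incoming reader's already-compressed blocks are appended verbatim, and every entry of its offset index is shifted by the current doc count and by the byte offset at which the copied data starts. The following standalone sketch shows just that re-basing step, with plain vectors standing in for the skip-list writer; all names and types here are illustrative, not tantivy's API.

// One entry of a store's offset index: first doc id after the block, and the
// byte address of the block within the store data.
#[derive(Debug, Clone, Copy, PartialEq)]
struct IndexEntry {
    doc: u32,
    addr: u64,
}

// Appends `incoming_data` (raw block bytes) and `incoming_index` to an
// existing store, translating doc ids and byte addresses instead of
// decompressing and re-encoding any document.
fn stack(
    data: &mut Vec<u8>,
    index: &mut Vec<IndexEntry>,
    incoming_data: &[u8],
    incoming_index: &[IndexEntry],
    doc_offset: u32,
) {
    let start_offset = data.len() as u64;
    data.extend_from_slice(incoming_data); // bulk copy, no recompression
    for entry in incoming_index {
        index.push(IndexEntry {
            doc: doc_offset + entry.doc,
            addr: start_offset + entry.addr,
        });
    }
}

fn main() {
    let mut data = b"AAAA".to_vec();
    let mut index = vec![IndexEntry { doc: 10, addr: 0 }];
    let incoming_data = b"BBBB".to_vec();
    let incoming_index = vec![IndexEntry { doc: 7, addr: 0 }];
    stack(&mut data, &mut index, &incoming_data, &incoming_index, 10);
    assert_eq!(index[1], IndexEntry { doc: 17, addr: 4 });
}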
@@ -49,19 +49,26 @@ impl TermDeltaDecoder {
         }
     }

+
+    // code
+    // first bit represents whether the prefix / suffix len can be encoded
+    // on the same byte. (the next one)
+    //
+
     #[inline(always)]
     pub fn decode<'a>(&mut self, code: u8, mut cursor: &'a [u8]) -> &'a [u8] {
-        let (prefix_len, suffix_len): (usize, usize) = if (code & 1u8) == 1u8 {
-            let b = cursor[0];
-            cursor = &cursor[1..];
-            let prefix_len = (b & 15u8) as usize;
-            let suffix_len = (b >> 4u8) as usize;
-            (prefix_len, suffix_len)
-        } else {
-            let prefix_len = u32::deserialize(&mut cursor).unwrap();
-            let suffix_len = u32::deserialize(&mut cursor).unwrap();
-            (prefix_len as usize, suffix_len as usize)
-        };
+        let (prefix_len, suffix_len): (usize, usize) =
+            if (code & 1u8) == 1u8 {
+                let b = cursor[0];
+                cursor = &cursor[1..];
+                let prefix_len = (b & 15u8) as usize;
+                let suffix_len = (b >> 4u8) as usize;
+                (prefix_len, suffix_len)
+            } else {
+                let prefix_len = u32::deserialize(&mut cursor).unwrap();
+                let suffix_len = u32::deserialize(&mut cursor).unwrap();
+                (prefix_len as usize, suffix_len as usize)
+            };
         unsafe { self.term.set_len(prefix_len) };
         self.term.extend_from_slice(&(*cursor)[..suffix_len]);
         &cursor[suffix_len..]
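The reformatted `decode` keeps the existing scheme: the low bit of `code` says whether both the shared-prefix length and the suffix length fit in a nibble, in which case they are packed into one extra byte (prefix in the low four bits, suffix in the high four); otherwise both are written as full 32-bit values. Here is a small round-trip sketch of that packing, independent of tantivy's types; the little-endian encoding in the wide case is an assumption and may not match `BinarySerializable` exactly.

use std::convert::TryInto;

// Packs (prefix_len, suffix_len) following the same flag-bit scheme as the
// decoder above: returns the flag bit plus the encoded length bytes.
fn encode_lens(prefix_len: u32, suffix_len: u32) -> (u8, Vec<u8>) {
    if prefix_len < 16 && suffix_len < 16 {
        // Both lengths fit in a nibble: one extra byte, flag bit set.
        (1u8, vec![(prefix_len as u8) | ((suffix_len as u8) << 4)])
    } else {
        // Fallback: two full little-endian u32 values, flag bit clear.
        let mut bytes = prefix_len.to_le_bytes().to_vec();
        bytes.extend_from_slice(&suffix_len.to_le_bytes());
        (0u8, bytes)
    }
}

fn decode_lens(code: u8, bytes: &[u8]) -> (usize, usize) {
    if code & 1 == 1 {
        ((bytes[0] & 15) as usize, (bytes[0] >> 4) as usize)
    } else {
        let prefix = u32::from_le_bytes(bytes[0..4].try_into().unwrap());
        let suffix = u32::from_le_bytes(bytes[4..8].try_into().unwrap());
        (prefix as usize, suffix as usize)
    }
}

fn main() {
    let (code, bytes) = encode_lens(3, 9);
    assert_eq!(decode_lens(code, &bytes), (3, 9));
    let (code, bytes) = encode_lens(3, 300);
    assert_eq!(decode_lens(code, &bytes), (3, 300));
}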
@@ -75,8 +82,8 @@ impl TermDeltaDecoder {
 #[derive(Default)]
 pub struct DeltaTermInfo {
     pub doc_freq: u32,
-    pub delta_postings_offset: u32,
-    pub delta_positions_offset: u32,
+    pub delta_postings_offset: u64,
+    pub delta_positions_offset: u64,
     pub positions_inner_offset: u8,
 }

@@ -100,13 +107,13 @@ impl TermInfoDeltaEncoder {
     pub fn encode(&mut self, term_info: TermInfo) -> DeltaTermInfo {
         let mut delta_term_info = DeltaTermInfo {
             doc_freq: term_info.doc_freq,
-            delta_postings_offset: (term_info.postings_offset - self.term_info.postings_offset) as u32,
-            delta_positions_offset: 0,
+            delta_postings_offset: term_info.postings_offset - self.term_info.postings_offset,
+            delta_positions_offset: 0u64,
             positions_inner_offset: 0,
         };
         if self.has_positions {
             delta_term_info.delta_positions_offset =
-                (term_info.positions_offset - self.term_info.positions_offset) as u32;
+                term_info.positions_offset - self.term_info.positions_offset;
             delta_term_info.positions_inner_offset = term_info.positions_inner_offset;
         }
         mem::replace(&mut self.term_info, term_info);
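Widening `delta_postings_offset` and `delta_positions_offset` to `u64` matches what they are in the encoder above: differences between the current term's absolute offsets and the previous term's. A minimal illustration of delta-encoding and rebuilding such monotonically increasing offsets, with plain integers and illustrative names only:

// Delta-encodes a monotonically increasing sequence of absolute offsets.
fn to_deltas(offsets: &[u64]) -> Vec<u64> {
    let mut previous = 0u64;
    offsets
        .iter()
        .map(|&offset| {
            let delta = offset - previous;
            previous = offset;
            delta
        })
        .collect()
}

// Rebuilds the absolute offsets from their deltas.
fn from_deltas(deltas: &[u64]) -> Vec<u64> {
    let mut acc = 0u64;
    deltas
        .iter()
        .map(|&delta| {
            acc += delta;
            acc
        })
        .collect()
}

fn main() {
    let offsets = vec![0, 128, 1024, 1024, 4096];
    let deltas = to_deltas(&offsets);
    assert_eq!(deltas, vec![0, 128, 896, 0, 3072]);
    assert_eq!(from_deltas(&deltas), offsets);
}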
@@ -152,15 +159,15 @@ impl TermInfoDeltaDecoder {
         let mut v: u64 = unsafe { *(cursor.as_ptr() as *const u64) };
         let doc_freq: u32 = (v as u32) & make_mask(num_bytes_docfreq);
         v >>= (num_bytes_docfreq as u64) * 8u64;
-        let delta_postings_offset: u32 = (v as u32) & make_mask(num_bytes_postings_offset);
+        let delta_postings_offset: u64 = v & make_mask(num_bytes_postings_offset);
         cursor = &cursor[num_bytes_docfreq + num_bytes_postings_offset..];
         self.term_info.doc_freq = doc_freq;
-        self.term_info.postings_offset += delta_postings_offset as u64;
+        self.term_info.postings_offset += delta_postings_offset;
         if self.has_positions {
             let num_bytes_positions_offset = ((code >> 5) & 3) as usize + 1;
             let delta_positions_offset: u32 =
                 unsafe { *(cursor.as_ptr() as *const u32) } & make_mask(num_bytes_positions_offset);
-            self.term_info.positions_offset += delta_positions_offset as u64;
+            self.term_info.positions_offset += delta_positions_offset;
             self.term_info.positions_inner_offset = cursor[num_bytes_positions_offset];
             &cursor[num_bytes_positions_offset + 1..]
         } else {

@@ -1,11 +1,10 @@
 use std::io::{self, Read, Write};
-use common::{VInt, BinarySerializable};
+use common::BinarySerializable;

 mod termdict;
 mod streamer;
 mod delta_encoder;

-
 pub use self::delta_encoder::{TermDeltaDecoder, TermDeltaEncoder};
 pub use self::delta_encoder::{DeltaTermInfo, TermInfoDeltaDecoder, TermInfoDeltaEncoder};

@@ -16,23 +15,23 @@ pub use self::streamer::TermStreamerBuilderImpl;

 #[derive(Debug)]
 pub struct CheckPoint {
-    pub stream_offset: u64,
-    pub postings_offset: u64,
-    pub positions_offset: u64,
+    pub stream_offset: u32,
+    pub postings_offset: u32,
+    pub positions_offset: u32,
 }

 impl BinarySerializable for CheckPoint {
     fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
-        VInt(self.stream_offset).serialize(writer)?;
-        VInt(self.postings_offset).serialize(writer)?;
-        VInt(self.positions_offset).serialize(writer)?;
+        self.stream_offset.serialize(writer)?;
+        self.postings_offset.serialize(writer)?;
+        self.positions_offset.serialize(writer)?;
         Ok(())
     }

     fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
-        let stream_offset = VInt::deserialize(reader)?.0;
-        let postings_offset = VInt::deserialize(reader)?.0;
-        let positions_offset = VInt::deserialize(reader)?.0;
+        let stream_offset = u32::deserialize(reader)?;
+        let postings_offset = u32::deserialize(reader)?;
+        let positions_offset = u32::deserialize(reader)?;
         Ok(CheckPoint {
             stream_offset,
             postings_offset,
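Moving `CheckPoint` from `VInt`-encoded `u64`s to plain `u32` fields makes every serialized checkpoint the same size, which keeps the index entries trivial to read back. The sketch below shows a fixed-width round-trip of the same three fields; the field names mirror the struct above, but the I/O helpers are plain std rather than tantivy's `BinarySerializable`, and the little-endian byte order is an assumption.

use std::io::{self, Read, Write};

#[derive(Debug, PartialEq)]
struct CheckPoint {
    stream_offset: u32,
    postings_offset: u32,
    positions_offset: u32,
}

impl CheckPoint {
    // Always writes exactly 12 bytes (3 x u32, little-endian).
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        writer.write_all(&self.stream_offset.to_le_bytes())?;
        writer.write_all(&self.postings_offset.to_le_bytes())?;
        writer.write_all(&self.positions_offset.to_le_bytes())?;
        Ok(())
    }

    fn deserialize<R: Read>(reader: &mut R) -> io::Result<CheckPoint> {
        fn read_u32<R: Read>(reader: &mut R) -> io::Result<u32> {
            let mut buf = [0u8; 4];
            reader.read_exact(&mut buf)?;
            Ok(u32::from_le_bytes(buf))
        }
        Ok(CheckPoint {
            stream_offset: read_u32(reader)?,
            postings_offset: read_u32(reader)?,
            positions_offset: read_u32(reader)?,
        })
    }
}

fn main() -> io::Result<()> {
    let checkpoint = CheckPoint {
        stream_offset: 42,
        postings_offset: 1_000,
        positions_offset: 7,
    };
    let mut bytes = Vec::new();
    checkpoint.serialize(&mut bytes)?;
    assert_eq!(bytes.len(), 12);
    assert_eq!(CheckPoint::deserialize(&mut bytes.as_slice())?, checkpoint);
    Ok(())
}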
@@ -28,10 +28,11 @@ fn has_positions(field_type: &FieldType) -> bool {
     match *field_type {
         FieldType::Str(ref text_options) => {
             let indexing_options = text_options.get_indexing_options();
-            if let Some(text_field_indexing) = indexing_options {
-                return text_field_indexing.index_option().has_positions()
+            if indexing_options.is_position_enabled() {
+                true
+            } else {
+                false
             }
-            return false;
         }
         _ => false,
     }
@@ -59,10 +60,10 @@ where
     W: Write,
 {
     fn add_index_entry(&mut self) {
-        let stream_offset: u64 = self.write.written_bytes() as u64;
+        let stream_offset = self.write.written_bytes() as u32;
         let term_info = self.term_info_encoder.term_info();
-        let postings_offset: u64 = term_info.postings_offset;
-        let positions_offset: u64 = term_info.positions_offset;
+        let postings_offset = term_info.postings_offset as u32;
+        let positions_offset = term_info.positions_offset as u32;
         let checkpoint = CheckPoint {
             stream_offset,
             postings_offset,

@@ -1,9 +1,10 @@
 use super::{Token, TokenFilter, TokenStream};

+/// `TokenFilter` that removes all tokens that contain non
+/// ascii alphanumeric characters.
 #[derive(Clone)]
 pub struct AlphaNumOnlyFilter;

-
 pub struct AlphaNumOnlyFilterStream<TailTokenStream>
     where TailTokenStream: TokenStream
 {

@@ -139,7 +139,6 @@ mod token_stream_chain;
 mod raw_tokenizer;
 mod alphanum_only;

-
 pub use self::alphanum_only::AlphaNumOnlyFilter;
 pub use self::tokenizer::{Token, TokenFilter, TokenStream, Tokenizer};
 pub use self::tokenizer::BoxedTokenizer;

@@ -11,7 +11,7 @@ pub struct RemoveLongFilter {
 }

 impl RemoveLongFilter {
-    // the limit is in bytes of the UTF-8 representation.
+    /// Creates a `RemoveLongFilter` given a limit in bytes of the UTF-8 representation.
     pub fn limit(length_limit: usize) -> RemoveLongFilter {
         RemoveLongFilter { length_limit }
     }

@@ -2,12 +2,15 @@ use std::sync::Arc;
 use super::{Token, TokenFilter, TokenStream};
 use rust_stemmers::{self, Algorithm};

+/// `Stemmer` token filter. Currently only English is supported.
+/// Tokens are expected to be lowercased beforehands.
 #[derive(Clone)]
 pub struct Stemmer {
     stemmer_algorithm: Arc<Algorithm>,
 }

 impl Stemmer {
+    /// Creates a new Stemmer `TokenFilter`.
     pub fn new() -> Stemmer {
         Stemmer {
             stemmer_algorithm: Arc::new(Algorithm::English),

@@ -195,6 +195,10 @@ pub trait TokenStream {
     /// Returns a mutable reference to the current token.
     fn token_mut(&mut self) -> &mut Token;

+    /// Helper to iterate over tokens. It
+    /// simply combines a call to `.advance()`
+    /// and `.token()`.
+    ///
     /// ```
     /// # extern crate tantivy;
     /// # use tantivy::tokenizer::*;
@@ -217,6 +221,8 @@ pub trait TokenStream {
         }
     }

+    /// Helper function to consume the entire `TokenStream`
+    /// and push the tokens to a sink function.
     fn process(&mut self, sink: &mut FnMut(&Token)) -> u32 {
         let mut num_tokens_pushed = 0u32;
         while self.advance() {
@@ -247,7 +253,10 @@ where
     }
 }

+
+/// Trait for the pluggable components of `Tokenizer`s.
 pub trait TokenFilter<TailTokenStream: TokenStream>: Clone {
+    /// The resulting `TokenStream` type.
     type ResultTokenStream: TokenStream;

     /// Wraps a token stream and returns the modified one.

@@ -7,7 +7,6 @@ use tokenizer::RawTokenizer;
 use tokenizer::SimpleTokenizer;
 use tokenizer::JapaneseTokenizer;
 use tokenizer::RemoveLongFilter;
-use tokenizer::AlphaNumOnlyFilter;
 use tokenizer::LowerCaser;
 use tokenizer::Stemmer;

@@ -29,6 +28,7 @@ pub struct TokenizerManager {
 }

 impl TokenizerManager {
+    /// Registers a new tokenizer associated with a given name.
     pub fn register<A>(&self, tokenizer_name: &str, tokenizer: A)
     where
         A: 'static + Send + Sync + for<'a> Tokenizer<'a>,
@@ -40,6 +40,7 @@ impl TokenizerManager {
             .insert(tokenizer_name.to_string(), boxed_tokenizer);
     }

+    /// Accessing a tokenizer given its name.
     pub fn get(&self, tokenizer_name: &str) -> Option<Box<BoxedTokenizer>> {
         self.tokenizers
             .read()
@@ -71,7 +72,6 @@ impl Default for TokenizerManager {
             SimpleTokenizer
                 .filter(RemoveLongFilter::limit(40))
                 .filter(LowerCaser)
-                .filter(AlphaNumOnlyFilter)
                 .filter(Stemmer::new()),
         );
         manager.register("ja", JapaneseTokenizer.filter(RemoveLongFilter::limit(40)));
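With `AlphaNumOnlyFilter` dropped, the default chain registered above becomes `SimpleTokenizer`, then `RemoveLongFilter::limit(40)`, then `LowerCaser`, then `Stemmer::new()`. The usage sketch below relies only on the `register`/`get` signatures and the `.filter(...)` chaining visible in these hunks; the `use tantivy::tokenizer::...` import path and the re-exports it assumes are not shown in the diff and are an assumption.

// Assumption: these types are re-exported under `tantivy::tokenizer`; only
// `register`, `get`, `filter` and `TokenizerManager::default()` are taken
// from the hunks above.
use tantivy::tokenizer::{LowerCaser, RemoveLongFilter, SimpleTokenizer, TokenizerManager};

fn main() {
    let manager = TokenizerManager::default();
    // Register a lowercasing pipeline that also drops very long tokens.
    manager.register(
        "custom_en",
        SimpleTokenizer
            .filter(RemoveLongFilter::limit(40))
            .filter(LowerCaser),
    );
    // Look the tokenizer up again by name.
    assert!(manager.get("custom_en").is_some());
}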