Compare commits

...

16 Commits

Author SHA1 Message Date
Paul Masurel
72bd0cfd4a Closes #708
Fixes a race condition in the test.
2019-11-21 10:30:10 +09:00
Paul Masurel
1868fc1e2c Text fix 2019-11-20 23:00:39 +09:00
Paul Masurel
451a0252ab thread pool merge (#704) 2019-11-20 21:18:05 +09:00
Paul Masurel
42756c7474 Removing futures-cpupool and upgrading to futures-0.3 2019-11-15 18:35:31 +09:00
Paul Masurel
598b076240 Making some of the IndexWriter's method public. 2019-11-11 12:41:45 +09:00
Paul Masurel
f1f96fc417 Updating some doc. 2019-11-11 10:04:12 +09:00
Paul Masurel
9c941603f5 Petr tik n662 errror incompatible footer version (#696)
* code tidy-up

Replace `20` magic constant with COMMON_FOOTER_SIZE

Add a docstring showing how footer is serialised
Add a test for footer length checking

* Add more tests for VersionedFooter

successful and panicking .to_bytes() calls

* Minor changes in footer.rs
2019-11-10 14:40:06 +09:00
Paul Masurel
fb3d6fa332 Adding Value::From<PretokenizedText> (#697) 2019-11-10 14:39:44 +09:00
Paul Masurel
88fd7f091a SegmentUpdater.add_segment does not need to return true (#693) 2019-11-09 21:18:51 +09:00
Jacob Brown
6e4fdfd4bf replace scoped_pool (#685) 2019-11-07 10:26:08 +09:00
kkoziara
0519056bd8 Added handling of pre-tokenized text fields (#642). (#669)
* Added handling of pre-tokenized text fields (#642).

* * Updated changelog and examples concerning #642.
* Added tokenized_text method to Value implementation.
* Implemented From<TokenizedString> for TokenizedStream.

* * Removed tokenized flag from TextOptions and code reliance on the flag.
* Changed naming to use word "pre-tokenized" instead of "tokenized".
* Updated example code.
* Fixed comments.

* Minor code refactoring. Test improvements.
2019-11-07 10:10:56 +09:00
dependabot-preview[bot]
7305ad575e Update smallvec requirement from 0.6 to 1.0 (#686)
Updates the requirements on [smallvec](https://github.com/servo/rust-smallvec) to permit the latest version.
- [Release notes](https://github.com/servo/rust-smallvec/releases)
- [Commits](https://github.com/servo/rust-smallvec/compare/v0.6.0...v1.0.0)

Signed-off-by: dependabot-preview[bot] <support@dependabot.com>
2019-11-07 09:55:33 +09:00
Paul Masurel
79f64ac2f4 Create FUNDING.yml 2019-11-05 16:26:12 +09:00
Paul Masurel
67bce6cbf2 Fixing the construction of the DeleteBitset. (#683)
Closes #681
2019-11-04 15:39:11 +09:00
xiaoniu-578fa6bff964d005
e5316a4388 Reduce unnecessary clone. (#684) 2019-11-04 13:57:59 +09:00
Mathias Svensson
6a8a8557d2 Use slice::iter instead of into_iter to avoid future breakage (#679)
* Use `slice::iter` instead of `into_iter` to avoid future breakage

`an_array.into_iter()` currently just works because of the autoref
feature, which then calls `<[T] as IntoIterator>::into_iter`. But
in the future, arrays will implement `IntoIterator`, too. In order
to avoid problems in the future, the call is replaced by `iter()`
which is shorter and more explicit.

* cargo fmt
2019-10-31 20:59:50 +09:00
36 changed files with 1435 additions and 615 deletions

12
.github/FUNDING.yml vendored Normal file
View File

@@ -0,0 +1,12 @@
# These are supported funding model platforms
github: fulmicoton
patreon: # Replace with a single Patreon username
open_collective: # Replace with a single Open Collective username
ko_fi: # Replace with a single Ko-fi username
tidelift: # Replace with a single Tidelift platform-name/package-name e.g., npm/babel
community_bridge: # Replace with a single Community Bridge project-name e.g., cloud-foundry
liberapay: # Replace with a single Liberapay username
issuehunt: # Replace with a single IssueHunt username
otechie: # Replace with a single Otechie username
custom: # Replace with up to 4 custom sponsorship URLs e.g., ['link1', 'link2']

View File

@@ -10,7 +10,10 @@ Tantivy 0.11.0
- Avoid rebuilding Regex automaton whenever a regex query is reused. #639 (@brainlock)
- Add footer with some metadata to index files. #605 (@fdb-hiroshima)
- TopDocs collector: ensure stable sorting on equal score. #671 (@brainlock)
- Added handling of pre-tokenized text fields (#642), which will enable users to
load tokens created outside tantivy. See usage in examples/pre_tokenized_text. (@kkoziara)
- Fix crash when committing multiple times with deleted documents. #681 (@brainlock)
## How to update?
- `Box<dyn BoxableTokenizer>` has been replaced by a `BoxedTokenizer` struct.

View File

@@ -36,24 +36,23 @@ notify = {version="4", optional=true}
bit-set = "0.5"
uuid = { version = "0.8", features = ["v4", "serde"] }
crossbeam = "0.7"
futures = "0.1"
futures-cpupool = "0.1"
futures = {version = "0.3", features=["thread-pool"] }
owning_ref = "0.4"
stable_deref_trait = "1.0.0"
rust-stemmers = "1.1"
downcast-rs = { version="1.0" }
tantivy-query-grammar = { path="./query-grammar" }
bitpacking = {version="0.8", default-features = false, features=["bitpacker4x"]}
census = "0.2"
census = "0.3"
fnv = "1.0.6"
owned-read = "0.4"
failure = "0.1"
htmlescape = "0.3.1"
fail = "0.3"
scoped-pool = "1.0"
murmurhash32 = "0.2"
chrono = "0.4"
smallvec = "0.6"
smallvec = "1.0"
rayon = "1"
[target.'cfg(windows)'.dependencies]
winapi = "0.3"
@@ -64,6 +63,10 @@ maplit = "1"
matches = "0.1.8"
time = "0.1.42"
[dev-dependencies.fail]
version = "0.3"
features = ["failpoints"]
[profile.release]
opt-level = 3
debug = false
@@ -87,10 +90,6 @@ members = ["query-grammar"]
[badges]
travis-ci = { repository = "tantivy-search/tantivy" }
[dev-dependencies.fail]
version = "0.3"
features = ["failpoints"]
# Following the "fail" crate best practises, we isolate
# tests that define specific behavior in fail check points
# in a different binary.

View File

@@ -0,0 +1,138 @@
// # Pre-tokenized text example
//
// This example shows how to use pre-tokenized text. Sometimes yout might
// want to index and search through text which is already split into
// tokens by some external tool.
//
// In this example we will:
// - use tantivy tokenizer to create tokens and load them directly into tantivy,
// - import tokenized text straight from json,
// - perform a search on documents with pre-tokenized text
use tantivy::tokenizer::{PreTokenizedString, SimpleTokenizer, Token, TokenStream, Tokenizer};
use tantivy::collector::{Count, TopDocs};
use tantivy::query::TermQuery;
use tantivy::schema::*;
use tantivy::{doc, Index, ReloadPolicy};
use tempfile::TempDir;
fn pre_tokenize_text(text: &str) -> Vec<Token> {
let mut token_stream = SimpleTokenizer.token_stream(text);
let mut tokens = vec![];
while token_stream.advance() {
tokens.push(token_stream.token().clone());
}
tokens
}
fn main() -> tantivy::Result<()> {
let index_path = TempDir::new()?;
let mut schema_builder = Schema::builder();
schema_builder.add_text_field("title", TEXT | STORED);
schema_builder.add_text_field("body", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_dir(&index_path, schema.clone())?;
let mut index_writer = index.writer(50_000_000)?;
// We can create a document manually, by setting the fields
// one by one in a Document object.
let title = schema.get_field("title").unwrap();
let body = schema.get_field("body").unwrap();
let title_text = "The Old Man and the Sea";
let body_text = "He was an old man who fished alone in a skiff in the Gulf Stream";
// Content of our first document
// We create `PreTokenizedString` which contains original text and vector of tokens
let title_tok = PreTokenizedString {
text: String::from(title_text),
tokens: pre_tokenize_text(title_text),
};
println!(
"Original text: \"{}\" and tokens: {:?}",
title_tok.text, title_tok.tokens
);
let body_tok = PreTokenizedString {
text: String::from(body_text),
tokens: pre_tokenize_text(body_text),
};
// Now lets create a document and add our `PreTokenizedString` using
// `add_pre_tokenized_text` method of `Document`
let old_man_doc = doc!(title => title_tok, body => body_tok);
// ... now let's just add it to the IndexWriter
index_writer.add_document(old_man_doc);
// Pretokenized text can also be fed as JSON
let short_man_json = r#"{
"title":[{
"text":"The Old Man",
"tokens":[
{"offset_from":0,"offset_to":3,"position":0,"text":"The","position_length":1},
{"offset_from":4,"offset_to":7,"position":1,"text":"Old","position_length":1},
{"offset_from":8,"offset_to":11,"position":2,"text":"Man","position_length":1}
]
}]
}"#;
let short_man_doc = schema.parse_document(&short_man_json)?;
index_writer.add_document(short_man_doc);
// Let's commit changes
index_writer.commit()?;
// ... and now is the time to query our index
let reader = index
.reader_builder()
.reload_policy(ReloadPolicy::OnCommit)
.try_into()?;
let searcher = reader.searcher();
// We want to get documents with token "Man", we will use TermQuery to do it
// Using PreTokenizedString means the tokens are stored as is avoiding stemming
// and lowercasing, which preserves full words in their original form
let query = TermQuery::new(
Term::from_field_text(title, "Man"),
IndexRecordOption::Basic,
);
let (top_docs, count) = searcher
.search(&query, &(TopDocs::with_limit(2), Count))
.unwrap();
assert_eq!(count, 2);
for (_score, doc_address) in top_docs {
let retrieved_doc = searcher.doc(doc_address)?;
println!("Document: {}", schema.to_json(&retrieved_doc));
}
// In contrary to the previous query, when we search for the "man" term we
// should get no results, as it's not one of the indexed tokens. SimpleTokenizer
// only splits text on whitespace / punctuation.
let query = TermQuery::new(
Term::from_field_text(title, "man"),
IndexRecordOption::Basic,
);
let (_top_docs, count) = searcher
.search(&query, &(TopDocs::with_limit(2), Count))
.unwrap();
assert_eq!(count, 0);
Ok(())
}

View File

@@ -1,6 +1,6 @@
use crate::Result;
use crossbeam::channel;
use scoped_pool::{Pool, ThreadConfig};
use rayon::{ThreadPool, ThreadPoolBuilder};
/// Search executor whether search request are single thread or multithread.
///
@@ -11,7 +11,7 @@ use scoped_pool::{Pool, ThreadConfig};
/// used by the client. Second, we may stop using rayon in the future.
pub enum Executor {
SingleThread,
ThreadPool(Pool),
ThreadPool(ThreadPool),
}
impl Executor {
@@ -21,10 +21,12 @@ impl Executor {
}
// Creates an Executor that dispatches the tasks in a thread pool.
pub fn multi_thread(num_threads: usize, prefix: &'static str) -> Executor {
let thread_config = ThreadConfig::new().prefix(prefix);
let pool = Pool::with_thread_config(num_threads, thread_config);
Executor::ThreadPool(pool)
pub fn multi_thread(num_threads: usize, prefix: &'static str) -> Result<Executor> {
let pool = ThreadPoolBuilder::new()
.num_threads(num_threads)
.thread_name(move |num| format!("{}{}", prefix, num))
.build()?;
Ok(Executor::ThreadPool(pool))
}
// Perform a map in the thread pool.
@@ -48,9 +50,9 @@ impl Executor {
let num_fruits = args_with_indices.len();
let fruit_receiver = {
let (fruit_sender, fruit_receiver) = channel::unbounded();
pool.scoped(|scope| {
pool.scope(|scope| {
for arg_with_idx in args_with_indices {
scope.execute(|| {
scope.spawn(|_| {
let (idx, arg) = arg_with_idx;
let fruit = f(arg);
if let Err(err) = fruit_sender.send((idx, fruit)) {
@@ -103,6 +105,7 @@ mod tests {
#[should_panic] //< unfortunately the panic message is not propagated
fn test_panic_propagates_multi_thread() {
let _result: Vec<usize> = Executor::multi_thread(1, "search-test")
.unwrap()
.map(
|_| {
panic!("panic should propagate");
@@ -126,6 +129,7 @@ mod tests {
#[test]
fn test_map_multithread() {
let result: Vec<usize> = Executor::multi_thread(3, "search-test")
.unwrap()
.map(|i| Ok(i * 2), 0..10)
.unwrap();
assert_eq!(result.len(), 10);

View File

@@ -73,15 +73,16 @@ impl Index {
/// Replace the default single thread search executor pool
/// by a thread pool with a given number of threads.
pub fn set_multithread_executor(&mut self, num_threads: usize) {
self.executor = Arc::new(Executor::multi_thread(num_threads, "thrd-tantivy-search-"));
pub fn set_multithread_executor(&mut self, num_threads: usize) -> Result<()> {
self.executor = Arc::new(Executor::multi_thread(num_threads, "thrd-tantivy-search-")?);
Ok(())
}
/// Replace the default single thread search executor pool
/// by a thread pool with a given number of threads.
pub fn set_default_multithread_executor(&mut self) {
pub fn set_default_multithread_executor(&mut self) -> Result<()> {
let default_num_threads = num_cpus::get();
self.set_multithread_executor(default_num_threads);
self.set_multithread_executor(default_num_threads)
}
/// Creates a new index using the `RAMDirectory`.
@@ -103,23 +104,21 @@ impl Index {
if Index::exists(&mmap_directory) {
return Err(TantivyError::IndexAlreadyExists);
}
Index::create(mmap_directory, schema)
}
/// Opens or creates a new index in the provided directory
pub fn open_or_create<Dir: Directory>(dir: Dir, schema: Schema) -> Result<Index> {
if Index::exists(&dir) {
let index = Index::open(dir)?;
if index.schema() == schema {
Ok(index)
} else {
Err(TantivyError::SchemaError(
"An index exists but the schema does not match.".to_string(),
))
}
if !Index::exists(&dir) {
return Index::create(dir, schema);
}
let index = Index::open(dir)?;
if index.schema() == schema {
Ok(index)
} else {
Index::create(dir, schema)
Err(TantivyError::SchemaError(
"An index exists but the schema does not match.".to_string(),
))
}
}
@@ -387,12 +386,9 @@ mod tests {
use crate::directory::RAMDirectory;
use crate::schema::Field;
use crate::schema::{Schema, INDEXED, TEXT};
use crate::Index;
use crate::IndexReader;
use crate::IndexWriter;
use crate::ReloadPolicy;
use std::thread;
use std::time::Duration;
use crate::{Directory, Index};
#[test]
fn test_indexer_for_field() {
@@ -470,14 +466,14 @@ mod tests {
.try_into()
.unwrap();
assert_eq!(reader.searcher().num_docs(), 0);
let mut writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
test_index_on_commit_reload_policy_aux(field, &mut writer, &reader);
test_index_on_commit_reload_policy_aux(field, &index, &reader);
}
#[cfg(feature = "mmap")]
mod mmap_specific {
use super::*;
use crate::Directory;
use std::path::PathBuf;
use tempfile::TempDir;
@@ -488,22 +484,20 @@ mod tests {
let tempdir = TempDir::new().unwrap();
let tempdir_path = PathBuf::from(tempdir.path());
let index = Index::create_in_dir(&tempdir_path, schema).unwrap();
let mut writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
writer.commit().unwrap();
let reader = index
.reader_builder()
.reload_policy(ReloadPolicy::OnCommit)
.try_into()
.unwrap();
assert_eq!(reader.searcher().num_docs(), 0);
test_index_on_commit_reload_policy_aux(field, &mut writer, &reader);
test_index_on_commit_reload_policy_aux(field, &index, &reader);
}
#[test]
fn test_index_manual_policy_mmap() {
let schema = throw_away_schema();
let field = schema.get_field("num_likes").unwrap();
let index = Index::create_from_tempdir(schema).unwrap();
let mut index = Index::create_from_tempdir(schema).unwrap();
let mut writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
writer.commit().unwrap();
let reader = index
@@ -513,8 +507,12 @@ mod tests {
.unwrap();
assert_eq!(reader.searcher().num_docs(), 0);
writer.add_document(doc!(field=>1u64));
let (sender, receiver) = crossbeam::channel::unbounded();
let _handle = index.directory_mut().watch(Box::new(move || {
let _ = sender.send(());
}));
writer.commit().unwrap();
thread::sleep(Duration::from_millis(500));
assert!(receiver.recv().is_ok());
assert_eq!(reader.searcher().num_docs(), 0);
reader.reload().unwrap();
assert_eq!(reader.searcher().num_docs(), 1);
@@ -534,39 +532,26 @@ mod tests {
.try_into()
.unwrap();
assert_eq!(reader.searcher().num_docs(), 0);
let mut writer = write_index.writer_with_num_threads(1, 3_000_000).unwrap();
test_index_on_commit_reload_policy_aux(field, &mut writer, &reader);
test_index_on_commit_reload_policy_aux(field, &write_index, &reader);
}
}
fn test_index_on_commit_reload_policy_aux(
field: Field,
writer: &mut IndexWriter,
reader: &IndexReader,
) {
fn test_index_on_commit_reload_policy_aux(field: Field, index: &Index, reader: &IndexReader) {
let mut reader_index = reader.index();
let (sender, receiver) = crossbeam::channel::unbounded();
let _watch_handle = reader_index.directory_mut().watch(Box::new(move || {
let _ = sender.send(());
}));
let mut writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
assert_eq!(reader.searcher().num_docs(), 0);
writer.add_document(doc!(field=>1u64));
writer.commit().unwrap();
let mut count = 0;
for _ in 0..100 {
count = reader.searcher().num_docs();
if count > 0 {
break;
}
thread::sleep(Duration::from_millis(100));
}
assert_eq!(count, 1);
assert!(receiver.recv().is_ok());
assert_eq!(reader.searcher().num_docs(), 1);
writer.add_document(doc!(field=>2u64));
writer.commit().unwrap();
let mut count = 0;
for _ in 0..10 {
count = reader.searcher().num_docs();
if count > 1 {
break;
}
thread::sleep(Duration::from_millis(100));
}
assert_eq!(count, 2);
assert!(receiver.recv().is_ok());
assert_eq!(reader.searcher().num_docs(), 2);
}
// This test will not pass on windows, because windows
@@ -583,9 +568,13 @@ mod tests {
for i in 0u64..8_000u64 {
writer.add_document(doc!(field => i));
}
let (sender, receiver) = crossbeam::channel::unbounded();
let _handle = directory.watch(Box::new(move || {
let _ = sender.send(());
}));
writer.commit().unwrap();
let mem_right_after_commit = directory.total_mem_usage();
thread::sleep(Duration::from_millis(1_000));
assert!(receiver.recv().is_ok());
let reader = index
.reader_builder()
.reload_policy(ReloadPolicy::Manual)
@@ -599,6 +588,11 @@ mod tests {
reader.reload().unwrap();
let searcher = reader.searcher();
assert_eq!(searcher.num_docs(), 8_000);
assert!(mem_right_after_merge_finished < mem_right_after_commit);
assert!(
mem_right_after_merge_finished < mem_right_after_commit,
"(mem after merge){} is expected < (mem before merge){}",
mem_right_after_merge_finished,
mem_right_after_commit
);
}
}

View File

@@ -150,6 +150,21 @@ impl SegmentMeta {
self.num_deleted_docs() > 0
}
/// Updates the max_doc value from the `SegmentMeta`.
///
/// This method is only used when updating `max_doc` from 0
/// as we finalize a fresh new segment.
pub(crate) fn with_max_doc(self, max_doc: u32) -> SegmentMeta {
assert_eq!(self.tracked.max_doc, 0);
assert!(self.tracked.deletes.is_none());
let tracked = self.tracked.map(move |inner_meta| InnerSegmentMeta {
segment_id: inner_meta.segment_id,
max_doc,
deletes: None,
});
SegmentMeta { tracked }
}
#[doc(hidden)]
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> SegmentMeta {
let delete_meta = DeleteMeta {
@@ -285,6 +300,9 @@ mod tests {
payload: None,
};
let json = serde_json::ser::to_string(&index_metas).expect("serialization failed");
assert_eq!(json, r#"{"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","tokenizer":"default"},"stored":false}}],"opstamp":0}"#);
assert_eq!(
json,
r#"{"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","tokenizer":"default"},"stored":false}}],"opstamp":0}"#
);
}
}

View File

@@ -50,6 +50,17 @@ impl Segment {
&self.meta
}
/// Updates the max_doc value from the `SegmentMeta`.
///
/// This method is only used when updating `max_doc` from 0
/// as we finalize a fresh new segment.
pub(crate) fn with_max_doc(self, max_doc: u32) -> Segment {
Segment {
index: self.index,
meta: self.meta.with_max_doc(max_doc),
}
}
#[doc(hidden)]
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> Segment {
Segment {

View File

@@ -7,6 +7,8 @@ use std::io::Write;
const COMMON_FOOTER_SIZE: usize = 4 * 5;
type CrcHashU32 = u32;
#[derive(Debug, Clone, PartialEq)]
pub struct Footer {
pub tantivy_version: (u32, u32, u32),
@@ -24,7 +26,7 @@ impl Footer {
Footer {
tantivy_version,
meta: format!(
"tantivy {}.{}.{}, index v{}",
"tantivy v{}.{}.{}, index_format v{}",
tantivy_version.0,
tantivy_version.1,
tantivy_version.2,
@@ -34,6 +36,9 @@ impl Footer {
}
}
/// Serialises the footer to a byte-array
/// [ versioned_footer | meta | common_footer ]
/// [ 0..8 | 8..32 | 32..52 ]
pub fn to_bytes(&self) -> Vec<u8> {
let mut res = self.versioned_footer.to_bytes();
res.extend_from_slice(self.meta.as_bytes());
@@ -64,20 +69,27 @@ impl Footer {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
format!(
"File corrupted. The footer len is {}, while the entire file len is {}",
"The footer len is {}, while the entire file len is {}. \
Your index is either corrupted or was built using a tantivy version\
anterior to 0.11.",
size, len
),
));
}
let footer = &data[len - size as usize..];
let meta_len = LittleEndian::read_u32(&footer[size - 20..]) as usize;
let meta_len = LittleEndian::read_u32(&footer[size - COMMON_FOOTER_SIZE..]) as usize;
let tantivy_major = LittleEndian::read_u32(&footer[size - 16..]);
let tantivy_minor = LittleEndian::read_u32(&footer[size - 12..]);
let tantivy_patch = LittleEndian::read_u32(&footer[size - 8..]);
Ok(Footer {
tantivy_version: (tantivy_major, tantivy_minor, tantivy_patch),
meta: String::from_utf8_lossy(&footer[size - meta_len - 20..size - 20]).into_owned(),
versioned_footer: VersionedFooter::from_bytes(&footer[..size - meta_len - 20])?,
meta: String::from_utf8_lossy(
&footer[size - meta_len - COMMON_FOOTER_SIZE..size - COMMON_FOOTER_SIZE],
)
.into_owned(),
versioned_footer: VersionedFooter::from_bytes(
&footer[..size - meta_len - COMMON_FOOTER_SIZE],
)?,
})
}
@@ -88,24 +100,28 @@ impl Footer {
}
pub fn size(&self) -> usize {
self.versioned_footer.size() as usize + self.meta.len() + 20
self.versioned_footer.size() as usize + self.meta.len() + COMMON_FOOTER_SIZE
}
}
/// Footer that includes a crc32 hash that enables us to checksum files in the index
#[derive(Debug, Clone, PartialEq)]
pub enum VersionedFooter {
UnknownVersion { version: u32, size: u32 },
V0(u32), // crc
V0(CrcHashU32), // crc
}
impl VersionedFooter {
/// Serializes a valid `VersionedFooter` or panics if the version is unknown
/// [ version | crc_hash ]
/// [ 0..4 | 4..8 ]
pub fn to_bytes(&self) -> Vec<u8> {
match self {
VersionedFooter::V0(crc) => {
let mut res = vec![0; 8];
LittleEndian::write_u32(&mut res, 0);
LittleEndian::write_u32(&mut res[4..], *crc);
res
let mut buf = [0u8; 8];
LittleEndian::write_u32(&mut buf[0..4], 0);
LittleEndian::write_u32(&mut buf[4..8], *crc);
buf.to_vec()
}
VersionedFooter::UnknownVersion { .. } => {
panic!("Unsupported index should never get serialized");
@@ -115,20 +131,26 @@ impl VersionedFooter {
pub fn from_bytes(footer: &[u8]) -> Result<Self, io::Error> {
assert!(footer.len() >= 4);
if footer.len() < 4 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Footer should be more than 4 bytes.",
));
}
let version = LittleEndian::read_u32(footer);
match version {
// the first 4 bytes should be zeroed out thus returning a `0`
0 => {
if footer.len() == 8 {
Ok(VersionedFooter::V0(LittleEndian::read_u32(&footer[4..])))
} else {
Err(io::Error::new(
if footer.len() != 8 {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
format!(
"File corrupted. The versioned footer len is {}, while it should be 8",
footer.len()
),
))
));
}
Ok(VersionedFooter::V0(LittleEndian::read_u32(&footer[4..])))
}
version => Ok(VersionedFooter::UnknownVersion {
version,
@@ -151,7 +173,7 @@ impl VersionedFooter {
}
}
pub fn crc(&self) -> Option<u32> {
pub fn crc(&self) -> Option<CrcHashU32> {
match self {
VersionedFooter::V0(crc) => Some(*crc),
VersionedFooter::UnknownVersion { .. } => None,
@@ -190,7 +212,6 @@ impl<W: TerminatingWrite> Write for FooterProxy<W> {
impl<W: TerminatingWrite> TerminatingWrite for FooterProxy<W> {
fn terminate_ref(&mut self, _: AntiCallToken) -> io::Result<()> {
let crc = self.hasher.take().unwrap().finalize();
let footer = Footer::new(VersionedFooter::V0(crc)).to_bytes();
let mut writer = self.writer.take().unwrap();
writer.write_all(&footer)?;
@@ -200,14 +221,57 @@ impl<W: TerminatingWrite> TerminatingWrite for FooterProxy<W> {
#[cfg(test)]
mod tests {
use crate::directory::footer::{Footer, VersionedFooter};
use regex::Regex;
#[test]
fn test_serialize_deserialize_footer() {
let crc = 123456;
let footer = Footer::new(VersionedFooter::V0(crc));
let footer_bytes = footer.to_bytes();
assert_eq!(Footer::from_bytes(&footer_bytes).unwrap(), footer);
}
#[test]
fn footer_length() {
// test to make sure the ascii art in the doc-strings is correct
let crc = 1111111 as u32;
let versioned_footer = VersionedFooter::V0(crc);
assert_eq!(versioned_footer.size(), 8);
let footer = Footer::new(versioned_footer);
let regex_ptn = Regex::new(
"tantivy v[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.{0,10}, index_format v[0-9]{1,5}",
)
.unwrap();
assert!(regex_ptn.find(&footer.meta).is_some());
}
#[test]
fn versioned_footer_from_bytes() {
use byteorder::{ByteOrder, LittleEndian};
let v_footer_bytes = vec![0, 0, 0, 0, 12, 35, 89, 18];
let versioned_footer = VersionedFooter::from_bytes(&v_footer_bytes).unwrap();
let expected_versioned_footer =
VersionedFooter::V0(LittleEndian::read_u32(&[12, 35, 89, 18]));
assert_eq!(versioned_footer, expected_versioned_footer);
assert_eq!(versioned_footer.to_bytes(), v_footer_bytes);
}
#[should_panic(expected = "Unsupported index should never get serialized")]
#[test]
fn versioned_footer_panic() {
use byteorder::{ByteOrder, LittleEndian};
let v_footer_bytes = vec![1; 8];
let versioned_footer = VersionedFooter::from_bytes(&v_footer_bytes).unwrap();
let expected_version = LittleEndian::read_u32(&[1, 1, 1, 1]);
let expected_versioned_footer = VersionedFooter::UnknownVersion {
version: expected_version,
size: v_footer_bytes.len() as u32,
};
assert_eq!(versioned_footer, expected_versioned_footer);
versioned_footer.to_bytes();
}
}

View File

@@ -2,6 +2,7 @@ use crate::core::MANAGED_FILEPATH;
use crate::directory::error::{DeleteError, IOError, LockError, OpenReadError, OpenWriteError};
use crate::directory::footer::{Footer, FooterProxy};
use crate::directory::DirectoryLock;
use crate::directory::GarbageCollectionResult;
use crate::directory::Lock;
use crate::directory::META_LOCK;
use crate::directory::{ReadOnlySource, WritePtr};
@@ -104,7 +105,10 @@ impl ManagedDirectory {
/// If a file cannot be deleted (for permission reasons for instance)
/// an error is simply logged, and the file remains in the list of managed
/// files.
pub fn garbage_collect<L: FnOnce() -> HashSet<PathBuf>>(&mut self, get_living_files: L) {
pub fn garbage_collect<L: FnOnce() -> HashSet<PathBuf>>(
&mut self,
get_living_files: L,
) -> crate::Result<GarbageCollectionResult> {
info!("Garbage collect");
let mut files_to_delete = vec![];
@@ -130,19 +134,25 @@ impl ManagedDirectory {
// 2) writer change meta.json (for instance after a merge or a commit)
// 3) gc kicks in.
// 4) gc removes a file that was useful for process B, before process B opened it.
if let Ok(_meta_lock) = self.acquire_lock(&META_LOCK) {
let living_files = get_living_files();
for managed_path in &meta_informations_rlock.managed_paths {
if !living_files.contains(managed_path) {
files_to_delete.push(managed_path.clone());
match self.acquire_lock(&META_LOCK) {
Ok(_meta_lock) => {
let living_files = get_living_files();
for managed_path in &meta_informations_rlock.managed_paths {
if !living_files.contains(managed_path) {
files_to_delete.push(managed_path.clone());
}
}
}
} else {
error!("Failed to acquire lock for GC");
Err(err) => {
error!("Failed to acquire lock for GC");
return Err(crate::Error::from(err));
}
}
}
let mut failed_to_delete_files = vec![];
let mut deleted_files = vec![];
for file_to_delete in files_to_delete {
match self.delete(&file_to_delete) {
Ok(_) => {
@@ -152,9 +162,10 @@ impl ManagedDirectory {
Err(file_error) => {
match file_error {
DeleteError::FileDoesNotExist(_) => {
deleted_files.push(file_to_delete);
deleted_files.push(file_to_delete.clone());
}
DeleteError::IOError(_) => {
failed_to_delete_files.push(file_to_delete.clone());
if !cfg!(target_os = "windows") {
// On windows, delete is expected to fail if the file
// is mmapped.
@@ -177,10 +188,13 @@ impl ManagedDirectory {
for delete_file in &deleted_files {
managed_paths_write.remove(delete_file);
}
if save_managed_paths(self.directory.as_mut(), &meta_informations_wlock).is_err() {
error!("Failed to save the list of managed files.");
}
save_managed_paths(self.directory.as_mut(), &meta_informations_wlock)?;
}
Ok(GarbageCollectionResult {
deleted_files,
failed_to_delete_files,
})
}
/// Registers a file as managed
@@ -327,9 +341,8 @@ mod tests_mmap_specific {
.unwrap();
assert!(managed_directory.exists(test_path1));
assert!(managed_directory.exists(test_path2));
let living_files: HashSet<PathBuf> =
[test_path1.to_owned()].into_iter().cloned().collect();
managed_directory.garbage_collect(|| living_files);
let living_files: HashSet<PathBuf> = [test_path1.to_owned()].iter().cloned().collect();
assert!(managed_directory.garbage_collect(|| living_files).is_ok());
assert!(managed_directory.exists(test_path1));
assert!(!managed_directory.exists(test_path2));
}
@@ -339,7 +352,7 @@ mod tests_mmap_specific {
assert!(managed_directory.exists(test_path1));
assert!(!managed_directory.exists(test_path2));
let living_files: HashSet<PathBuf> = HashSet::new();
managed_directory.garbage_collect(|| living_files);
assert!(managed_directory.garbage_collect(|| living_files).is_ok());
assert!(!managed_directory.exists(test_path1));
assert!(!managed_directory.exists(test_path2));
}
@@ -361,7 +374,9 @@ mod tests_mmap_specific {
assert!(managed_directory.exists(test_path1));
let _mmap_read = managed_directory.open_read(test_path1).unwrap();
managed_directory.garbage_collect(|| living_files.clone());
assert!(managed_directory
.garbage_collect(|| living_files.clone())
.is_ok());
if cfg!(target_os = "windows") {
// On Windows, gc should try and fail the file as it is mmapped.
assert!(managed_directory.exists(test_path1));
@@ -369,7 +384,7 @@ mod tests_mmap_specific {
drop(_mmap_read);
// The file should still be in the list of managed file and
// eventually be deleted once mmap is released.
managed_directory.garbage_collect(|| living_files);
assert!(managed_directory.garbage_collect(|| living_files).is_ok());
assert!(!managed_directory.exists(test_path1));
} else {
assert!(!managed_directory.exists(test_path1));

View File

@@ -174,7 +174,7 @@ impl WatcherWrapper {
// We might want to be more accurate than this at one point.
if let Some(filename) = changed_path.file_name() {
if filename == *META_FILEPATH {
watcher_router_clone.broadcast();
let _ = watcher_router_clone.broadcast();
}
}
}
@@ -538,16 +538,15 @@ mod tests {
// The following tests are specific to the MmapDirectory
use super::*;
use crate::indexer::LogMergePolicy;
use crate::schema::{Schema, SchemaBuilder, TEXT};
use crate::Index;
use crate::ReloadPolicy;
use std::fs;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;
#[test]
fn test_open_non_existant_path() {
fn test_open_non_existent_path() {
assert!(MmapDirectory::open(PathBuf::from("./nowhere")).is_err());
}
@@ -640,13 +639,18 @@ mod tests {
let tmp_dir = tempfile::TempDir::new().unwrap();
let tmp_dirpath = tmp_dir.path().to_owned();
let mut watch_wrapper = WatcherWrapper::new(&tmp_dirpath).unwrap();
let tmp_file = tmp_dirpath.join("coucou");
let tmp_file = tmp_dirpath.join(*META_FILEPATH);
let _handle = watch_wrapper.watch(Box::new(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);
}));
let (sender, receiver) = crossbeam::channel::unbounded();
let _handle2 = watch_wrapper.watch(Box::new(move || {
let _ = sender.send(());
}));
assert_eq!(counter.load(Ordering::SeqCst), 0);
fs::write(&tmp_file, b"whateverwilldo").unwrap();
thread::sleep(Duration::new(0, 1_000u32));
assert!(receiver.recv().is_ok());
assert!(counter.load(Ordering::SeqCst) >= 1);
}
#[test]
@@ -655,34 +659,42 @@ mod tests {
let mut schema_builder: SchemaBuilder = Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT);
let schema = schema_builder.build();
{
let index = Index::create(mmap_directory.clone(), schema).unwrap();
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
for _num_commits in 0..16 {
let mut log_merge_policy = LogMergePolicy::default();
log_merge_policy.set_min_merge_size(3);
index_writer.set_merge_policy(Box::new(log_merge_policy));
for _num_commits in 0..10 {
for _ in 0..10 {
index_writer.add_document(doc!(text_field=>"abc"));
}
index_writer.commit().unwrap();
}
let reader = index
.reader_builder()
.reload_policy(ReloadPolicy::Manual)
.try_into()
.unwrap();
for _ in 0..30 {
for _ in 0..4 {
index_writer.add_document(doc!(text_field=>"abc"));
index_writer.commit().unwrap();
reader.reload().unwrap();
}
index_writer.wait_merging_threads().unwrap();
reader.reload().unwrap();
let num_segments = reader.searcher().segment_readers().len();
assert_eq!(num_segments, 4);
assert!(num_segments <= 4);
assert_eq!(
num_segments * 7,
mmap_directory.get_cache_info().mmapped.len()
);
}
assert_eq!(mmap_directory.get_cache_info().mmapped.len(), 0);
assert!(mmap_directory.get_cache_info().mmapped.is_empty());
}
}

View File

@@ -26,6 +26,21 @@ pub use self::read_only_source::ReadOnlySource;
pub(crate) use self::watch_event_router::WatchCallbackList;
pub use self::watch_event_router::{WatchCallback, WatchHandle};
use std::io::{self, BufWriter, Write};
use std::path::PathBuf;
/// Outcome of the Garbage collection
pub struct GarbageCollectionResult {
/// List of files that were deleted in this cycle
pub deleted_files: Vec<PathBuf>,
/// List of files that were schedule to be deleted in this cycle,
/// but deletion did not work. This typically happens on windows,
/// as deleting a memory mapped file is forbidden.
///
/// If a searcher is still held, a file cannot be deleted.
/// This is not considered a bug, the file will simply be deleted
/// in the next GC.
pub failed_to_delete_files: Vec<PathBuf>,
}
#[cfg(feature = "mmap")]
pub use self::mmap_directory::MmapDirectory;

View File

@@ -195,7 +195,7 @@ impl Directory for RAMDirectory {
vec_writer.write_all(data)?;
vec_writer.flush()?;
if path == Path::new(&*META_FILEPATH) {
self.fs.write().unwrap().watch_router.broadcast();
let _ = self.fs.write().unwrap().watch_router.broadcast();
}
Ok(())
}

View File

@@ -1,25 +1,117 @@
use super::*;
use futures::channel::oneshot;
use futures::executor::block_on;
use std::io::Write;
use std::mem;
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::Arc;
use std::thread;
use std::time;
use std::time::Duration;
#[test]
fn test_ram_directory() {
let mut ram_directory = RAMDirectory::create();
test_directory(&mut ram_directory);
#[cfg(feature = "mmap")]
mod mmap_directory_tests {
use crate::directory::MmapDirectory;
type DirectoryImpl = MmapDirectory;
fn make_directory() -> DirectoryImpl {
MmapDirectory::create_from_tempdir().unwrap()
}
#[test]
fn test_simple() {
let mut directory = make_directory();
super::test_simple(&mut directory);
}
#[test]
fn test_write_create_the_file() {
let mut directory = make_directory();
super::test_write_create_the_file(&mut directory);
}
#[test]
fn test_rewrite_forbidden() {
let mut directory = make_directory();
super::test_rewrite_forbidden(&mut directory);
}
#[test]
fn test_directory_delete() {
let mut directory = make_directory();
super::test_directory_delete(&mut directory);
}
#[test]
fn test_lock_non_blocking() {
let mut directory = make_directory();
super::test_lock_non_blocking(&mut directory);
}
#[test]
fn test_lock_blocking() {
let mut directory = make_directory();
super::test_lock_blocking(&mut directory);
}
#[test]
fn test_watch() {
let mut directory = make_directory();
super::test_watch(&mut directory);
}
}
#[test]
#[cfg(feature = "mmap")]
fn test_mmap_directory() {
let mut mmap_directory = MmapDirectory::create_from_tempdir().unwrap();
test_directory(&mut mmap_directory);
mod ram_directory_tests {
use crate::directory::RAMDirectory;
type DirectoryImpl = RAMDirectory;
fn make_directory() -> DirectoryImpl {
RAMDirectory::default()
}
#[test]
fn test_simple() {
let mut directory = make_directory();
super::test_simple(&mut directory);
}
#[test]
fn test_write_create_the_file() {
let mut directory = make_directory();
super::test_write_create_the_file(&mut directory);
}
#[test]
fn test_rewrite_forbidden() {
let mut directory = make_directory();
super::test_rewrite_forbidden(&mut directory);
}
#[test]
fn test_directory_delete() {
let mut directory = make_directory();
super::test_directory_delete(&mut directory);
}
#[test]
fn test_lock_non_blocking() {
let mut directory = make_directory();
super::test_lock_non_blocking(&mut directory);
}
#[test]
fn test_lock_blocking() {
let mut directory = make_directory();
super::test_lock_blocking(&mut directory);
}
#[test]
fn test_watch() {
let mut directory = make_directory();
super::test_watch(&mut directory);
}
}
#[test]
@@ -99,48 +191,39 @@ fn test_directory_delete(directory: &mut dyn Directory) {
assert!(directory.delete(&test_path).is_err());
}
fn test_directory(directory: &mut dyn Directory) {
test_simple(directory);
test_rewrite_forbidden(directory);
test_write_create_the_file(directory);
test_directory_delete(directory);
test_lock_non_blocking(directory);
test_lock_blocking(directory);
test_watch(directory);
}
fn test_watch(directory: &mut dyn Directory) {
let num_progress: Arc<AtomicUsize> = Default::default();
let counter: Arc<AtomicUsize> = Default::default();
let counter_clone = counter.clone();
let (sender, receiver) = crossbeam::channel::unbounded();
let watch_callback = Box::new(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);
counter_clone.fetch_add(1, SeqCst);
});
assert!(directory
.atomic_write(Path::new("meta.json"), b"random_test_data")
.is_ok());
thread::sleep(Duration::new(0, 10_000));
assert_eq!(0, counter.load(Ordering::SeqCst));
// This callback is used to synchronize watching in our unit test.
// We bind it to a variable because the callback is removed when that
// handle is dropped.
let watch_handle = directory.watch(watch_callback).unwrap();
let _progress_listener = directory
.watch(Box::new(move || {
let val = num_progress.fetch_add(1, SeqCst);
let _ = sender.send(val);
}))
.unwrap();
for i in 0..10 {
assert_eq!(i, counter.load(Ordering::SeqCst));
assert_eq!(i, counter.load(SeqCst));
assert!(directory
.atomic_write(Path::new("meta.json"), b"random_test_data_2")
.is_ok());
for _ in 0..1_000 {
if counter.load(Ordering::SeqCst) > i {
break;
}
thread::sleep(Duration::from_millis(10));
}
assert_eq!(i + 1, counter.load(Ordering::SeqCst));
assert_eq!(receiver.recv_timeout(Duration::from_millis(500)), Ok(i));
assert_eq!(i + 1, counter.load(SeqCst));
}
mem::drop(watch_handle);
assert!(directory
.atomic_write(Path::new("meta.json"), b"random_test_data")
.is_ok());
thread::sleep(Duration::from_millis(200));
assert_eq!(10, counter.load(Ordering::SeqCst));
assert!(receiver.recv_timeout(Duration::from_millis(500)).is_ok());
assert_eq!(10, counter.load(SeqCst));
}
fn test_lock_non_blocking(directory: &mut dyn Directory) {
@@ -174,9 +257,13 @@ fn test_lock_blocking(directory: &mut dyn Directory) {
is_blocking: true,
});
assert!(lock_a_res.is_ok());
let in_thread = Arc::new(AtomicBool::default());
let in_thread_clone = in_thread.clone();
let (sender, receiver) = oneshot::channel();
std::thread::spawn(move || {
//< lock_a_res is sent to the thread.
std::thread::sleep(time::Duration::from_millis(10));
in_thread_clone.store(true, SeqCst);
let _just_sync = block_on(receiver);
// explicitely droping lock_a_res. It would have been sufficient to just force it
// to be part of the move, but the intent seems clearer that way.
drop(lock_a_res);
@@ -189,14 +276,18 @@ fn test_lock_blocking(directory: &mut dyn Directory) {
});
assert!(lock_a_res.is_err());
}
{
// the blocking call should wait for at least 10ms.
let start = time::Instant::now();
let lock_a_res = directory.acquire_lock(&Lock {
let directory_clone = directory.box_clone();
let (sender2, receiver2) = oneshot::channel();
let join_handle = std::thread::spawn(move || {
assert!(sender2.send(()).is_ok());
let lock_a_res = directory_clone.acquire_lock(&Lock {
filepath: PathBuf::from("a.lock"),
is_blocking: true,
});
assert!(in_thread.load(SeqCst));
assert!(lock_a_res.is_ok());
assert!(start.elapsed().subsec_millis() >= 10);
}
});
assert!(block_on(receiver2).is_ok());
assert!(sender.send(()).is_ok());
assert!(join_handle.join().is_ok());
}

View File

@@ -1,3 +1,5 @@
use futures::channel::oneshot;
use futures::{Future, TryFutureExt};
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::Weak;
@@ -47,14 +49,21 @@ impl WatchCallbackList {
}
/// Triggers all callbacks
pub fn broadcast(&self) {
pub fn broadcast(&self) -> impl Future<Output = ()> {
let callbacks = self.list_callback();
let (sender, receiver) = oneshot::channel();
let result = receiver.unwrap_or_else(|_| ());
if callbacks.is_empty() {
let _ = sender.send(());
return result;
}
let spawn_res = std::thread::Builder::new()
.name("watch-callbacks".to_string())
.spawn(move || {
for callback in callbacks {
callback();
}
let _ = sender.send(());
});
if let Err(err) = spawn_res {
error!(
@@ -62,19 +71,17 @@ impl WatchCallbackList {
err
);
}
result
}
}
#[cfg(test)]
mod tests {
use crate::directory::WatchCallbackList;
use futures::executor::block_on;
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
const WAIT_TIME: u64 = 20;
#[test]
fn test_watch_event_router_simple() {
@@ -84,22 +91,22 @@ mod tests {
let inc_callback = Box::new(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);
});
watch_event_router.broadcast();
block_on(watch_event_router.broadcast());
assert_eq!(0, counter.load(Ordering::SeqCst));
let handle_a = watch_event_router.subscribe(inc_callback);
thread::sleep(Duration::from_millis(WAIT_TIME));
assert_eq!(0, counter.load(Ordering::SeqCst));
watch_event_router.broadcast();
thread::sleep(Duration::from_millis(WAIT_TIME));
block_on(watch_event_router.broadcast());
assert_eq!(1, counter.load(Ordering::SeqCst));
watch_event_router.broadcast();
watch_event_router.broadcast();
watch_event_router.broadcast();
thread::sleep(Duration::from_millis(WAIT_TIME));
block_on(async {
(
watch_event_router.broadcast().await,
watch_event_router.broadcast().await,
watch_event_router.broadcast().await,
)
});
assert_eq!(4, counter.load(Ordering::SeqCst));
mem::drop(handle_a);
watch_event_router.broadcast();
thread::sleep(Duration::from_millis(WAIT_TIME));
block_on(watch_event_router.broadcast());
assert_eq!(4, counter.load(Ordering::SeqCst));
}
@@ -115,20 +122,20 @@ mod tests {
};
let handle_a = watch_event_router.subscribe(inc_callback(1));
let handle_a2 = watch_event_router.subscribe(inc_callback(10));
thread::sleep(Duration::from_millis(WAIT_TIME));
assert_eq!(0, counter.load(Ordering::SeqCst));
watch_event_router.broadcast();
watch_event_router.broadcast();
thread::sleep(Duration::from_millis(WAIT_TIME));
block_on(async {
futures::join!(
watch_event_router.broadcast(),
watch_event_router.broadcast()
)
});
assert_eq!(22, counter.load(Ordering::SeqCst));
mem::drop(handle_a);
watch_event_router.broadcast();
thread::sleep(Duration::from_millis(WAIT_TIME));
block_on(watch_event_router.broadcast());
assert_eq!(32, counter.load(Ordering::SeqCst));
mem::drop(handle_a2);
watch_event_router.broadcast();
watch_event_router.broadcast();
thread::sleep(Duration::from_millis(WAIT_TIME));
block_on(watch_event_router.broadcast());
block_on(watch_event_router.broadcast());
assert_eq!(32, counter.load(Ordering::SeqCst));
}
@@ -142,14 +149,15 @@ mod tests {
});
let handle_a = watch_event_router.subscribe(inc_callback);
assert_eq!(0, counter.load(Ordering::SeqCst));
watch_event_router.broadcast();
watch_event_router.broadcast();
thread::sleep(Duration::from_millis(WAIT_TIME));
block_on(async {
let future1 = watch_event_router.broadcast();
let future2 = watch_event_router.broadcast();
futures::join!(future1, future2)
});
assert_eq!(2, counter.load(Ordering::SeqCst));
thread::sleep(Duration::from_millis(WAIT_TIME));
mem::drop(handle_a);
watch_event_router.broadcast();
thread::sleep(Duration::from_millis(WAIT_TIME));
let _ = watch_event_router.broadcast();
block_on(watch_event_router.broadcast());
assert_eq!(2, counter.load(Ordering::SeqCst));
}
}

View File

@@ -170,3 +170,9 @@ impl From<serde_json::Error> for TantivyError {
TantivyError::IOError(io_err.into())
}
}
impl From<rayon::ThreadPoolBuildError> for TantivyError {
fn from(error: rayon::ThreadPoolBuildError) -> TantivyError {
TantivyError::SystemError(error.to_string())
}
}

View File

@@ -10,11 +10,14 @@ use std::io::Write;
/// Write a delete `BitSet`
///
/// where `delete_bitset` is the set of deleted `DocId`.
pub fn write_delete_bitset(delete_bitset: &BitSet, writer: &mut WritePtr) -> io::Result<()> {
let max_doc = delete_bitset.capacity();
pub fn write_delete_bitset(
delete_bitset: &BitSet,
max_doc: u32,
writer: &mut WritePtr,
) -> io::Result<()> {
let mut byte = 0u8;
let mut shift = 0u8;
for doc in 0..max_doc {
for doc in 0..(max_doc as usize) {
if delete_bitset.contains(doc) {
byte |= 1 << shift;
}
@@ -86,18 +89,17 @@ mod tests {
use bit_set::BitSet;
use std::path::PathBuf;
fn test_delete_bitset_helper(bitset: &BitSet) {
fn test_delete_bitset_helper(bitset: &BitSet, max_doc: u32) {
let test_path = PathBuf::from("test");
let mut directory = RAMDirectory::create();
{
let mut writer = directory.open_write(&*test_path).unwrap();
write_delete_bitset(bitset, &mut writer).unwrap();
write_delete_bitset(bitset, max_doc, &mut writer).unwrap();
}
{
let source = directory.open_read(&test_path).unwrap();
let delete_bitset = DeleteBitSet::open(source);
let n = bitset.capacity();
for doc in 0..n {
for doc in 0..max_doc as usize {
assert_eq!(bitset.contains(doc), delete_bitset.is_deleted(doc as DocId));
}
assert_eq!(delete_bitset.len(), bitset.len());
@@ -110,7 +112,7 @@ mod tests {
let mut bitset = BitSet::with_capacity(10);
bitset.insert(1);
bitset.insert(9);
test_delete_bitset_helper(&bitset);
test_delete_bitset_helper(&bitset, 10);
}
{
let mut bitset = BitSet::with_capacity(8);
@@ -119,7 +121,7 @@ mod tests {
bitset.insert(3);
bitset.insert(5);
bitset.insert(7);
test_delete_bitset_helper(&bitset);
test_delete_bitset_helper(&bitset, 8);
}
}
}

View File

@@ -7,8 +7,8 @@ use crate::core::SegmentComponent;
use crate::core::SegmentId;
use crate::core::SegmentMeta;
use crate::core::SegmentReader;
use crate::directory::DirectoryLock;
use crate::directory::TerminatingWrite;
use crate::directory::{DirectoryLock, GarbageCollectionResult};
use crate::docset::DocSet;
use crate::error::TantivyError;
use crate::fastfield::write_delete_bitset;
@@ -23,10 +23,10 @@ use crate::schema::Document;
use crate::schema::IndexRecordOption;
use crate::schema::Term;
use crate::Opstamp;
use crate::Result;
use bit_set::BitSet;
use crossbeam::channel;
use futures::{Canceled, Future};
use futures::executor::block_on;
use futures::future::Future;
use smallvec::smallvec;
use smallvec::SmallVec;
use std::mem;
@@ -72,7 +72,7 @@ pub struct IndexWriter {
heap_size_in_bytes_per_thread: usize,
workers_join_handle: Vec<JoinHandle<Result<()>>>,
workers_join_handle: Vec<JoinHandle<crate::Result<()>>>,
operation_receiver: OperationReceiver,
operation_sender: OperationSender,
@@ -95,7 +95,7 @@ fn compute_deleted_bitset(
delete_cursor: &mut DeleteCursor,
doc_opstamps: &DocToOpstampMapping,
target_opstamp: Opstamp,
) -> Result<bool> {
) -> crate::Result<bool> {
let mut might_have_changed = false;
while let Some(delete_op) = delete_cursor.get() {
if delete_op.opstamp > target_opstamp {
@@ -132,7 +132,7 @@ pub(crate) fn advance_deletes(
mut segment: Segment,
segment_entry: &mut SegmentEntry,
target_opstamp: Opstamp,
) -> Result<()> {
) -> crate::Result<()> {
{
if segment_entry.meta().delete_opstamp() == Some(target_opstamp) {
// We are already up-to-date here.
@@ -148,7 +148,6 @@ pub(crate) fn advance_deletes(
};
let delete_cursor = segment_entry.delete_cursor();
compute_deleted_bitset(
&mut delete_bitset,
&segment_reader,
@@ -168,7 +167,7 @@ pub(crate) fn advance_deletes(
if num_deleted_docs > 0 {
segment = segment.with_delete_meta(num_deleted_docs as u32, target_opstamp);
let mut delete_file = segment.open_write(SegmentComponent::DELETE)?;
write_delete_bitset(&delete_bitset, &mut delete_file)?;
write_delete_bitset(&delete_bitset, max_doc, &mut delete_file)?;
delete_file.terminate()?;
}
}
@@ -178,13 +177,13 @@ pub(crate) fn advance_deletes(
fn index_documents(
memory_budget: usize,
segment: &Segment,
segment: Segment,
grouped_document_iterator: &mut dyn Iterator<Item = OperationGroup>,
segment_updater: &mut SegmentUpdater,
mut delete_cursor: DeleteCursor,
) -> Result<bool> {
) -> crate::Result<bool> {
let schema = segment.schema();
let segment_id = segment.id();
let mut segment_writer = SegmentWriter::for_segment(memory_budget, segment.clone(), &schema)?;
for document_group in grouped_document_iterator {
for doc in document_group {
@@ -204,22 +203,32 @@ fn index_documents(
return Ok(false);
}
let num_docs = segment_writer.max_doc();
let max_doc = segment_writer.max_doc();
// this is ensured by the call to peek before starting
// the worker thread.
assert!(num_docs > 0);
assert!(max_doc > 0);
let doc_opstamps: Vec<Opstamp> = segment_writer.finalize()?;
let segment_meta = segment.index().new_segment_meta(segment_id, num_docs);
let segment_with_max_doc = segment.with_max_doc(max_doc);
let last_docstamp: Opstamp = *(doc_opstamps.last().unwrap());
let delete_bitset_opt =
apply_deletes(&segment, &mut delete_cursor, &doc_opstamps, last_docstamp)?;
let delete_bitset_opt = apply_deletes(
&segment_with_max_doc,
&mut delete_cursor,
&doc_opstamps,
last_docstamp,
)?;
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, delete_bitset_opt);
Ok(segment_updater.add_segment(segment_entry))
let segment_entry = SegmentEntry::new(
segment_with_max_doc.meta().clone(),
delete_cursor,
delete_bitset_opt,
);
block_on(segment_updater.schedule_add_segment(segment_entry))?;
Ok(true)
}
fn apply_deletes(
@@ -227,7 +236,7 @@ fn apply_deletes(
mut delete_cursor: &mut DeleteCursor,
doc_opstamps: &[Opstamp],
last_docstamp: Opstamp,
) -> Result<Option<BitSet<u32>>> {
) -> crate::Result<Option<BitSet<u32>>> {
if delete_cursor.get().is_none() {
// if there are no delete operation in the queue, no need
// to even open the segment.
@@ -235,7 +244,9 @@ fn apply_deletes(
}
let segment_reader = SegmentReader::open(segment)?;
let doc_to_opstamps = DocToOpstampMapping::from(doc_opstamps);
let mut deleted_bitset = BitSet::with_capacity(segment_reader.max_doc() as usize);
let max_doc = segment.meta().max_doc();
let mut deleted_bitset = BitSet::with_capacity(max_doc as usize);
let may_have_deletes = compute_deleted_bitset(
&mut deleted_bitset,
&segment_reader,
@@ -270,7 +281,7 @@ impl IndexWriter {
num_threads: usize,
heap_size_in_bytes_per_thread: usize,
directory_lock: DirectoryLock,
) -> Result<IndexWriter> {
) -> crate::Result<IndexWriter> {
if heap_size_in_bytes_per_thread < HEAP_SIZE_MIN {
let err_msg = format!(
"The heap size per thread needs to be at least {}.",
@@ -321,7 +332,7 @@ impl IndexWriter {
/// If there are some merging threads, blocks until they all finish their work and
/// then drop the `IndexWriter`.
pub fn wait_merging_threads(mut self) -> Result<()> {
pub fn wait_merging_threads(mut self) -> crate::Result<()> {
// this will stop the indexing thread,
// dropping the last reference to the segment_updater.
drop(self.operation_sender);
@@ -350,10 +361,10 @@ impl IndexWriter {
}
#[doc(hidden)]
pub fn add_segment(&mut self, segment_meta: SegmentMeta) {
pub fn add_segment(&self, segment_meta: SegmentMeta) -> crate::Result<()> {
let delete_cursor = self.delete_queue.cursor();
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, None);
self.segment_updater.add_segment(segment_entry);
block_on(self.segment_updater.schedule_add_segment(segment_entry))
}
/// Creates a new segment.
@@ -370,7 +381,7 @@ impl IndexWriter {
/// Spawns a new worker thread for indexing.
/// The thread consumes documents from the pipeline.
fn add_indexing_worker(&mut self) -> Result<()> {
fn add_indexing_worker(&mut self) -> crate::Result<()> {
let document_receiver_clone = self.operation_receiver.clone();
let mut segment_updater = self.segment_updater.clone();
@@ -378,7 +389,7 @@ impl IndexWriter {
let mem_budget = self.heap_size_in_bytes_per_thread;
let index = self.index.clone();
let join_handle: JoinHandle<Result<()>> = thread::Builder::new()
let join_handle: JoinHandle<crate::Result<()>> = thread::Builder::new()
.name(format!("thrd-tantivy-index{}", self.worker_id))
.spawn(move || {
loop {
@@ -407,7 +418,7 @@ impl IndexWriter {
let segment = index.new_segment();
index_documents(
mem_budget,
&segment,
segment,
&mut document_iterator,
&mut segment_updater,
delete_cursor.clone(),
@@ -424,22 +435,23 @@ impl IndexWriter {
self.segment_updater.get_merge_policy()
}
/// Set the merge policy.
/// Setter for the merge policy.
pub fn set_merge_policy(&self, merge_policy: Box<dyn MergePolicy>) {
self.segment_updater.set_merge_policy(merge_policy);
}
fn start_workers(&mut self) -> Result<()> {
fn start_workers(&mut self) -> crate::Result<()> {
for _ in 0..self.num_threads {
self.add_indexing_worker()?;
}
Ok(())
}
/// Detects and removes the files that
/// are not used by the index anymore.
pub fn garbage_collect_files(&mut self) -> Result<()> {
self.segment_updater.garbage_collect_files().wait()
/// Detects and removes the files that are not used by the index anymore.
pub fn garbage_collect_files(
&self,
) -> impl Future<Output = crate::Result<GarbageCollectionResult>> {
self.segment_updater.schedule_garbage_collect()
}
/// Deletes all documents from the index
@@ -478,7 +490,7 @@ impl IndexWriter {
/// Ok(())
/// }
/// ```
pub fn delete_all_documents(&mut self) -> Result<Opstamp> {
pub fn delete_all_documents(&self) -> crate::Result<Opstamp> {
// Delete segments
self.segment_updater.remove_all_segments();
// Return new stamp - reverted stamp
@@ -489,11 +501,9 @@ impl IndexWriter {
/// Merges a given list of segments
///
/// `segment_ids` is required to be non-empty.
pub fn merge(
&mut self,
segment_ids: &[SegmentId],
) -> Result<impl Future<Item = SegmentMeta, Error = Canceled>> {
self.segment_updater.start_merge(segment_ids)
pub async fn merge(&mut self, segment_ids: &[SegmentId]) -> crate::Result<SegmentMeta> {
let merge_operation = self.segment_updater.make_merge_operation(segment_ids);
self.segment_updater.start_merge(merge_operation)?.await
}
/// Closes the current document channel send.
@@ -519,13 +529,8 @@ impl IndexWriter {
/// state as it was after the last commit.
///
/// The opstamp at the last commit is returned.
pub fn rollback(&mut self) -> Result<Opstamp> {
pub fn rollback(&mut self) -> crate::Result<Opstamp> {
info!("Rolling back to opstamp {}", self.committed_opstamp);
self.rollback_impl()
}
/// Private, implementation of rollback
fn rollback_impl(&mut self) -> Result<Opstamp> {
// marks the segment updater as killed. From now on, all
// segment updates will be ignored.
self.segment_updater.kill();
@@ -581,7 +586,7 @@ impl IndexWriter {
/// It is also possible to add a payload to the `commit`
/// using this API.
/// See [`PreparedCommit::set_payload()`](PreparedCommit.html)
pub fn prepare_commit(&mut self) -> Result<PreparedCommit<'_>> {
pub fn prepare_commit(&mut self) -> crate::Result<PreparedCommit> {
// Here, because we join all of the worker threads,
// all of the segment update for this commit have been
// sent.
@@ -628,7 +633,7 @@ impl IndexWriter {
/// Commit returns the `opstamp` of the last document
/// that made it in the commit.
///
pub fn commit(&mut self) -> Result<Opstamp> {
pub fn commit(&mut self) -> crate::Result<Opstamp> {
self.prepare_commit()?.commit()
}
@@ -669,9 +674,6 @@ impl IndexWriter {
/// The opstamp is an increasing `u64` that can
/// be used by the client to align commits with its own
/// document queue.
///
/// Currently it represents the number of documents that
/// have been added since the creation of the index.
pub fn add_document(&self, document: Document) -> Opstamp {
let opstamp = self.stamper.stamp();
let add_operation = AddOperation { opstamp, document };

View File

@@ -2,14 +2,23 @@ use crate::Opstamp;
use crate::SegmentId;
use census::{Inventory, TrackedObject};
use std::collections::HashSet;
use std::ops::Deref;
#[derive(Default)]
pub struct MergeOperationInventory(Inventory<InnerMergeOperation>);
pub(crate) struct MergeOperationInventory(Inventory<InnerMergeOperation>);
impl Deref for MergeOperationInventory {
type Target = Inventory<InnerMergeOperation>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl MergeOperationInventory {
pub fn segment_in_merge(&self) -> HashSet<SegmentId> {
let mut segment_in_merge = HashSet::default();
for merge_op in self.0.list() {
for merge_op in self.list() {
for &segment_id in &merge_op.segment_ids {
segment_in_merge.insert(segment_id);
}
@@ -35,13 +44,13 @@ pub struct MergeOperation {
inner: TrackedObject<InnerMergeOperation>,
}
struct InnerMergeOperation {
pub(crate) struct InnerMergeOperation {
target_opstamp: Opstamp,
segment_ids: Vec<SegmentId>,
}
impl MergeOperation {
pub fn new(
pub(crate) fn new(
inventory: &MergeOperationInventory,
target_opstamp: Opstamp,
segment_ids: Vec<SegmentId>,
@@ -51,7 +60,7 @@ impl MergeOperation {
segment_ids,
};
MergeOperation {
inner: inventory.0.track(inner_merge_operation),
inner: inventory.track(inner_merge_operation),
}
}

View File

@@ -709,7 +709,7 @@ mod tests {
use crate::IndexWriter;
use crate::Searcher;
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use futures::Future;
use futures::executor::block_on;
use std::io::Cursor;
#[test]
@@ -792,11 +792,7 @@ mod tests {
.searchable_segment_ids()
.expect("Searchable segments failed.");
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
index_writer
.merge(&segment_ids)
.expect("Failed to initiate merge")
.wait()
.expect("Merging failed");
block_on(index_writer.merge(&segment_ids)).expect("Merging failed");
index_writer.wait_merging_threads().unwrap();
}
{
@@ -1040,11 +1036,7 @@ mod tests {
let segment_ids = index
.searchable_segment_ids()
.expect("Searchable segments failed.");
index_writer
.merge(&segment_ids)
.expect("Failed to initiate merge")
.wait()
.expect("Merging failed");
block_on(index_writer.merge(&segment_ids)).expect("Merging failed");
reader.reload().unwrap();
let searcher = reader.searcher();
assert_eq!(searcher.segment_readers().len(), 1);
@@ -1139,11 +1131,7 @@ mod tests {
let segment_ids = index
.searchable_segment_ids()
.expect("Searchable segments failed.");
index_writer
.merge(&segment_ids)
.expect("Failed to initiate merge")
.wait()
.expect("Merging failed");
block_on(index_writer.merge(&segment_ids)).expect("Merging failed");
reader.reload().unwrap();
let searcher = reader.searcher();
@@ -1277,11 +1265,7 @@ mod tests {
.searchable_segment_ids()
.expect("Searchable segments failed.");
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
index_writer
.merge(&segment_ids)
.expect("Failed to initiate merge")
.wait()
.expect("Merging failed");
block_on(index_writer.merge(&segment_ids)).expect("Merging failed");
index_writer.wait_merging_threads().unwrap();
reader.reload().unwrap();
test_searcher(
@@ -1336,11 +1320,7 @@ mod tests {
let segment_ids = index
.searchable_segment_ids()
.expect("Searchable segments failed.");
index_writer
.merge(&segment_ids)
.expect("Failed to initiate merge")
.wait()
.expect("Merging failed");
block_on(index_writer.merge(&segment_ids)).expect("Merging failed");
reader.reload().unwrap();
// commit has not been called yet. The document should still be
// there.
@@ -1361,22 +1341,18 @@ mod tests {
let mut doc = Document::default();
doc.add_u64(int_field, 1);
index_writer.add_document(doc.clone());
index_writer.commit().expect("commit failed");
assert!(index_writer.commit().is_ok());
index_writer.add_document(doc);
index_writer.commit().expect("commit failed");
assert!(index_writer.commit().is_ok());
index_writer.delete_term(Term::from_field_u64(int_field, 1));
let segment_ids = index
.searchable_segment_ids()
.expect("Searchable segments failed.");
index_writer
.merge(&segment_ids)
.expect("Failed to initiate merge")
.wait()
.expect("Merging failed");
assert!(block_on(index_writer.merge(&segment_ids)).is_ok());
// assert delete has not been committed
reader.reload().expect("failed to load searcher 1");
assert!(reader.reload().is_ok());
let searcher = reader.searcher();
assert_eq!(searcher.num_docs(), 2);
@@ -1415,12 +1391,12 @@ mod tests {
index_doc(&mut index_writer, &[1, 5]);
index_doc(&mut index_writer, &[3]);
index_doc(&mut index_writer, &[17]);
index_writer.commit().expect("committed");
assert!(index_writer.commit().is_ok());
index_doc(&mut index_writer, &[20]);
index_writer.commit().expect("committed");
assert!(index_writer.commit().is_ok());
index_doc(&mut index_writer, &[28, 27]);
index_doc(&mut index_writer, &[1_000]);
index_writer.commit().expect("committed");
assert!(index_writer.commit().is_ok());
}
let reader = index.reader().unwrap();
let searcher = reader.searcher();
@@ -1452,15 +1428,6 @@ mod tests {
assert_eq!(&vals, &[17]);
}
println!(
"{:?}",
searcher
.segment_readers()
.iter()
.map(|reader| reader.max_doc())
.collect::<Vec<_>>()
);
{
let segment = searcher.segment_reader(1u32);
let ff_reader = segment.fast_fields().u64s(int_field).unwrap();
@@ -1484,27 +1451,13 @@ mod tests {
.searchable_segment_ids()
.expect("Searchable segments failed.");
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
index_writer
.merge(&segment_ids)
.expect("Failed to initiate merge")
.wait()
.expect("Merging failed");
index_writer
.wait_merging_threads()
.expect("Wait for merging threads");
assert!(block_on(index_writer.merge(&segment_ids)).is_ok());
assert!(index_writer.wait_merging_threads().is_ok());
}
reader.reload().expect("Load searcher");
assert!(reader.reload().is_ok());
{
let searcher = reader.searcher();
println!(
"{:?}",
searcher
.segment_readers()
.iter()
.map(|reader| reader.max_doc())
.collect::<Vec<_>>()
);
let segment = searcher.segment_reader(0u32);
let ff_reader = segment.fast_fields().u64s(int_field).unwrap();

View File

@@ -18,7 +18,7 @@ mod stamper;
pub use self::index_writer::IndexWriter;
pub use self::log_merge_policy::LogMergePolicy;
pub use self::merge_operation::{MergeOperation, MergeOperationInventory};
pub use self::merge_operation::MergeOperation;
pub use self::merge_policy::{MergeCandidate, MergePolicy, NoMergePolicy};
pub use self::prepared_commit::PreparedCommit;
pub use self::segment_entry::SegmentEntry;
@@ -28,3 +28,25 @@ pub use self::segment_writer::SegmentWriter;
/// Alias for the default merge policy, which is the `LogMergePolicy`.
pub type DefaultMergePolicy = LogMergePolicy;
#[cfg(test)]
mod tests {
use crate::schema::{self, Schema};
use crate::{Index, Term};
#[test]
fn test_advance_delete_bug() {
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", schema::TEXT);
let index = Index::create_from_tempdir(schema_builder.build()).unwrap();
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
// there must be one deleted document in the segment
index_writer.add_document(doc!(text_field=>"b"));
index_writer.delete_term(Term::from_field_text(text_field, "b"));
// we need enough data to trigger the bug (at least 32 documents)
for _ in 0..32 {
index_writer.add_document(doc!(text_field=>"c"));
}
index_writer.commit().unwrap();
index_writer.commit().unwrap();
}
}

View File

@@ -1,6 +1,7 @@
use super::IndexWriter;
use crate::Opstamp;
use crate::Result;
use futures::executor::block_on;
/// A prepared commit
pub struct PreparedCommit<'a> {
@@ -32,9 +33,11 @@ impl<'a> PreparedCommit<'a> {
pub fn commit(self) -> Result<Opstamp> {
info!("committing {}", self.opstamp);
self.index_writer
.segment_updater()
.commit(self.opstamp, self.payload)?;
let _ = block_on(
self.index_writer
.segment_updater()
.schedule_commit(self.opstamp, self.payload),
);
Ok(self.opstamp)
}
}

View File

@@ -16,6 +16,28 @@ struct SegmentRegisters {
committed: SegmentRegister,
}
#[derive(PartialEq, Eq)]
pub(crate) enum SegmentsStatus {
Committed,
Uncommitted,
}
impl SegmentRegisters {
/// Check if all the segments are committed or uncommited.
///
/// If some segment is missing or segments are in a different state (this should not happen
/// if tantivy is used correctly), returns `None`.
fn segments_status(&self, segment_ids: &[SegmentId]) -> Option<SegmentsStatus> {
if self.uncommitted.contains_all(segment_ids) {
Some(SegmentsStatus::Uncommitted)
} else if self.committed.contains_all(segment_ids) {
Some(SegmentsStatus::Committed)
} else {
None
}
}
}
/// The segment manager stores the list of segments
/// as well as their state.
///
@@ -153,33 +175,35 @@ impl SegmentManager {
let mut registers_lock = self.write();
registers_lock.uncommitted.add_segment_entry(segment_entry);
}
pub fn end_merge(
// Replace a list of segments for their equivalent merged segment.
//
// Returns true if these segments are committed, false if the merge segments are uncommited.
pub(crate) fn end_merge(
&self,
before_merge_segment_ids: &[SegmentId],
after_merge_segment_entry: SegmentEntry,
) {
) -> crate::Result<SegmentsStatus> {
let mut registers_lock = self.write();
let target_register: &mut SegmentRegister = {
if registers_lock
.uncommitted
.contains_all(before_merge_segment_ids)
{
&mut registers_lock.uncommitted
} else if registers_lock
.committed
.contains_all(before_merge_segment_ids)
{
&mut registers_lock.committed
} else {
let segments_status = registers_lock
.segments_status(before_merge_segment_ids)
.ok_or_else(|| {
warn!("couldn't find segment in SegmentManager");
return;
}
crate::Error::InvalidArgument(
"The segments that were merged could not be found in the SegmentManager. \
This is not necessarily a bug, and can happen after a rollback for instance."
.to_string(),
)
})?;
let target_register: &mut SegmentRegister = match segments_status {
SegmentsStatus::Uncommitted => &mut registers_lock.uncommitted,
SegmentsStatus::Committed => &mut registers_lock.committed,
};
for segment_id in before_merge_segment_ids {
target_register.remove_segment(segment_id);
}
target_register.add_segment_entry(after_merge_segment_entry);
Ok(segments_status)
}
pub fn committed_segment_metas(&self) -> Vec<SegmentMeta> {

View File

@@ -6,39 +6,34 @@ use crate::core::SegmentId;
use crate::core::SegmentMeta;
use crate::core::SerializableSegment;
use crate::core::META_FILEPATH;
use crate::directory::{Directory, DirectoryClone};
use crate::error::TantivyError;
use crate::directory::{Directory, DirectoryClone, GarbageCollectionResult};
use crate::indexer::delete_queue::DeleteCursor;
use crate::indexer::index_writer::advance_deletes;
use crate::indexer::merge_operation::MergeOperationInventory;
use crate::indexer::merger::IndexMerger;
use crate::indexer::segment_manager::SegmentsStatus;
use crate::indexer::stamper::Stamper;
use crate::indexer::MergeOperation;
use crate::indexer::SegmentEntry;
use crate::indexer::SegmentSerializer;
use crate::indexer::{DefaultMergePolicy, MergePolicy};
use crate::indexer::{MergeCandidate, MergeOperation};
use crate::schema::Schema;
use crate::Opstamp;
use crate::Result;
use futures::oneshot;
use futures::sync::oneshot::Receiver;
use futures::Future;
use futures_cpupool::Builder as CpuPoolBuilder;
use futures_cpupool::CpuFuture;
use futures_cpupool::CpuPool;
use futures::channel::oneshot;
use futures::executor::{ThreadPool, ThreadPoolBuilder};
use futures::future::Future;
use futures::future::TryFutureExt;
use serde_json;
use std::borrow::BorrowMut;
use std::collections::HashMap;
use std::collections::HashSet;
use std::io::Write;
use std::mem;
use std::ops::DerefMut;
use std::ops::Deref;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::RwLock;
use std::thread;
use std::thread::JoinHandle;
const NUM_MERGE_THREADS: usize = 4;
/// Save the index meta file.
/// This operation is atomic :
@@ -49,7 +44,7 @@ use std::thread::JoinHandle;
/// and flushed.
///
/// This method is not part of tantivy's public API
pub fn save_new_metas(schema: Schema, directory: &mut dyn Directory) -> Result<()> {
pub fn save_new_metas(schema: Schema, directory: &mut dyn Directory) -> crate::Result<()> {
save_metas(
&IndexMeta {
segments: Vec::new(),
@@ -70,7 +65,7 @@ pub fn save_new_metas(schema: Schema, directory: &mut dyn Directory) -> Result<(
/// and flushed.
///
/// This method is not part of tantivy's public API
fn save_metas(metas: &IndexMeta, directory: &mut dyn Directory) -> Result<()> {
fn save_metas(metas: &IndexMeta, directory: &mut dyn Directory) -> crate::Result<()> {
info!("save metas");
let mut buffer = serde_json::to_vec_pretty(metas)?;
// Just adding a new line at the end of the buffer.
@@ -89,21 +84,38 @@ fn save_metas(metas: &IndexMeta, directory: &mut dyn Directory) -> Result<()> {
// We voluntarily pass a merge_operation ref to guarantee that
// the merge_operation is alive during the process
#[derive(Clone)]
pub struct SegmentUpdater(Arc<InnerSegmentUpdater>);
pub(crate) struct SegmentUpdater(Arc<InnerSegmentUpdater>);
fn perform_merge(
merge_operation: &MergeOperation,
impl Deref for SegmentUpdater {
type Target = InnerSegmentUpdater;
#[inline]
fn deref(&self) -> &Self::Target {
&self.0
}
}
async fn garbage_collect_files(
segment_updater: SegmentUpdater,
) -> crate::Result<GarbageCollectionResult> {
info!("Running garbage collection");
let mut index = segment_updater.index.clone();
index
.directory_mut()
.garbage_collect(move || segment_updater.list_files())
}
/// Merges a list of segments the list of segment givens in the `segment_entries`.
/// This function happens in the calling thread and is computationally expensive.
fn merge(
index: &Index,
mut segment_entries: Vec<SegmentEntry>,
) -> Result<SegmentEntry> {
let target_opstamp = merge_operation.target_opstamp();
target_opstamp: Opstamp,
) -> crate::Result<SegmentEntry> {
// first we need to apply deletes to our segment.
let mut merged_segment = index.new_segment();
// TODO add logging
let schema = index.schema();
// First we apply all of the delet to the merged segment, up to the target opstamp.
for segment_entry in &mut segment_entries {
let segment = index.segment(segment_entry.meta().clone());
advance_deletes(segment, segment_entry, target_opstamp)?;
@@ -117,22 +129,19 @@ fn perform_merge(
.collect();
// An IndexMerger is like a "view" of our merged segments.
let merger: IndexMerger = IndexMerger::open(schema, &segments[..])?;
// ... we just serialize this index merger in our new segment
// to merge the two segments.
let merger: IndexMerger = IndexMerger::open(index.schema(), &segments[..])?;
// ... we just serialize this index merger in our new segment to merge the two segments.
let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment)?;
let num_docs = merger.write(segment_serializer)?;
let segment_meta = index.new_segment_meta(merged_segment.id(), num_docs);
let after_merge_segment_entry = SegmentEntry::new(segment_meta.clone(), delete_cursor, None);
Ok(after_merge_segment_entry)
Ok(SegmentEntry::new(segment_meta.clone(), delete_cursor, None))
}
struct InnerSegmentUpdater {
pub(crate) struct InnerSegmentUpdater {
// we keep a copy of the current active IndexMeta to
// avoid loading the file everytime we need it in the
// `SegmentUpdater`.
@@ -140,12 +149,12 @@ struct InnerSegmentUpdater {
// This should be up to date as all update happen through
// the unique active `SegmentUpdater`.
active_metas: RwLock<Arc<IndexMeta>>,
pool: CpuPool,
pool: ThreadPool,
merge_thread_pool: ThreadPool,
index: Index,
segment_manager: SegmentManager,
merge_policy: RwLock<Arc<Box<dyn MergePolicy>>>,
merging_thread_id: AtomicUsize,
merging_threads: RwLock<HashMap<usize, JoinHandle<Result<()>>>>,
killed: AtomicBool,
stamper: Stamper,
merge_operations: MergeOperationInventory,
@@ -156,22 +165,31 @@ impl SegmentUpdater {
index: Index,
stamper: Stamper,
delete_cursor: &DeleteCursor,
) -> Result<SegmentUpdater> {
) -> crate::Result<SegmentUpdater> {
let segments = index.searchable_segment_metas()?;
let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
let pool = CpuPoolBuilder::new()
let pool = ThreadPoolBuilder::new()
.name_prefix("segment_updater")
.pool_size(1)
.create();
.create()
.map_err(|_| {
crate::Error::SystemError("Failed to spawn segment updater thread".to_string())
})?;
let merge_thread_pool = ThreadPoolBuilder::new()
.name_prefix("merge_thread")
.pool_size(NUM_MERGE_THREADS)
.create()
.map_err(|_| {
crate::Error::SystemError("Failed to spawn segment merging thread".to_string())
})?;
let index_meta = index.load_metas()?;
Ok(SegmentUpdater(Arc::new(InnerSegmentUpdater {
active_metas: RwLock::new(Arc::new(index_meta)),
pool,
merge_thread_pool,
index,
segment_manager,
merge_policy: RwLock::new(Arc::new(Box::new(DefaultMergePolicy::default()))),
merging_thread_id: AtomicUsize::default(),
merging_threads: RwLock::new(HashMap::new()),
killed: AtomicBool::new(false),
stamper,
merge_operations: Default::default(),
@@ -179,67 +197,76 @@ impl SegmentUpdater {
}
pub fn get_merge_policy(&self) -> Arc<Box<dyn MergePolicy>> {
self.0.merge_policy.read().unwrap().clone()
self.merge_policy.read().unwrap().clone()
}
pub fn set_merge_policy(&self, merge_policy: Box<dyn MergePolicy>) {
let arc_merge_policy = Arc::new(merge_policy);
*self.0.merge_policy.write().unwrap() = arc_merge_policy;
*self.merge_policy.write().unwrap() = arc_merge_policy;
}
fn get_merging_thread_id(&self) -> usize {
self.0.merging_thread_id.fetch_add(1, Ordering::SeqCst)
}
fn run_async<T: 'static + Send, F: 'static + Send + FnOnce(SegmentUpdater) -> T>(
fn schedule_future<T: 'static + Send, F: Future<Output = crate::Result<T>> + 'static + Send>(
&self,
f: F,
) -> CpuFuture<T, TantivyError> {
let me_clone = self.clone();
self.0.pool.spawn_fn(move || Ok(f(me_clone)))
) -> impl Future<Output = crate::Result<T>> {
let (sender, receiver) = oneshot::channel();
self.pool.spawn_ok(async move {
let _ = sender.send(f.await);
});
receiver.unwrap_or_else(|_| {
let err_msg =
"A segment_updater future did not success. This should never happen.".to_string();
Err(crate::Error::SystemError(err_msg))
})
}
pub fn add_segment(&self, segment_entry: SegmentEntry) -> bool {
self.run_async(|segment_updater| {
segment_updater.0.segment_manager.add_segment(segment_entry);
segment_updater.consider_merge_options();
true
pub fn schedule_add_segment(
&self,
segment_entry: SegmentEntry,
) -> impl Future<Output = crate::Result<()>> {
let segment_updater = self.clone();
self.schedule_future(async move {
segment_updater.segment_manager.add_segment(segment_entry);
segment_updater.consider_merge_options().await;
Ok(())
})
.forget();
true
}
/// Orders `SegmentManager` to remove all segments
pub(crate) fn remove_all_segments(&self) {
self.0.segment_manager.remove_all_segments();
self.segment_manager.remove_all_segments();
}
pub fn kill(&mut self) {
self.0.killed.store(true, Ordering::Release);
self.killed.store(true, Ordering::Release);
}
pub fn is_alive(&self) -> bool {
!self.0.killed.load(Ordering::Acquire)
!self.killed.load(Ordering::Acquire)
}
/// Apply deletes up to the target opstamp to all segments.
///
/// The method returns copies of the segment entries,
/// updated with the delete information.
fn purge_deletes(&self, target_opstamp: Opstamp) -> Result<Vec<SegmentEntry>> {
let mut segment_entries = self.0.segment_manager.segment_entries();
fn purge_deletes(&self, target_opstamp: Opstamp) -> crate::Result<Vec<SegmentEntry>> {
let mut segment_entries = self.segment_manager.segment_entries();
for segment_entry in &mut segment_entries {
let segment = self.0.index.segment(segment_entry.meta().clone());
let segment = self.index.segment(segment_entry.meta().clone());
advance_deletes(segment, segment_entry, target_opstamp)?;
}
Ok(segment_entries)
}
pub fn save_metas(&self, opstamp: Opstamp, commit_message: Option<String>) {
pub fn save_metas(
&self,
opstamp: Opstamp,
commit_message: Option<String>,
) -> crate::Result<()> {
if self.is_alive() {
let index = &self.0.index;
let index = &self.index;
let directory = index.directory();
let mut commited_segment_metas = self.0.segment_manager.committed_segment_metas();
let mut commited_segment_metas = self.segment_manager.committed_segment_metas();
// We sort segment_readers by number of documents.
// This is an heuristic to make multithreading more efficient.
@@ -261,16 +288,18 @@ impl SegmentUpdater {
opstamp,
payload: commit_message,
};
save_metas(&index_meta, directory.box_clone().borrow_mut())
.expect("Could not save metas.");
// TODO add context to the error.
save_metas(&index_meta, directory.box_clone().borrow_mut())?;
self.store_meta(&index_meta);
}
Ok(())
}
pub fn garbage_collect_files(&self) -> CpuFuture<(), TantivyError> {
self.run_async(move |segment_updater| {
segment_updater.garbage_collect_files_exec();
})
pub fn schedule_garbage_collect(
&self,
) -> impl Future<Output = crate::Result<GarbageCollectionResult>> {
let garbage_collect_future = garbage_collect_files(self.clone());
self.schedule_future(garbage_collect_future)
}
/// List the files that are useful to the index.
@@ -278,148 +307,132 @@ impl SegmentUpdater {
/// This does not include lock files, or files that are obsolete
/// but have not yet been deleted by the garbage collector.
fn list_files(&self) -> HashSet<PathBuf> {
let mut files = HashSet::new();
let mut files: HashSet<PathBuf> = self
.index
.list_all_segment_metas()
.into_iter()
.flat_map(|segment_meta| segment_meta.list_files())
.collect();
files.insert(META_FILEPATH.to_path_buf());
for segment_meta in self.0.index.list_all_segment_metas() {
files.extend(segment_meta.list_files());
}
files
}
fn garbage_collect_files_exec(&self) {
info!("Running garbage collection");
let mut index = self.0.index.clone();
index.directory_mut().garbage_collect(|| self.list_files());
}
pub fn commit(&self, opstamp: Opstamp, payload: Option<String>) -> Result<()> {
self.run_async(move |segment_updater| {
pub fn schedule_commit(
&self,
opstamp: Opstamp,
payload: Option<String>,
) -> impl Future<Output = crate::Result<()>> {
let segment_updater: SegmentUpdater = self.clone();
self.schedule_future(async move {
if segment_updater.is_alive() {
let segment_entries = segment_updater
.purge_deletes(opstamp)
.expect("Failed purge deletes");
segment_updater.0.segment_manager.commit(segment_entries);
segment_updater.save_metas(opstamp, payload);
segment_updater.garbage_collect_files_exec();
segment_updater.consider_merge_options();
let segment_entries = segment_updater.purge_deletes(opstamp)?;
segment_updater.segment_manager.commit(segment_entries);
segment_updater.save_metas(opstamp, payload)?;
let _ = garbage_collect_files(segment_updater.clone()).await;
segment_updater.consider_merge_options().await;
}
Ok(())
})
.wait()
}
pub fn start_merge(&self, segment_ids: &[SegmentId]) -> Result<Receiver<SegmentMeta>> {
let commit_opstamp = self.load_metas().opstamp;
let merge_operation = MergeOperation::new(
&self.0.merge_operations,
commit_opstamp,
segment_ids.to_vec(),
);
self.run_async(move |segment_updater| segment_updater.start_merge_impl(merge_operation))
.wait()?
}
fn store_meta(&self, index_meta: &IndexMeta) {
*self.0.active_metas.write().unwrap() = Arc::new(index_meta.clone());
}
fn load_metas(&self) -> Arc<IndexMeta> {
self.0.active_metas.read().unwrap().clone()
*self.active_metas.write().unwrap() = Arc::new(index_meta.clone());
}
fn load_metas(&self) -> Arc<IndexMeta> {
self.active_metas.read().unwrap().clone()
}
pub(crate) fn make_merge_operation(&self, segment_ids: &[SegmentId]) -> MergeOperation {
let commit_opstamp = self.load_metas().opstamp;
MergeOperation::new(&self.merge_operations, commit_opstamp, segment_ids.to_vec())
}
// Starts a merge operation. This function will block until the merge operation is effectively
// started. Note that it does not wait for the merge to terminate.
// The calling thread should not be block for a long time, as this only involve waiting for the
// `SegmentUpdater` queue which in turns only contains lightweight operations.
//
// The merge itself happens on a different thread.
//
// When successful, this function returns a `Future` for a `Result<SegmentMeta>` that represents
// the actual outcome of the merge operation.
//
// It returns an error if for some reason the merge operation could not be started.
//
// At this point an error is not necessarily the sign of a malfunction.
// (e.g. A rollback could have happened, between the instant when the merge operaiton was
// suggested and the moment when it ended up being executed.)
//
// `segment_ids` is required to be non-empty.
fn start_merge_impl(&self, merge_operation: MergeOperation) -> Result<Receiver<SegmentMeta>> {
pub fn start_merge(
&self,
merge_operation: MergeOperation,
) -> crate::Result<impl Future<Output = crate::Result<SegmentMeta>>> {
assert!(
!merge_operation.segment_ids().is_empty(),
"Segment_ids cannot be empty."
);
let segment_updater_clone = self.clone();
let segment_updater = self.clone();
let segment_entries: Vec<SegmentEntry> = self
.0
.segment_manager
.start_merge(merge_operation.segment_ids())?;
// let segment_ids_vec = merge_operation.segment_ids.to_vec();
info!("Starting merge - {:?}", merge_operation.segment_ids());
let merging_thread_id = self.get_merging_thread_id();
info!(
"Starting merge thread #{} - {:?}",
merging_thread_id,
merge_operation.segment_ids()
);
let (merging_future_send, merging_future_recv) = oneshot();
let (merging_future_send, merging_future_recv) =
oneshot::channel::<crate::Result<SegmentMeta>>();
// first we need to apply deletes to our segment.
let merging_join_handle = thread::Builder::new()
.name(format!("mergingthread-{}", merging_thread_id))
.spawn(move || {
// first we need to apply deletes to our segment.
let merge_result = perform_merge(
&merge_operation,
&segment_updater_clone.0.index,
segment_entries,
);
match merge_result {
Ok(after_merge_segment_entry) => {
let merged_segment_meta = after_merge_segment_entry.meta().clone();
segment_updater_clone
.end_merge(merge_operation, after_merge_segment_entry)
.expect("Segment updater thread is corrupted.");
// the future may fail if the listener of the oneshot future
// has been destroyed.
//
// This is not a problem here, so we just ignore any
// possible error.
let _merging_future_res = merging_future_send.send(merged_segment_meta);
}
Err(e) => {
warn!(
"Merge of {:?} was cancelled: {:?}",
merge_operation.segment_ids(),
e
);
// ... cancel merge
if cfg!(test) {
panic!("Merge failed.");
}
// As `merge_operation` will be dropped, the segment in merge state will
// be available for merge again.
// `merging_future_send` will be dropped, sending an error to the future.
self.merge_thread_pool.spawn_ok(async move {
// The fact that `merge_operation` is moved here is important.
// Its lifetime is used to track how many merging thread are currently running,
// as well as which segment is currently in merge and therefore should not be
// candidate for another merge.
match merge(
&segment_updater.index,
segment_entries,
merge_operation.target_opstamp(),
) {
Ok(after_merge_segment_entry) => {
let segment_meta = segment_updater
.end_merge(merge_operation, after_merge_segment_entry)
.await;
let _send_result = merging_future_send.send(segment_meta);
}
Err(e) => {
warn!(
"Merge of {:?} was cancelled: {:?}",
merge_operation.segment_ids().to_vec(),
e
);
// ... cancel merge
if cfg!(test) {
panic!("Merge failed.");
}
}
segment_updater_clone
.0
.merging_threads
.write()
.unwrap()
.remove(&merging_thread_id);
Ok(())
})
.expect("Failed to spawn a thread.");
self.0
.merging_threads
.write()
.unwrap()
.insert(merging_thread_id, merging_join_handle);
Ok(merging_future_recv)
}
});
Ok(merging_future_recv
.unwrap_or_else(|_| Err(crate::Error::SystemError("Merge failed".to_string()))))
}
fn consider_merge_options(&self) {
let merge_segment_ids: HashSet<SegmentId> = self.0.merge_operations.segment_in_merge();
async fn consider_merge_options(&self) {
let merge_segment_ids: HashSet<SegmentId> = self.merge_operations.segment_in_merge();
let (committed_segments, uncommitted_segments) =
get_mergeable_segments(&merge_segment_ids, &self.0.segment_manager);
get_mergeable_segments(&merge_segment_ids, &self.segment_manager);
// Committed segments cannot be merged with uncommitted_segments.
// We therefore consider merges using these two sets of segments independently.
let merge_policy = self.get_merge_policy();
let current_opstamp = self.0.stamper.stamp();
let current_opstamp = self.stamper.stamp();
let mut merge_candidates: Vec<MergeOperation> = merge_policy
.compute_merge_candidates(&uncommitted_segments)
.into_iter()
.map(|merge_candidate| {
MergeOperation::new(&self.0.merge_operations, current_opstamp, merge_candidate.0)
MergeOperation::new(&self.merge_operations, current_opstamp, merge_candidate.0)
})
.collect();
@@ -427,25 +440,18 @@ impl SegmentUpdater {
let committed_merge_candidates = merge_policy
.compute_merge_candidates(&committed_segments)
.into_iter()
.map(|merge_candidate| {
MergeOperation::new(&self.0.merge_operations, commit_opstamp, merge_candidate.0)
.map(|merge_candidate: MergeCandidate| {
MergeOperation::new(&self.merge_operations, commit_opstamp, merge_candidate.0)
})
.collect::<Vec<_>>();
merge_candidates.extend(committed_merge_candidates.into_iter());
for merge_operation in merge_candidates {
match self.start_merge_impl(merge_operation) {
Ok(merge_future) => {
if let Err(e) = merge_future.fuse().poll() {
error!("The merge task failed quickly after starting: {:?}", e);
}
}
Err(err) => {
warn!(
"Starting the merge failed for the following reason. This is not fatal. {}",
err
);
}
if let Err(err) = self.start_merge(merge_operation) {
warn!(
"Starting the merge failed for the following reason. This is not fatal. {}",
err
);
}
}
}
@@ -454,15 +460,17 @@ impl SegmentUpdater {
&self,
merge_operation: MergeOperation,
mut after_merge_segment_entry: SegmentEntry,
) -> Result<()> {
self.run_async(move |segment_updater| {
) -> impl Future<Output = crate::Result<SegmentMeta>> {
let segment_updater = self.clone();
let after_merge_segment_meta = after_merge_segment_entry.meta().clone();
let end_merge_future = self.schedule_future(async move {
info!("End merge {:?}", after_merge_segment_entry.meta());
{
let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
if let Some(delete_operation) = delete_cursor.get() {
let committed_opstamp = segment_updater.load_metas().opstamp;
if delete_operation.opstamp < committed_opstamp {
let index = &segment_updater.0.index;
let index = &segment_updater.index;
let segment = index.segment(after_merge_segment_entry.meta().clone());
if let Err(e) = advance_deletes(
segment,
@@ -480,21 +488,26 @@ impl SegmentUpdater {
// ... cancel merge
// `merge_operations` are tracked. As it is dropped, the
// the segment_ids will be available again for merge.
return;
return Err(e);
}
}
}
let previous_metas = segment_updater.load_metas();
segment_updater
.0
let segments_status = segment_updater
.segment_manager
.end_merge(merge_operation.segment_ids(), after_merge_segment_entry);
segment_updater.consider_merge_options();
segment_updater.save_metas(previous_metas.opstamp, previous_metas.payload.clone());
.end_merge(merge_operation.segment_ids(), after_merge_segment_entry)?;
if segments_status == SegmentsStatus::Committed {
segment_updater
.save_metas(previous_metas.opstamp, previous_metas.payload.clone())?;
}
segment_updater.consider_merge_options().await;
} // we drop all possible handle to a now useless `SegmentMeta`.
segment_updater.garbage_collect_files_exec();
})
.wait()
let _ = garbage_collect_files(segment_updater).await;
Ok(())
});
end_merge_future.map_ok(|_| after_merge_segment_meta)
}
/// Wait for current merging threads.
@@ -512,26 +525,13 @@ impl SegmentUpdater {
///
/// Obsolete files will eventually be cleaned up
/// by the directory garbage collector.
pub fn wait_merging_thread(&self) -> Result<()> {
loop {
let merging_threads: HashMap<usize, JoinHandle<Result<()>>> = {
let mut merging_threads = self.0.merging_threads.write().unwrap();
mem::replace(merging_threads.deref_mut(), HashMap::new())
};
if merging_threads.is_empty() {
return Ok(());
pub fn wait_merging_thread(&self) -> crate::Result<()> {
for merge_operation in self.merge_operations.changes_iter() {
if merge_operation.is_empty() {
break;
}
debug!("wait merging thread {}", merging_threads.len());
for (_, merging_thread_handle) in merging_threads {
merging_thread_handle
.join()
.map(|_| ())
.map_err(|_| TantivyError::ErrorInThread("Merging thread failed.".into()))?;
}
// Our merging thread may have queued their completed merged segment.
// Let's wait for that too.
self.run_async(move |_| {}).wait()?;
}
Ok(())
}
}
@@ -687,7 +687,6 @@ mod tests {
index_writer.segment_updater().remove_all_segments();
let seg_vec = index_writer
.segment_updater()
.0
.segment_manager
.segment_entries();
assert!(seg_vec.is_empty());

View File

@@ -13,7 +13,8 @@ use crate::schema::Value;
use crate::schema::{Field, FieldEntry};
use crate::tokenizer::BoxedTokenizer;
use crate::tokenizer::FacetTokenizer;
use crate::tokenizer::{TokenStream, Tokenizer};
use crate::tokenizer::PreTokenizedStream;
use crate::tokenizer::{TokenStream, TokenStreamChain, Tokenizer};
use crate::DocId;
use crate::Opstamp;
use crate::Result;
@@ -158,26 +159,44 @@ impl SegmentWriter {
}
}
FieldType::Str(_) => {
let num_tokens = if let Some(ref mut tokenizer) =
self.tokenizers[field.field_id() as usize]
{
let texts: Vec<&str> = field_values
.iter()
.flat_map(|field_value| match *field_value.value() {
Value::Str(ref text) => Some(text.as_str()),
_ => None,
})
.collect();
if texts.is_empty() {
0
} else {
let mut token_stream = tokenizer.token_stream_texts(&texts[..]);
self.multifield_postings
.index_text(doc_id, field, &mut token_stream)
let mut token_streams: Vec<Box<dyn TokenStream>> = vec![];
let mut offsets = vec![];
let mut total_offset = 0;
for field_value in field_values {
match field_value.value() {
Value::PreTokStr(tok_str) => {
offsets.push(total_offset);
if let Some(last_token) = tok_str.tokens.last() {
total_offset += last_token.offset_to;
}
token_streams
.push(Box::new(PreTokenizedStream::from(tok_str.clone())));
}
Value::Str(ref text) => {
if let Some(ref mut tokenizer) =
self.tokenizers[field.field_id() as usize]
{
offsets.push(total_offset);
total_offset += text.len();
token_streams.push(tokenizer.token_stream(text));
}
}
_ => (),
}
} else {
}
let num_tokens = if token_streams.is_empty() {
0
} else {
let mut token_stream: Box<dyn TokenStream> =
Box::new(TokenStreamChain::new(offsets, token_streams));
self.multifield_postings
.index_text(doc_id, field, &mut token_stream)
};
self.fieldnorms_writer.record(doc_id, field, num_tokens);
}
FieldType::U64(ref int_option) => {

View File

@@ -162,6 +162,11 @@ pub struct IndexReader {
}
impl IndexReader {
#[cfg(test)]
pub(crate) fn index(&self) -> Index {
self.inner.index.clone()
}
/// Update searchers so that they reflect the state of the last
/// `.commit()`.
///

View File

@@ -167,7 +167,7 @@ mod tests {
use super::Pool;
use super::Queue;
use std::iter;
use std::{iter, mem};
#[test]
fn test_pool() {
@@ -197,33 +197,67 @@ mod tests {
fn test_pool_dont_panic_on_empty_pop() {
// When the object pool is exhausted, it shouldn't panic on pop()
use std::sync::Arc;
use std::{thread, time};
use std::thread;
// Wrap the pool in an Arc, same way as its used in `core/index.rs`
let pool = Arc::new(Pool::new());
let pool1 = Arc::new(Pool::new());
// clone pools outside the move scope of each new thread
let pool1 = Arc::clone(&pool);
let pool2 = Arc::clone(&pool);
let pool2 = Arc::clone(&pool1);
let pool3 = Arc::clone(&pool1);
let elements_for_pool = vec![1, 2];
pool.publish_new_generation(elements_for_pool);
pool1.publish_new_generation(elements_for_pool);
let mut threads = vec![];
let sleep_dur = time::Duration::from_millis(10);
// spawn one more thread than there are elements in the pool
let (start_1_send, start_1_recv) = crossbeam::bounded(0);
let (start_2_send, start_2_recv) = crossbeam::bounded(0);
let (start_3_send, start_3_recv) = crossbeam::bounded(0);
let (event_send1, event_recv) = crossbeam::unbounded();
let event_send2 = event_send1.clone();
let event_send3 = event_send1.clone();
threads.push(thread::spawn(move || {
// leasing to make sure it's not dropped before sleep is called
let _leased_searcher = &pool.acquire();
thread::sleep(sleep_dur);
}));
threads.push(thread::spawn(move || {
// leasing to make sure it's not dropped before sleep is called
assert_eq!(start_1_recv.recv(), Ok("start"));
let _leased_searcher = &pool1.acquire();
thread::sleep(sleep_dur);
assert!(event_send1.send("1 acquired").is_ok());
assert_eq!(start_1_recv.recv(), Ok("stop"));
assert!(event_send1.send("1 stopped").is_ok());
mem::drop(_leased_searcher);
}));
threads.push(thread::spawn(move || {
// leasing to make sure it's not dropped before sleep is called
assert_eq!(start_2_recv.recv(), Ok("start"));
let _leased_searcher = &pool2.acquire();
thread::sleep(sleep_dur);
assert!(event_send2.send("2 acquired").is_ok());
assert_eq!(start_2_recv.recv(), Ok("stop"));
mem::drop(_leased_searcher);
assert!(event_send2.send("2 stopped").is_ok());
}));
threads.push(thread::spawn(move || {
assert_eq!(start_3_recv.recv(), Ok("start"));
let _leased_searcher = &pool3.acquire();
assert!(event_send3.send("3 acquired").is_ok());
assert_eq!(start_3_recv.recv(), Ok("stop"));
mem::drop(_leased_searcher);
assert!(event_send3.send("3 stopped").is_ok());
}));
assert!(start_1_send.send("start").is_ok());
assert_eq!(event_recv.recv(), Ok("1 acquired"));
assert!(start_2_send.send("start").is_ok());
assert_eq!(event_recv.recv(), Ok("2 acquired"));
assert!(start_3_send.send("start").is_ok());
assert!(event_recv.try_recv().is_err());
assert!(start_1_send.send("stop").is_ok());
assert_eq!(event_recv.recv(), Ok("1 stopped"));
assert_eq!(event_recv.recv(), Ok("3 acquired"));
assert!(start_3_send.send("stop").is_ok());
assert_eq!(event_recv.recv(), Ok("3 stopped"));
assert!(start_2_send.send("stop").is_ok());
assert_eq!(event_recv.recv(), Ok("2 stopped"));
}
}

View File

@@ -1,6 +1,7 @@
use super::*;
use crate::common::BinarySerializable;
use crate::common::VInt;
use crate::tokenizer::PreTokenizedString;
use crate::DateTime;
use itertools::Itertools;
use std::io::{self, Read, Write};
@@ -29,8 +30,8 @@ impl From<Vec<FieldValue>> for Document {
impl PartialEq for Document {
fn eq(&self, other: &Document) -> bool {
// super slow, but only here for tests
let mut self_field_values = self.field_values.clone();
let mut other_field_values = other.field_values.clone();
let mut self_field_values: Vec<&_> = self.field_values.iter().collect();
let mut other_field_values: Vec<&_> = other.field_values.iter().collect();
self_field_values.sort();
other_field_values.sort();
self_field_values.eq(&other_field_values)
@@ -78,6 +79,16 @@ impl Document {
self.add(FieldValue::new(field, value));
}
/// Add a pre-tokenized text field.
pub fn add_pre_tokenized_text(
&mut self,
field: Field,
pre_tokenized_text: &PreTokenizedString,
) {
let value = Value::PreTokStr(pre_tokenized_text.clone());
self.add(FieldValue::new(field, value));
}
/// Add a u64 field
pub fn add_u64(&mut self, field: Field, value: u64) {
self.add(FieldValue::new(field, Value::U64(value)));

View File

@@ -15,6 +15,7 @@ impl Field {
}
/// Returns a u32 identifying uniquely a field within a schema.
#[allow(clippy::trivially_copy_pass_by_ref)]
pub fn field_id(&self) -> u32 {
self.0
}

View File

@@ -1,11 +1,11 @@
use base64::decode;
use crate::schema::{IntOptions, TextOptions};
use crate::schema::Facet;
use crate::schema::IndexRecordOption;
use crate::schema::TextFieldIndexing;
use crate::schema::Value;
use crate::schema::{IntOptions, TextOptions};
use crate::tokenizer::PreTokenizedString;
use serde_json::Value as JsonValue;
/// Possible error that may occur while parsing a field value
@@ -169,6 +169,28 @@ impl FieldType {
Err(ValueParsingError::TypeError(msg))
}
},
JsonValue::Object(_) => match *self {
FieldType::Str(_) => {
if let Ok(tok_str_val) =
serde_json::from_value::<PreTokenizedString>(json.clone())
{
Ok(Value::PreTokStr(tok_str_val))
} else {
let msg = format!(
"Json value {:?} cannot be translated to PreTokenizedString.",
json
);
Err(ValueParsingError::TypeError(msg))
}
}
_ => {
let msg = format!(
"Json value not supported error {:?}. Expected {:?}",
json, self
);
Err(ValueParsingError::TypeError(msg))
}
},
_ => {
let msg = format!(
"Json value not supported error {:?}. Expected {:?}",
@@ -184,7 +206,9 @@ impl FieldType {
mod tests {
use super::FieldType;
use crate::schema::field_type::ValueParsingError;
use crate::schema::TextOptions;
use crate::schema::Value;
use crate::tokenizer::{PreTokenizedString, Token};
#[test]
fn test_bytes_value_from_json() {
@@ -205,4 +229,71 @@ mod tests {
_ => panic!("Expected parse failure for invalid base64"),
}
}
#[test]
fn test_pre_tok_str_value_from_json() {
let pre_tokenized_string_json = r#"{
"text": "The Old Man",
"tokens": [
{
"offset_from": 0,
"offset_to": 3,
"position": 0,
"text": "The",
"position_length": 1
},
{
"offset_from": 4,
"offset_to": 7,
"position": 1,
"text": "Old",
"position_length": 1
},
{
"offset_from": 8,
"offset_to": 11,
"position": 2,
"text": "Man",
"position_length": 1
}
]
}"#;
let expected_value = Value::PreTokStr(PreTokenizedString {
text: String::from("The Old Man"),
tokens: vec![
Token {
offset_from: 0,
offset_to: 3,
position: 0,
text: String::from("The"),
position_length: 1,
},
Token {
offset_from: 4,
offset_to: 7,
position: 1,
text: String::from("Old"),
position_length: 1,
},
Token {
offset_from: 8,
offset_to: 11,
position: 2,
text: String::from("Man"),
position_length: 1,
},
],
});
let deserialized_value = FieldType::Str(TextOptions::default())
.value_from_json(&serde_json::from_str(pre_tokenized_string_json).unwrap())
.unwrap();
assert_eq!(deserialized_value, expected_value);
let serialized_value_json = serde_json::to_string_pretty(&expected_value).unwrap();
assert_eq!(serialized_value_json, pre_tokenized_string_json);
}
}

View File

@@ -1,4 +1,5 @@
use crate::schema::Facet;
use crate::tokenizer::PreTokenizedString;
use crate::DateTime;
use serde::de::Visitor;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
@@ -10,6 +11,8 @@ use std::{cmp::Ordering, fmt};
pub enum Value {
/// The str type is used for any text information.
Str(String),
/// Pre-tokenized str type,
PreTokStr(PreTokenizedString),
/// Unsigned 64-bits Integer `u64`
U64(u64),
/// Signed 64-bits Integer `i64`
@@ -29,6 +32,7 @@ impl Ord for Value {
fn cmp(&self, other: &Self) -> Ordering {
match (self, other) {
(Value::Str(l), Value::Str(r)) => l.cmp(r),
(Value::PreTokStr(l), Value::PreTokStr(r)) => l.cmp(r),
(Value::U64(l), Value::U64(r)) => l.cmp(r),
(Value::I64(l), Value::I64(r)) => l.cmp(r),
(Value::Date(l), Value::Date(r)) => l.cmp(r),
@@ -44,6 +48,8 @@ impl Ord for Value {
}
(Value::Str(_), _) => Ordering::Less,
(_, Value::Str(_)) => Ordering::Greater,
(Value::PreTokStr(_), _) => Ordering::Less,
(_, Value::PreTokStr(_)) => Ordering::Greater,
(Value::U64(_), _) => Ordering::Less,
(_, Value::U64(_)) => Ordering::Greater,
(Value::I64(_), _) => Ordering::Less,
@@ -65,6 +71,7 @@ impl Serialize for Value {
{
match *self {
Value::Str(ref v) => serializer.serialize_str(v),
Value::PreTokStr(ref v) => v.serialize(serializer),
Value::U64(u) => serializer.serialize_u64(u),
Value::I64(u) => serializer.serialize_i64(u),
Value::F64(u) => serializer.serialize_f64(u),
@@ -124,6 +131,15 @@ impl Value {
}
}
/// Returns the tokenized text, provided the value is of the `PreTokStr` type.
/// (Returns None if the value is not of the `PreTokStr` type).
pub fn tokenized_text(&self) -> Option<&PreTokenizedString> {
match *self {
Value::PreTokStr(ref tok_text) => Some(tok_text),
_ => None,
}
}
/// Returns the u64-value, provided the value is of the `U64` type.
///
/// # Panics
@@ -217,10 +233,17 @@ impl From<Vec<u8>> for Value {
}
}
impl From<PreTokenizedString> for Value {
fn from(pretokenized_string: PreTokenizedString) -> Value {
Value::PreTokStr(pretokenized_string)
}
}
mod binary_serialize {
use super::Value;
use crate::common::{f64_to_u64, u64_to_f64, BinarySerializable};
use crate::schema::Facet;
use crate::tokenizer::PreTokenizedString;
use chrono::{TimeZone, Utc};
use std::io::{self, Read, Write};
@@ -231,6 +254,11 @@ mod binary_serialize {
const BYTES_CODE: u8 = 4;
const DATE_CODE: u8 = 5;
const F64_CODE: u8 = 6;
const EXT_CODE: u8 = 7;
// extended types
const TOK_STR_CODE: u8 = 0;
impl BinarySerializable for Value {
fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
@@ -239,6 +267,18 @@ mod binary_serialize {
TEXT_CODE.serialize(writer)?;
text.serialize(writer)
}
Value::PreTokStr(ref tok_str) => {
EXT_CODE.serialize(writer)?;
TOK_STR_CODE.serialize(writer)?;
if let Ok(text) = serde_json::to_string(tok_str) {
text.serialize(writer)
} else {
Err(io::Error::new(
io::ErrorKind::Other,
"Failed to dump Value::PreTokStr(_) to json.",
))
}
}
Value::U64(ref val) => {
U64_CODE.serialize(writer)?;
val.serialize(writer)
@@ -290,6 +330,30 @@ mod binary_serialize {
}
HIERARCHICAL_FACET_CODE => Ok(Value::Facet(Facet::deserialize(reader)?)),
BYTES_CODE => Ok(Value::Bytes(Vec::<u8>::deserialize(reader)?)),
EXT_CODE => {
let ext_type_code = u8::deserialize(reader)?;
match ext_type_code {
TOK_STR_CODE => {
let str_val = String::deserialize(reader)?;
if let Ok(value) = serde_json::from_str::<PreTokenizedString>(&str_val)
{
Ok(Value::PreTokStr(value))
} else {
Err(io::Error::new(
io::ErrorKind::Other,
"Failed to parse string data as Value::PreTokStr(_).",
))
}
}
_ => Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"No extened field type is associated with code {:?}",
ext_type_code
),
)),
}
}
_ => Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("No field type is associated with code {:?}", type_code),

View File

@@ -331,9 +331,8 @@ mod tests {
use std::collections::BTreeMap;
use std::iter::Iterator;
const TEST_TEXT: &'static str =
r#"Rust is a systems programming language sponsored by Mozilla which
describes it as a "safe, concurrent, practical language", supporting functional and
const TEST_TEXT: &'static str = r#"Rust is a systems programming language sponsored by
Mozilla which describes it as a "safe, concurrent, practical language", supporting functional and
imperative-procedural paradigms. Rust is syntactically similar to C++[according to whom?],
but its designers intend it to provide better memory safety while still maintaining
performance.
@@ -363,13 +362,13 @@ Survey in 2016, 2017, and 2018."#;
let snippet = select_best_fragment_combination(&fragments[..], &TEST_TEXT);
assert_eq!(
snippet.fragments,
"Rust is a systems programming language sponsored by \
Mozilla which\ndescribes it as a \"safe"
"Rust is a systems programming language sponsored by\n\
Mozilla which describes it as a \"safe"
);
assert_eq!(
snippet.to_html(),
"<b>Rust</b> is a systems programming <b>language</b> \
sponsored by Mozilla which\ndescribes it as a &quot;safe"
sponsored by\nMozilla which describes it as a &quot;safe"
)
}

View File

@@ -136,6 +136,7 @@ mod simple_tokenizer;
mod stemmer;
mod stop_word_filter;
mod token_stream_chain;
mod tokenized_string;
mod tokenizer;
mod tokenizer_manager;
@@ -152,7 +153,9 @@ pub use self::stop_word_filter::StopWordFilter;
pub(crate) use self::token_stream_chain::TokenStreamChain;
pub use self::tokenizer::BoxedTokenizer;
pub use self::tokenized_string::{PreTokenizedStream, PreTokenizedString};
pub use self::tokenizer::{Token, TokenFilter, TokenStream, Tokenizer};
pub use self::tokenizer_manager::TokenizerManager;
/// Maximum authorized len (in bytes) for a token.

View File

@@ -0,0 +1,189 @@
use crate::tokenizer::{Token, TokenStream, TokenStreamChain};
use std::cmp::Ordering;
/// Struct representing pre-tokenized text
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct PreTokenizedString {
/// Original text
pub text: String,
/// Tokens derived from the text
pub tokens: Vec<Token>,
}
impl Ord for PreTokenizedString {
fn cmp(&self, other: &Self) -> Ordering {
self.text.cmp(&other.text)
}
}
impl PartialOrd for PreTokenizedString {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
/// TokenStream implementation which wraps PreTokenizedString
pub struct PreTokenizedStream {
tokenized_string: PreTokenizedString,
current_token: i64,
}
impl From<PreTokenizedString> for PreTokenizedStream {
fn from(s: PreTokenizedString) -> PreTokenizedStream {
PreTokenizedStream {
tokenized_string: s,
current_token: -1,
}
}
}
impl PreTokenizedStream {
/// Creates a TokenStream from PreTokenizedString array
pub fn chain_tokenized_strings<'a>(
tok_strings: &'a [&'a PreTokenizedString],
) -> Box<dyn TokenStream + 'a> {
if tok_strings.len() == 1 {
Box::new(PreTokenizedStream::from((*tok_strings[0]).clone()))
} else {
let mut offsets = vec![];
let mut total_offset = 0;
for &tok_string in tok_strings {
offsets.push(total_offset);
if let Some(last_token) = tok_string.tokens.last() {
total_offset += last_token.offset_to;
}
}
let token_streams: Vec<_> = tok_strings
.iter()
.map(|tok_string| PreTokenizedStream::from((*tok_string).clone()))
.collect();
Box::new(TokenStreamChain::new(offsets, token_streams))
}
}
}
impl TokenStream for PreTokenizedStream {
fn advance(&mut self) -> bool {
self.current_token += 1;
self.current_token < self.tokenized_string.tokens.len() as i64
}
fn token(&self) -> &Token {
assert!(
self.current_token >= 0,
"TokenStream not initialized. You should call advance() at least once."
);
&self.tokenized_string.tokens[self.current_token as usize]
}
fn token_mut(&mut self) -> &mut Token {
assert!(
self.current_token >= 0,
"TokenStream not initialized. You should call advance() at least once."
);
&mut self.tokenized_string.tokens[self.current_token as usize]
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::tokenizer::Token;
#[test]
fn test_tokenized_stream() {
let tok_text = PreTokenizedString {
text: String::from("A a"),
tokens: vec![
Token {
offset_from: 0,
offset_to: 1,
position: 0,
text: String::from("A"),
position_length: 1,
},
Token {
offset_from: 2,
offset_to: 3,
position: 1,
text: String::from("a"),
position_length: 1,
},
],
};
let mut token_stream = PreTokenizedStream::from(tok_text.clone());
for expected_token in tok_text.tokens {
assert!(token_stream.advance());
assert_eq!(token_stream.token(), &expected_token);
}
assert!(!token_stream.advance());
}
#[test]
fn test_chain_tokenized_strings() {
let tok_text = PreTokenizedString {
text: String::from("A a"),
tokens: vec![
Token {
offset_from: 0,
offset_to: 1,
position: 0,
text: String::from("A"),
position_length: 1,
},
Token {
offset_from: 2,
offset_to: 3,
position: 1,
text: String::from("a"),
position_length: 1,
},
],
};
let chain_parts = vec![&tok_text, &tok_text];
let mut token_stream = PreTokenizedStream::chain_tokenized_strings(&chain_parts[..]);
let expected_tokens = vec![
Token {
offset_from: 0,
offset_to: 1,
position: 0,
text: String::from("A"),
position_length: 1,
},
Token {
offset_from: 2,
offset_to: 3,
position: 1,
text: String::from("a"),
position_length: 1,
},
Token {
offset_from: 3,
offset_to: 4,
position: 3,
text: String::from("A"),
position_length: 1,
},
Token {
offset_from: 5,
offset_to: 6,
position: 4,
text: String::from("a"),
position_length: 1,
},
];
for expected_token in expected_tokens {
assert!(token_stream.advance());
assert_eq!(token_stream.token(), &expected_token);
}
assert!(!token_stream.advance());
}
}

View File

@@ -4,7 +4,7 @@ use crate::tokenizer::TokenStreamChain;
use std::borrow::{Borrow, BorrowMut};
/// Token
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct Token {
/// Offset (byte index) of the first character of the token.
/// Offsets shall not be modified by token filters.

View File

@@ -28,11 +28,11 @@ fn test_failpoints_managed_directory_gc_if_delete_fails() {
// The initial 1*off is there to allow for the removal of the
// lock file.
fail::cfg("RAMDirectory::delete", "1*off->1*return").unwrap();
managed_directory.garbage_collect(Default::default);
assert!(managed_directory.garbage_collect(Default::default).is_ok());
assert!(managed_directory.exists(test_path));
// running the gc a second time should remove the file.
managed_directory.garbage_collect(Default::default);
assert!(managed_directory.garbage_collect(Default::default).is_ok());
assert!(
!managed_directory.exists(test_path),
"The file should have been deleted"