Paul Masurel
2018-01-04 12:08:34 +09:00
parent f7b0392bd5
commit ef109927b3
11 changed files with 82 additions and 110 deletions

View File

@@ -20,10 +20,7 @@ fn main() {
}
}
fn run_example(index_path: &Path) -> tantivy::Result<()> {
// # Defining the schema
//
// The Tantivy index requires a very strict schema.
@@ -31,7 +28,6 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
// and for each field, its type and "the way it should
// be indexed".
// first we need to define a schema ...
let mut schema_builder = SchemaBuilder::default();
@@ -62,8 +58,6 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
let schema = schema_builder.build();
// # Indexing documents
//
// Let's create a brand new index.
@@ -72,7 +66,6 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
// with our schema in the directory.
let index = Index::create(index_path, schema.clone())?;
// To insert document we need an index writer.
// There must be only one writer at a time.
// This single `IndexWriter` is already
@@ -85,7 +78,6 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
// Let's index our documents!
// We first need a handle on the title and the body field.
// ### Create a document "manually".
//
// We can create a document manually, by setting the fields
@@ -98,7 +90,7 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
old_man_doc.add_text(
body,
"He was an old man who fished alone in a skiff in the Gulf Stream and \
he had gone eighty-four days now without taking a fish.",
he had gone eighty-four days now without taking a fish.",
);
// ... and add it to the `IndexWriter`.
@@ -145,7 +137,6 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
// Indexing 5 million articles of the English wikipedia takes
// around 4 minutes on my computer!
// ### Committing
//
// At this point our documents are not searchable.
@@ -167,7 +158,6 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
// tantivy behaves as if has rolled back to its last
// commit.
// # Searching
//
// Let's search our index. Start by reloading
@@ -192,7 +182,6 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
// A ticket has been opened regarding this problem.
let query = query_parser.parse_query("sea whale")?;
// A query defines a set of documents, as
// well as the way they should be scored.
//
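
The hunks above all come from the walkthrough example: define a strict schema, create the index, push documents through the single IndexWriter, commit, then search. For orientation, here is a condensed sketch of that flow; it is an illustration only, and the field set-up, the 50 MB writer heap, and the QueryParser constructor are assumptions about the tantivy API of this period, not part of the commit.

use tantivy::schema::{Document, SchemaBuilder, STORED, TEXT};
use tantivy::query::QueryParser;
use tantivy::Index;
use std::path::Path;

fn run_example(index_path: &Path) -> tantivy::Result<()> {
    // Defining the schema: for each field, its type and how it should be indexed.
    let mut schema_builder = SchemaBuilder::default();
    let title = schema_builder.add_text_field("title", TEXT | STORED);
    let body = schema_builder.add_text_field("body", TEXT);
    let schema = schema_builder.build();

    // Create a brand new index in the directory.
    let index = Index::create(index_path, schema.clone())?;

    // One IndexWriter at a time; the heap size here is an assumed value.
    let mut index_writer = index.writer(50_000_000)?;

    // Create a document "manually" and hand it to the writer.
    let mut old_man_doc = Document::default();
    old_man_doc.add_text(title, "The Old Man and the Sea");
    old_man_doc.add_text(body, "He was an old man who fished alone in a skiff ...");
    index_writer.add_document(old_man_doc);

    // Documents are not searchable until the commit.
    index_writer.commit()?;

    // Reload searchers, then parse and run a query.
    index.load_searchers()?;
    let query_parser = QueryParser::for_index(&index, vec![title, body]); // constructor assumed
    let _query = query_parser.parse_query("sea whale")?;
    Ok(())
}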

View File

@@ -133,8 +133,7 @@ where
addr + 8 <= data.len(),
"The fast field field should have been padded with 7 bytes."
);
let val_unshifted_unmasked: u64 =
unsafe { *(data[addr..].as_ptr() as *const u64) };
let val_unshifted_unmasked: u64 = unsafe { *(data[addr..].as_ptr() as *const u64) };
let val_shifted = (val_unshifted_unmasked >> bit_shift) as u64;
(val_shifted & mask)
} else {
@@ -165,8 +164,7 @@ where
for output_val in output.iter_mut() {
let addr = addr_in_bits >> 3;
let bit_shift = addr_in_bits & 7;
let val_unshifted_unmasked: u64 =
unsafe { *(data[addr..].as_ptr() as *const u64) };
let val_unshifted_unmasked: u64 = unsafe { *(data[addr..].as_ptr() as *const u64) };
let val_shifted = (val_unshifted_unmasked >> bit_shift) as u64;
*output_val = val_shifted & mask;
addr_in_bits += num_bits;
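
Both hunks above are the hot path of the bitpacked fast-field reader: the value at bit offset addr_in_bits is recovered by reading an unaligned u64 at byte addr_in_bits >> 3, shifting right by addr_in_bits & 7, and masking down to num_bits; the assertion holds because the writer pads the data with 7 extra bytes. A safe, self-contained sketch of the same decode (using from_le_bytes instead of the raw pointer cast, and assuming bit_shift + num_bits <= 64 as on this branch):

use std::convert::TryInto;

fn read_bitpacked(data: &[u8], addr_in_bits: usize, num_bits: usize) -> u64 {
    let addr = addr_in_bits >> 3;     // byte address of the first byte containing the value
    let bit_shift = addr_in_bits & 7; // bit offset inside that byte
    // The serialized fast field is padded with 7 trailing bytes so this 8-byte read
    // never runs past the end of the data.
    assert!(addr + 8 <= data.len(), "fast field data should be padded with 7 bytes");
    let mask = if num_bits == 64 { u64::MAX } else { (1u64 << num_bits) - 1 };
    let word = u64::from_le_bytes(data[addr..addr + 8].try_into().unwrap());
    (word >> bit_shift) & mask
}

Sequential reads, as in the second hunk, simply advance addr_in_bits by num_bits per value.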

View File

@@ -25,9 +25,7 @@ fn compress_sorted(vals: &[u32], output: &mut [u8], offset: u32) -> usize {
}
fn uncompress_sorted(compressed_data: &[u8], output: &mut [u32], offset: u32) -> usize {
unsafe {
simdcomp::uncompress_sorted(compressed_data.as_ptr(), output.as_mut_ptr(), offset)
}
unsafe { simdcomp::uncompress_sorted(compressed_data.as_ptr(), output.as_mut_ptr(), offset) }
}
fn compress_unsorted(vals: &[u32], output: &mut [u8]) -> usize {
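
These thin wrappers hand blocks of sorted u32 values to the simdcomp bindings; the offset argument is presumably the value the block is delta-encoded against, which is what makes sorted blocks so compressible. As a rough scalar illustration of that idea (not the simdcomp implementation, which bit-packs the deltas with SIMD):

// Delta-encode a sorted block against a starting offset, and the inverse.
// Assumes offset <= vals[0] and vals sorted in non-decreasing order.
fn delta_encode(vals: &[u32], offset: u32) -> Vec<u32> {
    let mut prev = offset;
    vals.iter()
        .map(|&v| {
            let delta = v - prev; // sorted input keeps this from underflowing
            prev = v;
            delta
        })
        .collect()
}

fn delta_decode(deltas: &[u32], offset: u32) -> Vec<u32> {
    let mut acc = offset;
    deltas.iter()
        .map(|&d| {
            acc += d;
            acc
        })
        .collect()
}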

View File

@@ -114,7 +114,7 @@ impl Index {
Index::create_from_metas(directory, &metas)
}
pub(crate) fn load_metas(&self) -> Result<IndexMeta> {
pub fn load_metas(&self) -> Result<IndexMeta> {
load_metas(self.directory())
}

View File

@@ -14,8 +14,7 @@ pub struct IndexMeta {
pub segments: Vec<SegmentMeta>,
pub schema: Schema,
pub opstamp: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub payload: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")] pub payload: Option<String>,
}
impl IndexMeta {
@@ -29,14 +28,12 @@ impl IndexMeta {
}
}
#[cfg(test)]
mod tests {
use serde_json;
use super::IndexMeta;
use schema::{TEXT, SchemaBuilder};
use schema::{SchemaBuilder, TEXT};
#[test]
fn test_serialize_metas() {
@@ -49,9 +46,9 @@ mod tests {
segments: Vec::new(),
schema: schema,
opstamp: 0u64,
payload: None
payload: None,
};
let json = serde_json::ser::to_string(&index_metas).expect("serialization failed");
assert_eq!(json, r#"{"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","tokenizer":"default"},"stored":false}}],"opstamp":0}"#);
}
}
}
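
The assertion above only holds because of the skip_serializing_if attribute: when payload is None the key is omitted from the JSON entirely, which is why the expected string contains no "payload" entry. A minimal standalone illustration of that serde attribute (the struct here is hypothetical, not tantivy's IndexMeta):

use serde::Serialize;

#[derive(Serialize)]
struct Meta {
    opstamp: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    payload: Option<String>,
}

fn main() {
    // None: the field disappears from the output.
    let without = serde_json::to_string(&Meta { opstamp: 0, payload: None }).unwrap();
    assert_eq!(without, r#"{"opstamp":0}"#);

    // Some: the field is serialized as usual.
    let with = serde_json::to_string(&Meta { opstamp: 0, payload: Some("hello".into()) }).unwrap();
    assert_eq!(with, r#"{"opstamp":0,"payload":"hello"}"#);
}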

View File

@@ -59,9 +59,7 @@ mod murmurhash2 {
/// Returns (the heap size in bytes, the hash table size in number of bits)
pub(crate) fn split_memory(per_thread_memory_budget: usize) -> (usize, usize) {
let table_size_limit: usize = per_thread_memory_budget / 3;
let compute_table_size = |num_bits: usize| {
(1 << num_bits) * mem::size_of::<KeyValue>()
};
let compute_table_size = |num_bits: usize| (1 << num_bits) * mem::size_of::<KeyValue>();
let table_num_bits: usize = (1..)
.into_iter()
.take_while(|num_bits: &usize| compute_table_size(*num_bits) < table_size_limit)
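
split_memory caps the hash table at a third of the per-thread budget, picks the largest power-of-two table that still fits under that cap, and leaves everything else for the heap. A worked sketch of that arithmetic, with an assumed 16-byte KeyValue standing in for mem::size_of::<KeyValue>():

fn split_memory(per_thread_memory_budget: usize) -> (usize, usize) {
    const KEY_VALUE_SIZE: usize = 16; // assumed size of a KeyValue entry
    let table_size_limit = per_thread_memory_budget / 3;
    let compute_table_size = |num_bits: usize| (1usize << num_bits) * KEY_VALUE_SIZE;
    // Largest number of hash bits whose table stays under a third of the budget.
    let table_num_bits = (1..)
        .take_while(|&num_bits| compute_table_size(num_bits) < table_size_limit)
        .last()
        .expect("memory budget too small for even the smallest table");
    let heap_size = per_thread_memory_budget - compute_table_size(table_num_bits);
    (heap_size, table_num_bits)
}

// For a 30 MB budget the cap is 10 MB, so the table gets 19 bits
// (2^19 * 16 B = 8 MB) and the heap keeps the remaining ~22 MB.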

View File

@@ -173,10 +173,8 @@ pub fn compute_deleted_bitset(
// that may be affected by the delete operation.
let limit_doc = doc_opstamps.compute_doc_limit(delete_op.opstamp);
let inverted_index = segment_reader.inverted_index(delete_op.term.field());
if let Some(mut docset) = inverted_index.read_postings(
&delete_op.term,
IndexRecordOption::Basic,
)
if let Some(mut docset) =
inverted_index.read_postings(&delete_op.term, IndexRecordOption::Basic)
{
while docset.advance() {
let deleted_doc = docset.doc();
@@ -338,15 +336,13 @@ impl IndexWriter {
join_handle
.join()
.expect("Indexing Worker thread panicked")
.chain_err(|| {
ErrorKind::ErrorInThread("Error in indexing worker thread.".into())
})?;
.chain_err(|| ErrorKind::ErrorInThread("Error in indexing worker thread.".into()))?;
}
drop(self.workers_join_handle);
let result = self.segment_updater.wait_merging_thread().chain_err(|| {
ErrorKind::ErrorInThread("Failed to join merging thread.".into())
});
let result = self.segment_updater
.wait_merging_thread()
.chain_err(|| ErrorKind::ErrorInThread("Failed to join merging thread.".into()));
if let Err(ref e) = result {
error!("Some merging thread failed {:?}", e);
@@ -359,10 +355,8 @@ impl IndexWriter {
pub fn add_segment(&mut self, segment_meta: SegmentMeta) {
let delete_cursor = self.delete_queue.cursor();
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, None);
self.segment_updater.add_segment(
self.generation,
segment_entry,
);
self.segment_updater
.add_segment(self.generation, segment_entry);
}
#[doc(hidden)]
@@ -388,8 +382,7 @@ impl IndexWriter {
let join_handle: JoinHandle<Result<()>> = thread::Builder::new()
.name(format!(
"indexing thread {} for gen {}",
self.worker_id,
generation
self.worker_id, generation
))
.spawn(move || {
loop {
@@ -469,9 +462,10 @@ impl IndexWriter {
///
/// Returns the former segment_ready channel.
fn recreate_document_channel(&mut self) -> DocumentReceiver {
let (mut document_sender, mut document_receiver): (DocumentSender,
DocumentReceiver) =
chan::sync(PIPELINE_MAX_SIZE_IN_DOCS);
let (mut document_sender, mut document_receiver): (
DocumentSender,
DocumentReceiver,
) = chan::sync(PIPELINE_MAX_SIZE_IN_DOCS);
swap(&mut self.document_sender, &mut document_sender);
swap(&mut self.document_receiver, &mut document_receiver);
document_receiver
@@ -495,9 +489,9 @@ impl IndexWriter {
let document_receiver = self.document_receiver.clone();
// take the directory lock to create a new index_writer.
let directory_lock = self._directory_lock.take().expect(
"The IndexWriter does not have any lock. This is a bug, please report.",
);
let directory_lock = self._directory_lock
.take()
.expect("The IndexWriter does not have any lock. This is a bug, please report.");
let new_index_writer: IndexWriter = open_index_writer(
&self.index,
@@ -522,7 +516,6 @@ impl IndexWriter {
Ok(())
}
/// Prepares a commit.
///
/// Calling `prepare_commit()` will cut the indexing
@@ -555,7 +548,7 @@ impl IndexWriter {
// This will move uncommitted segments to the state of
// committed segments.
info!("committing {}", self.committed_opstamp);
info!("Preparing commit");
// this will drop the current document channel
// and recreate a new one channels.
@@ -568,9 +561,9 @@ impl IndexWriter {
);
for worker_handle in former_workers_join_handle {
let indexing_worker_result = worker_handle.join().map_err(|e| {
Error::from_kind(ErrorKind::ErrorInThread(format!("{:?}", e)))
})?;
let indexing_worker_result = worker_handle
.join()
.map_err(|e| Error::from_kind(ErrorKind::ErrorInThread(format!("{:?}", e))))?;
indexing_worker_result?;
// add a new worker for the next generation.
@@ -579,7 +572,7 @@ impl IndexWriter {
let commit_opstamp = self.stamper.stamp();
let prepared_commit = PreparedCommit::new(self, commit_opstamp);
info!("Preparing commit {}", commit_opstamp);
info!("Prepared commit {}", commit_opstamp);
Ok(prepared_commit)
}
@@ -773,9 +766,9 @@ mod tests {
}
// this should create 8 segments and trigger a merge.
index_writer.commit().expect("commit failed");
index_writer.wait_merging_threads().expect(
"waiting merging thread failed",
);
index_writer
.wait_merging_threads()
.expect("waiting merging thread failed");
index.load_searchers().unwrap();
assert_eq!(num_docs_containing("a"), 200);
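
Most of these hunks touch the commit path: commit() makes previously added documents searchable, prepare_commit() stamps an opstamp and returns a PreparedCommit whose commit() (see the next file) hands the opstamp and optional payload to the segment updater, and wait_merging_threads() blocks until background merges finish. A hedged usage sketch, assuming an index and writer built as in the example file at the top of this diff:

fn commit_flow(index: &tantivy::Index, mut index_writer: tantivy::IndexWriter) -> tantivy::Result<()> {
    // Plain commit: everything added so far becomes searchable once this returns.
    index_writer.commit()?;

    // Two-phase variant: prepare_commit() cuts the indexing queue and stamps an opstamp;
    // committing the PreparedCommit then finalizes it through the segment updater.
    let prepared = index_writer.prepare_commit()?;
    let _opstamp = prepared.commit()?;

    // Block on pending merges and reload searchers, as the tests above do.
    index_writer.wait_merging_threads()?;
    index.load_searchers()?;
    Ok(())
}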

View File

@@ -31,10 +31,9 @@ impl<'a> PreparedCommit<'a> {
pub fn commit(self) -> Result<u64> {
info!("committing {}", self.opstamp);
self.index_writer.segment_updater().commit(
self.opstamp,
self.payload,
)?;
self.index_writer
.segment_updater()
.commit(self.opstamp, self.payload)?;
Ok(self.opstamp)
}
}

View File

@@ -102,8 +102,7 @@ fn perform_merge(
let mut file_protections: Vec<FileProtection> = vec![];
for segment_id in segment_ids {
if let Some(mut segment_entry) =
segment_updater.0.segment_manager.segment_entry(segment_id)
if let Some(mut segment_entry) = segment_updater.0.segment_manager.segment_entry(segment_id)
{
let segment = index.segment(segment_entry.meta().clone());
if let Some(file_protection) =
@@ -135,13 +134,12 @@ fn perform_merge(
// ... we just serialize this index merger in our new segment
// to merge the two segments.
let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).expect(
"Creating index serializer failed",
);
let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment)
.expect("Creating index serializer failed");
let num_docs = merger.write(segment_serializer).expect(
"Serializing merged index failed",
);
let num_docs = merger
.write(segment_serializer)
.expect("Serializing merged index failed");
let mut segment_meta = SegmentMeta::new(merged_segment.id());
segment_meta.set_max_doc(num_docs);
@@ -266,20 +264,22 @@ impl SegmentUpdater {
fn garbage_collect_files_exec(&self) {
info!("Running garbage collection");
let mut index = self.0.index.clone();
index.directory_mut().garbage_collect(
|| self.0.segment_manager.list_files(),
);
index
.directory_mut()
.garbage_collect(|| self.0.segment_manager.list_files());
}
pub fn commit(&self, opstamp: u64, payload: Option<String>) -> Result<()> {
self.run_async(move |segment_updater| if segment_updater.is_alive() {
let segment_entries = segment_updater.purge_deletes(opstamp).expect(
"Failed purge deletes",
);
segment_updater.0.segment_manager.commit(segment_entries);
segment_updater.save_metas(opstamp, payload);
segment_updater.garbage_collect_files_exec();
segment_updater.consider_merge_options();
self.run_async(move |segment_updater| {
if segment_updater.is_alive() {
let segment_entries = segment_updater
.purge_deletes(opstamp)
.expect("Failed purge deletes");
segment_updater.0.segment_manager.commit(segment_entries);
segment_updater.save_metas(opstamp, payload);
segment_updater.garbage_collect_files_exec();
segment_updater.consider_merge_options();
}
}).wait()
}
@@ -343,10 +343,11 @@ impl SegmentUpdater {
.remove(&merging_thread_id);
Ok(())
});
self.0.merging_threads.write().unwrap().insert(
merging_thread_id,
merging_join_handle,
);
self.0
.merging_threads
.write()
.unwrap()
.insert(merging_thread_id, merging_join_handle);
merging_future_recv
}
@@ -369,10 +370,9 @@ impl SegmentUpdater {
before_merge_segment_ids: &[SegmentId],
after_merge_segment_entry: SegmentId,
) {
self.0.segment_manager.cancel_merge(
before_merge_segment_ids,
after_merge_segment_entry,
);
self.0
.segment_manager
.cancel_merge(before_merge_segment_ids, after_merge_segment_entry);
}
fn end_merge(
@@ -385,9 +385,10 @@ impl SegmentUpdater {
let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
let mut _file_protection_opt = None;
if let Some(delete_operation) = delete_cursor.get() {
let committed_opstamp = segment_updater.0
.index.load_metas()
let committed_opstamp = segment_updater
.0
.index
.load_metas()
.expect("Failed to read opstamp")
.opstamp;
if delete_operation.opstamp < committed_opstamp {
@@ -404,8 +405,7 @@ impl SegmentUpdater {
Err(e) => {
error!(
"Merge of {:?} was cancelled (advancing deletes failed): {:?}",
before_merge_segment_ids,
e
before_merge_segment_ids, e
);
// ... cancel merge
if cfg!(test) {
@@ -420,10 +420,10 @@ impl SegmentUpdater {
}
}
}
segment_updater.0.segment_manager.end_merge(
&before_merge_segment_ids,
after_merge_segment_entry,
);
segment_updater
.0
.segment_manager
.end_merge(&before_merge_segment_ids, after_merge_segment_entry);
segment_updater.consider_merge_options();
info!("save metas");
let previous_metas = segment_updater.0.index.load_metas().unwrap();
@@ -459,9 +459,10 @@ impl SegmentUpdater {
}
debug!("wait merging thread {}", new_merging_threads.len());
for (_, merging_thread_handle) in new_merging_threads {
merging_thread_handle.join().map(|_| ()).map_err(|_| {
ErrorKind::ErrorInThread("Merging thread failed.".into())
})?;
merging_thread_handle
.join()
.map(|_| ())
.map_err(|_| ErrorKind::ErrorInThread("Merging thread failed.".into()))?;
}
// Our merging thread may have queued their completed
self.run_async(move |_| {}).wait()?;
@@ -527,9 +528,9 @@ mod tests {
assert_eq!(index.searcher().num_docs(), 302);
{
index_writer.wait_merging_threads().expect(
"waiting for merging threads",
);
index_writer
.wait_merging_threads()
.expect("waiting for merging threads");
}
index.load_searchers().unwrap();
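
A pattern worth calling out in this file is run_async(closure).wait(): every state change (committing segment entries, ending or cancelling a merge, garbage-collecting files) is shipped as a closure to a single updater thread, and the caller blocks on the returned future, which keeps the segment bookkeeping effectively single-threaded. A stripped-down sketch of that pattern using std channels; the names are illustrative and this is not the tantivy implementation:

use std::sync::mpsc;
use std::thread;

// One worker thread owns the state; callers send it closures and wait for an acknowledgement.
type Job = Box<dyn FnOnce(&mut Vec<String>) + Send>;

struct Updater {
    sender: mpsc::Sender<(Job, mpsc::Sender<()>)>,
}

impl Updater {
    fn new() -> Updater {
        let (sender, receiver) = mpsc::channel::<(Job, mpsc::Sender<()>)>();
        thread::spawn(move || {
            // Stand-in for the segment manager state owned by the updater thread.
            let mut segments: Vec<String> = Vec::new();
            for (job, ack) in receiver {
                job(&mut segments);
                let _ = ack.send(()); // completes the caller's "future"
            }
        });
        Updater { sender }
    }

    // run_async equivalent: enqueue the closure and return a handle the caller can wait on.
    fn run_async(&self, job: Job) -> mpsc::Receiver<()> {
        let (ack_tx, ack_rx) = mpsc::channel();
        self.sender.send((job, ack_tx)).expect("updater thread is gone");
        ack_rx
    }
}

fn main() {
    let updater = Updater::new();
    // A "commit": mutate the single-threaded state, then wait for it to be applied.
    updater
        .run_async(Box::new(|segments| segments.push("segment-0".into())))
        .recv()
        .expect("updater thread panicked");
}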

View File

@@ -715,8 +715,7 @@ mod tests {
let mut schema_builder = SchemaBuilder::default();
let text_field = schema_builder.add_text_field("text", TEXT);
let other_text_field = schema_builder.add_text_field("text2", TEXT);
let document =
doc!(text_field => "tantivy",
let document = doc!(text_field => "tantivy",
text_field => "some other value",
other_text_field => "short");
assert_eq!(document.len(), 3);
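
The doc! call above is shorthand for building a Document one field value at a time; every field => value pair adds one value, which is why len() is 3 even though only two distinct fields are involved. Roughly the same document built by hand with add_text, as the example file earlier in this diff does:

use tantivy::schema::{Document, SchemaBuilder, TEXT};

#[test]
fn doc_macro_equivalent() {
    let mut schema_builder = SchemaBuilder::default();
    let text_field = schema_builder.add_text_field("text", TEXT);
    let other_text_field = schema_builder.add_text_field("text2", TEXT);
    let _schema = schema_builder.build();

    // Same three field values as the doc! call, added explicitly.
    let mut document = Document::default();
    document.add_text(text_field, "tantivy");
    document.add_text(text_field, "some other value");
    document.add_text(other_text_field, "short");
    assert_eq!(document.len(), 3);
}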

View File

@@ -373,7 +373,7 @@ mod test {
#[test]
pub fn test_parse_nonindexed_field_yields_error() {
let query_parser = make_query_parser();
let query_parser = make_query_parser();
let is_not_indexed_err = |query: &str| {
let result: Result<Box<Query>, QueryParserError> = query_parser.parse_query(query);