From ef109927b31184abeeef957e8715ae4024312267 Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Thu, 4 Jan 2018 12:08:34 +0900
Subject: [PATCH] rustfmt

---
 examples/simple_search.rs                     | 13 +--
 src/common/bitpacker.rs                       |  6 +-
 src/compression/pack/compression_pack_simd.rs |  4 +-
 src/core/index.rs                             |  2 +-
 src/core/index_meta.rs                        | 11 +--
 src/datastruct/stacker/hashmap.rs             |  4 +-
 src/indexer/index_writer.rs                   | 55 ++++++------
 src/indexer/prepared_commit.rs                |  7 +-
 src/indexer/segment_updater.rs                | 85 ++++++++++---------
 src/lib.rs                                    |  3 +-
 src/query/query_parser/query_parser.rs        |  2 +-
 11 files changed, 82 insertions(+), 110 deletions(-)

diff --git a/examples/simple_search.rs b/examples/simple_search.rs
index 38071695a..316d503cb 100644
--- a/examples/simple_search.rs
+++ b/examples/simple_search.rs
@@ -20,10 +20,7 @@ fn main() {
     }
 }
 
-
 fn run_example(index_path: &Path) -> tantivy::Result<()> {
-
-
     // # Defining the schema
     //
     // The Tantivy index requires a very strict schema.
@@ -31,7 +28,6 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
     // and for each field, its type and "the way it should
     // be indexed".
 
-
     // first we need to define a schema ...
 
     let mut schema_builder = SchemaBuilder::default();
@@ -62,8 +58,6 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
 
     let schema = schema_builder.build();
 
-
-
     // # Indexing documents
     //
     // Let's create a brand new index.
@@ -72,7 +66,6 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
     // with our schema in the directory.
     let index = Index::create(index_path, schema.clone())?;
 
-
     // To insert document we need an index writer.
     // There must be only one writer at a time.
     // This single `IndexWriter` is already
@@ -85,7 +78,6 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
     // Let's index our documents!
     // We first need a handle on the title and the body field.
 
-
     // ### Create a document "manually".
     //
     // We can create a document manually, by setting the fields
@@ -98,7 +90,7 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
     old_man_doc.add_text(
         body,
         "He was an old man who fished alone in a skiff in the Gulf Stream and \
-        he had gone eighty-four days now without taking a fish.",
+         he had gone eighty-four days now without taking a fish.",
     );
 
     // ... and add it to the `IndexWriter`.
@@ -145,7 +137,6 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
     // Indexing 5 million articles of the English wikipedia takes
     // around 4 minutes on my computer!
 
-
     // ### Committing
     //
     // At this point our documents are not searchable.
@@ -167,7 +158,6 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
     // tantivy behaves as if has rolled back to its last
     // commit.
 
-
     // # Searching
     //
     // Let's search our index. Start by reloading
@@ -192,7 +182,6 @@ fn run_example(index_path: &Path) -> tantivy::Result<()> {
     // A ticket has been opened regarding this problem.
     let query = query_parser.parse_query("sea whale")?;
 
-
     // A query defines a set of documents, as
     // well as the way they should be scored.
     //
diff --git a/src/common/bitpacker.rs b/src/common/bitpacker.rs
index 9f9b4125b..30142f8f5 100644
--- a/src/common/bitpacker.rs
+++ b/src/common/bitpacker.rs
@@ -133,8 +133,7 @@ where
                 addr + 8 <= data.len(),
                 "The fast field field should have been padded with 7 bytes."
             );
-            let val_unshifted_unmasked: u64 =
-                unsafe { *(data[addr..].as_ptr() as *const u64) };
+            let val_unshifted_unmasked: u64 = unsafe { *(data[addr..].as_ptr() as *const u64) };
             let val_shifted = (val_unshifted_unmasked >> bit_shift) as u64;
             (val_shifted & mask)
         } else {
@@ -165,8 +164,7 @@ where
         for output_val in output.iter_mut() {
             let addr = addr_in_bits >> 3;
             let bit_shift = addr_in_bits & 7;
-            let val_unshifted_unmasked: u64 =
-                unsafe { *(data[addr..].as_ptr() as *const u64) };
+            let val_unshifted_unmasked: u64 = unsafe { *(data[addr..].as_ptr() as *const u64) };
             let val_shifted = (val_unshifted_unmasked >> bit_shift) as u64;
             *output_val = val_shifted & mask;
             addr_in_bits += num_bits;
diff --git a/src/compression/pack/compression_pack_simd.rs b/src/compression/pack/compression_pack_simd.rs
index 2db372630..2a900e9ed 100644
--- a/src/compression/pack/compression_pack_simd.rs
+++ b/src/compression/pack/compression_pack_simd.rs
@@ -25,9 +25,7 @@ fn compress_sorted(vals: &[u32], output: &mut [u8], offset: u32) -> usize {
 }
 
 fn uncompress_sorted(compressed_data: &[u8], output: &mut [u32], offset: u32) -> usize {
-    unsafe {
-        simdcomp::uncompress_sorted(compressed_data.as_ptr(), output.as_mut_ptr(), offset)
-    }
+    unsafe { simdcomp::uncompress_sorted(compressed_data.as_ptr(), output.as_mut_ptr(), offset) }
 }
 
 fn compress_unsorted(vals: &[u32], output: &mut [u8]) -> usize {
diff --git a/src/core/index.rs b/src/core/index.rs
index 2ec390e45..0ba36cd08 100644
--- a/src/core/index.rs
+++ b/src/core/index.rs
@@ -114,7 +114,7 @@ impl Index {
         Index::create_from_metas(directory, &metas)
     }
 
-    pub(crate) fn load_metas(&self) -> Result<IndexMeta> {
+    pub fn load_metas(&self) -> Result<IndexMeta> {
         load_metas(self.directory())
     }
 
diff --git a/src/core/index_meta.rs b/src/core/index_meta.rs
index 636117166..a7c11ea88 100644
--- a/src/core/index_meta.rs
+++ b/src/core/index_meta.rs
@@ -14,8 +14,7 @@ pub struct IndexMeta {
     pub segments: Vec<SegmentMeta>,
     pub schema: Schema,
     pub opstamp: u64,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub payload: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")] pub payload: Option<String>,
 }
 
 impl IndexMeta {
@@ -29,14 +28,12 @@ impl IndexMeta {
     }
 }
 
-
 #[cfg(test)]
 mod tests {
 
     use serde_json;
     use super::IndexMeta;
-    use schema::{TEXT, SchemaBuilder};
-
+    use schema::{SchemaBuilder, TEXT};
 
     #[test]
     fn test_serialize_metas() {
@@ -49,9 +46,9 @@ mod tests {
             segments: Vec::new(),
             schema: schema,
             opstamp: 0u64,
-            payload: None
+            payload: None,
         };
         let json = serde_json::ser::to_string(&index_metas).expect("serialization failed");
         assert_eq!(json, r#"{"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","tokenizer":"default"},"stored":false}}],"opstamp":0}"#);
     }
-}
\ No newline at end of file
+}
diff --git a/src/datastruct/stacker/hashmap.rs b/src/datastruct/stacker/hashmap.rs
index e7d18f1b2..475a706f0 100644
--- a/src/datastruct/stacker/hashmap.rs
+++ b/src/datastruct/stacker/hashmap.rs
@@ -59,9 +59,7 @@ mod murmurhash2 {
 /// Returns (the heap size in bytes, the hash table size in number of bits)
 pub(crate) fn split_memory(per_thread_memory_budget: usize) -> (usize, usize) {
     let table_size_limit: usize = per_thread_memory_budget / 3;
-    let compute_table_size = |num_bits: usize| {
-        (1 << num_bits) * mem::size_of::<KeyValue>()
-    };
+    let compute_table_size = |num_bits: usize| (1 << num_bits) * mem::size_of::<KeyValue>();
     let table_num_bits: usize = (1..)
         .into_iter()
         .take_while(|num_bits: &usize| compute_table_size(*num_bits) < table_size_limit)
diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs
index 23b2dee4f..35b9d4459 100644
--- a/src/indexer/index_writer.rs
+++ b/src/indexer/index_writer.rs
@@ -173,10 +173,8 @@ pub fn compute_deleted_bitset(
             // that may be affected by the delete operation.
             let limit_doc = doc_opstamps.compute_doc_limit(delete_op.opstamp);
             let inverted_index = segment_reader.inverted_index(delete_op.term.field());
-            if let Some(mut docset) = inverted_index.read_postings(
-                &delete_op.term,
-                IndexRecordOption::Basic,
-            )
+            if let Some(mut docset) =
+                inverted_index.read_postings(&delete_op.term, IndexRecordOption::Basic)
             {
                 while docset.advance() {
                     let deleted_doc = docset.doc();
@@ -338,15 +336,13 @@ impl IndexWriter {
             join_handle
                 .join()
                 .expect("Indexing Worker thread panicked")
-                .chain_err(|| {
-                    ErrorKind::ErrorInThread("Error in indexing worker thread.".into())
-                })?;
+                .chain_err(|| ErrorKind::ErrorInThread("Error in indexing worker thread.".into()))?;
         }
         drop(self.workers_join_handle);
 
-        let result = self.segment_updater.wait_merging_thread().chain_err(|| {
-            ErrorKind::ErrorInThread("Failed to join merging thread.".into())
-        });
+        let result = self.segment_updater
+            .wait_merging_thread()
+            .chain_err(|| ErrorKind::ErrorInThread("Failed to join merging thread.".into()));
 
         if let Err(ref e) = result {
             error!("Some merging thread failed {:?}", e);
@@ -359,10 +355,8 @@ impl IndexWriter {
     pub fn add_segment(&mut self, segment_meta: SegmentMeta) {
         let delete_cursor = self.delete_queue.cursor();
         let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, None);
-        self.segment_updater.add_segment(
-            self.generation,
-            segment_entry,
-        );
+        self.segment_updater
+            .add_segment(self.generation, segment_entry);
     }
 
     #[doc(hidden)]
@@ -388,8 +382,7 @@ impl IndexWriter {
         let join_handle: JoinHandle<Result<()>> = thread::Builder::new()
            .name(format!(
                 "indexing thread {} for gen {}",
-                self.worker_id,
-                generation
+                self.worker_id, generation
             ))
             .spawn(move || {
                 loop {
@@ -469,9 +462,10 @@ impl IndexWriter {
     ///
     /// Returns the former segment_ready channel.
     fn recreate_document_channel(&mut self) -> DocumentReceiver {
-        let (mut document_sender, mut document_receiver): (DocumentSender,
-                                                           DocumentReceiver) =
-            chan::sync(PIPELINE_MAX_SIZE_IN_DOCS);
+        let (mut document_sender, mut document_receiver): (
+            DocumentSender,
+            DocumentReceiver,
+        ) = chan::sync(PIPELINE_MAX_SIZE_IN_DOCS);
         swap(&mut self.document_sender, &mut document_sender);
         swap(&mut self.document_receiver, &mut document_receiver);
         document_receiver
@@ -495,9 +489,9 @@ impl IndexWriter {
         let document_receiver = self.document_receiver.clone();
 
         // take the directory lock to create a new index_writer.
-        let directory_lock = self._directory_lock.take().expect(
-            "The IndexWriter does not have any lock. This is a bug, please report.",
-        );
+        let directory_lock = self._directory_lock
+            .take()
+            .expect("The IndexWriter does not have any lock. This is a bug, please report.");
 
         let new_index_writer: IndexWriter = open_index_writer(
             &self.index,
@@ -522,7 +516,6 @@ impl IndexWriter {
         Ok(())
     }
 
-
     /// Prepares a commit.
     ///
     /// Calling `prepare_commit()` will cut the indexing
@@ -555,7 +548,7 @@ impl IndexWriter {
 
         // This will move uncommitted segments to the state of
         // committed segments.
-        info!("committing {}", self.committed_opstamp);
+        info!("Preparing commit");
 
         // this will drop the current document channel
         // and recreate a new one channels.
@@ -568,9 +561,9 @@ impl IndexWriter {
         );
 
         for worker_handle in former_workers_join_handle {
-            let indexing_worker_result = worker_handle.join().map_err(|e| {
-                Error::from_kind(ErrorKind::ErrorInThread(format!("{:?}", e)))
-            })?;
+            let indexing_worker_result = worker_handle
+                .join()
+                .map_err(|e| Error::from_kind(ErrorKind::ErrorInThread(format!("{:?}", e))))?;
             indexing_worker_result?;
 
             // add a new worker for the next generation.
@@ -579,7 +572,7 @@ impl IndexWriter {
 
         let commit_opstamp = self.stamper.stamp();
         let prepared_commit = PreparedCommit::new(self, commit_opstamp);
-        info!("Preparing commit {}", commit_opstamp);
+        info!("Prepared commit {}", commit_opstamp);
         Ok(prepared_commit)
     }
 
@@ -773,9 +766,9 @@ mod tests {
         }
        // this should create 8 segments and trigger a merge.
         index_writer.commit().expect("commit failed");
-        index_writer.wait_merging_threads().expect(
-            "waiting merging thread failed",
-        );
+        index_writer
+            .wait_merging_threads()
+            .expect("waiting merging thread failed");
 
         index.load_searchers().unwrap();
         assert_eq!(num_docs_containing("a"), 200);
diff --git a/src/indexer/prepared_commit.rs b/src/indexer/prepared_commit.rs
index 7853d2130..02c79ed8b 100644
--- a/src/indexer/prepared_commit.rs
+++ b/src/indexer/prepared_commit.rs
@@ -31,10 +31,9 @@ impl<'a> PreparedCommit<'a> {
 
     pub fn commit(self) -> Result<u64> {
         info!("committing {}", self.opstamp);
-        self.index_writer.segment_updater().commit(
-            self.opstamp,
-            self.payload,
-        )?;
+        self.index_writer
+            .segment_updater()
+            .commit(self.opstamp, self.payload)?;
         Ok(self.opstamp)
     }
 }
diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs
index bf4a9cc08..6dc54a4dd 100644
--- a/src/indexer/segment_updater.rs
+++ b/src/indexer/segment_updater.rs
@@ -102,8 +102,7 @@ fn perform_merge(
     let mut file_protections: Vec<FileProtection> = vec![];
 
     for segment_id in segment_ids {
-        if let Some(mut segment_entry) =
-            segment_updater.0.segment_manager.segment_entry(segment_id)
+        if let Some(mut segment_entry) = segment_updater.0.segment_manager.segment_entry(segment_id)
         {
             let segment = index.segment(segment_entry.meta().clone());
             if let Some(file_protection) =
@@ -135,13 +134,12 @@ fn perform_merge(
     // ... we just serialize this index merger in our new segment
     // to merge the two segments.
 
-    let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment).expect(
-        "Creating index serializer failed",
-    );
+    let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment)
+        .expect("Creating index serializer failed");
 
-    let num_docs = merger.write(segment_serializer).expect(
-        "Serializing merged index failed",
-    );
+    let num_docs = merger
+        .write(segment_serializer)
+        .expect("Serializing merged index failed");
 
     let mut segment_meta = SegmentMeta::new(merged_segment.id());
     segment_meta.set_max_doc(num_docs);
@@ -266,20 +264,22 @@ impl SegmentUpdater {
     fn garbage_collect_files_exec(&self) {
         info!("Running garbage collection");
         let mut index = self.0.index.clone();
-        index.directory_mut().garbage_collect(
-            || self.0.segment_manager.list_files(),
-        );
+        index
+            .directory_mut()
+            .garbage_collect(|| self.0.segment_manager.list_files());
     }
 
     pub fn commit(&self, opstamp: u64, payload: Option<String>) -> Result<()> {
-        self.run_async(move |segment_updater| if segment_updater.is_alive() {
-            let segment_entries = segment_updater.purge_deletes(opstamp).expect(
-                "Failed purge deletes",
-            );
-            segment_updater.0.segment_manager.commit(segment_entries);
-            segment_updater.save_metas(opstamp, payload);
-            segment_updater.garbage_collect_files_exec();
-            segment_updater.consider_merge_options();
+        self.run_async(move |segment_updater| {
+            if segment_updater.is_alive() {
+                let segment_entries = segment_updater
+                    .purge_deletes(opstamp)
+                    .expect("Failed purge deletes");
+                segment_updater.0.segment_manager.commit(segment_entries);
+                segment_updater.save_metas(opstamp, payload);
+                segment_updater.garbage_collect_files_exec();
+                segment_updater.consider_merge_options();
+            }
         }).wait()
     }
 
@@ -343,10 +343,11 @@ impl SegmentUpdater {
                 .remove(&merging_thread_id);
             Ok(())
         });
-        self.0.merging_threads.write().unwrap().insert(
-            merging_thread_id,
-            merging_join_handle,
-        );
+        self.0
+            .merging_threads
+            .write()
+            .unwrap()
+            .insert(merging_thread_id, merging_join_handle);
         merging_future_recv
     }
 
@@ -369,10 +370,9 @@ impl SegmentUpdater {
         before_merge_segment_ids: &[SegmentId],
         after_merge_segment_entry: SegmentId,
     ) {
-        self.0.segment_manager.cancel_merge(
-            before_merge_segment_ids,
-            after_merge_segment_entry,
-        );
+        self.0
+            .segment_manager
+            .cancel_merge(before_merge_segment_ids, after_merge_segment_entry);
     }
 
     fn end_merge(
@@ -385,9 +385,10 @@ impl SegmentUpdater {
             let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone();
             let mut _file_protection_opt = None;
             if let Some(delete_operation) = delete_cursor.get() {
-
-                let committed_opstamp = segment_updater.0
-                    .index.load_metas()
-                    .expect("Failed to read opstamp")
-                    .opstamp;
+                let committed_opstamp = segment_updater
+                    .0
+                    .index
+                    .load_metas()
+                    .expect("Failed to read opstamp")
+                    .opstamp;
                 if delete_operation.opstamp < committed_opstamp {
@@ -404,8 +405,7 @@ impl SegmentUpdater {
                     Err(e) => {
                         error!(
                             "Merge of {:?} was cancelled (advancing deletes failed): {:?}",
-                            before_merge_segment_ids,
-                            e
+                            before_merge_segment_ids, e
                         );
                         // ... cancel merge
                         if cfg!(test) {
@@ -420,10 +420,10 @@ impl SegmentUpdater {
                     }
                 }
             }
-            segment_updater.0.segment_manager.end_merge(
-                &before_merge_segment_ids,
-                after_merge_segment_entry,
-            );
+            segment_updater
+                .0
+                .segment_manager
+                .end_merge(&before_merge_segment_ids, after_merge_segment_entry);
             segment_updater.consider_merge_options();
             info!("save metas");
             let previous_metas = segment_updater.0.index.load_metas().unwrap();
@@ -459,9 +459,10 @@ impl SegmentUpdater {
         }
         debug!("wait merging thread {}", new_merging_threads.len());
         for (_, merging_thread_handle) in new_merging_threads {
-            merging_thread_handle.join().map(|_| ()).map_err(|_| {
-                ErrorKind::ErrorInThread("Merging thread failed.".into())
-            })?;
+            merging_thread_handle
+                .join()
+                .map(|_| ())
+                .map_err(|_| ErrorKind::ErrorInThread("Merging thread failed.".into()))?;
         }
         // Our merging thread may have queued their completed
         self.run_async(move |_| {}).wait()?;
@@ -527,9 +528,9 @@ mod tests {
         assert_eq!(index.searcher().num_docs(), 302);
 
         {
-            index_writer.wait_merging_threads().expect(
-                "waiting for merging threads",
-            );
+            index_writer
+                .wait_merging_threads()
+                .expect("waiting for merging threads");
         }
 
         index.load_searchers().unwrap();
diff --git a/src/lib.rs b/src/lib.rs
index af0c19e16..c71636e96 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -715,8 +715,7 @@ mod tests {
         let mut schema_builder = SchemaBuilder::default();
         let text_field = schema_builder.add_text_field("text", TEXT);
         let other_text_field = schema_builder.add_text_field("text2", TEXT);
-        let document =
-            doc!(text_field => "tantivy",
+        let document = doc!(text_field => "tantivy",
                  text_field => "some other value",
                  other_text_field => "short");
         assert_eq!(document.len(), 3);
diff --git a/src/query/query_parser/query_parser.rs b/src/query/query_parser/query_parser.rs
index d1804a677..57b741eb9 100644
--- a/src/query/query_parser/query_parser.rs
+++ b/src/query/query_parser/query_parser.rs
@@ -373,7 +373,7 @@ mod test {
 
     #[test]
     pub fn test_parse_nonindexed_field_yields_error() {
-         let query_parser = make_query_parser();
+        let query_parser = make_query_parser();
         let is_not_indexed_err = |query: &str| {
            let result: Result<Box<Query>, QueryParserError> = query_parser.parse_query(query);