From 8ebbf6b336db41df1c81632b0988d32612fe2835 Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Sat, 30 Jun 2018 13:11:41 +0900
Subject: [PATCH] Issue/325 (#330)

* Introducing a SegmentMeta inventory.
* Depending on census 0.1
* Cargo fmt
---
 Cargo.toml                                 |   1 +
 appveyor.yml                               |   2 +-
 src/collector/facet_collector.rs           |  12 +-
 src/collector/top_collector.rs             |   6 +-
 src/common/composite_file.rs               |   3 +-
 src/compression/mod.rs                     |   9 +-
 src/compression/stream.rs                  |   3 +-
 src/core/index.rs                          |  10 +-
 src/core/pool.rs                           |   6 +-
 src/core/searcher.rs                       |   9 +-
 src/core/segment.rs                        |  52 +-----
 src/core/segment_meta.rs                   | 100 ++++++++---
 src/core/segment_reader.rs                 |  15 +-
 src/directory/error.rs                     |   8 +-
 src/directory/managed_directory.rs         | 104 +----------
 src/directory/mmap_directory.rs            |   6 +-
 src/directory/mod.rs                       |   2 +-
 src/directory/ram_directory.rs             |   3 +-
 src/fastfield/delete.rs                    |   3 +-
 src/fastfield/facet_reader.rs              |   3 +-
 src/indexer/delete_queue.rs                |  12 +-
 src/indexer/index_writer.rs                |  32 ++--
 src/indexer/log_merge_policy.rs            |  58 ++++---
 src/indexer/merger.rs                      |  10 +-
 src/indexer/segment_manager.rs             |  51 +++---
 src/indexer/segment_register.rs            |  40 ++---
 src/indexer/segment_updater.rs             | 212 ++++++++++++-----------
 src/lib.rs                                 |   2 +
 src/postings/postings_writer.rs            |   3 +-
 src/postings/recorder.rs                   |   3 +-
 src/postings/segment_postings.rs           |   6 +-
 src/postings/serializer.rs                 |  12 +-
 src/query/boolean_query/boolean_query.rs   |   3 +-
 src/query/query_parser/logical_ast.rs      |  15 +-
 src/query/query_parser/query_grammar.rs    |  25 ++-
 src/query/query_parser/query_parser.rs     |  91 ++++++----
 src/query/query_parser/user_input_ast.rs   |  14 +-
 src/query/range_query.rs                   |   2 +-
 src/store/skiplist/skiplist_builder.rs     |   3 +-
 src/termdict/termdict.rs                   |   3 +-
 40 files changed, 440 insertions(+), 514 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 11c48c1c0..b85650c40 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -45,6 +45,7 @@ rust-stemmers = "0.1.0"
 downcast = { version="0.9" }
 matches = "0.1"
 bitpacking = "0.5"
+census = "0.1"
 fnv = "1.0.6"
 
 [target.'cfg(windows)'.dependencies]
diff --git a/appveyor.yml b/appveyor.yml
index 59dc55318..caf4a4aa6 100644
--- a/appveyor.yml
+++ b/appveyor.yml
@@ -4,7 +4,7 @@ os: Visual Studio 2015
 environment:
   matrix:
-  - channel: nightly
+  - channel: stable
     target: x86_64-pc-windows-msvc
 
 install:
diff --git a/src/collector/facet_collector.rs b/src/collector/facet_collector.rs
index 73401ffcf..fffcc3bfe 100644
--- a/src/collector/facet_collector.rs
+++ b/src/collector/facet_collector.rs
@@ -342,19 +342,16 @@ impl FacetCollector {
     pub fn harvest(mut self) -> FacetCounts {
         self.finalize_segment();
-        let collapsed_facet_ords: Vec<&[u64]> = self
-            .segment_counters
+        let collapsed_facet_ords: Vec<&[u64]> = self.segment_counters
             .iter()
             .map(|segment_counter| &segment_counter.facet_ords[..])
             .collect();
-        let collapsed_facet_counts: Vec<&[u64]> = self
-            .segment_counters
+        let collapsed_facet_counts: Vec<&[u64]> = self.segment_counters
             .iter()
             .map(|segment_counter| &segment_counter.facet_counts[..])
             .collect();
-        let facet_streams = self
-            .segment_counters
+        let facet_streams = self.segment_counters
             .iter()
             .map(|seg_counts| seg_counts.facet_reader.facet_dict().range().into_stream())
             .collect::<Vec<_>>();
@@ -405,8 +402,7 @@ impl Collector for FacetCollector {
     fn collect(&mut self, doc: DocId, _: Score) {
         let facet_reader: &mut FacetReader = unsafe {
-            &mut *self
-                .ff_reader
+            &mut *self.ff_reader
                 .as_ref()
                 .expect("collect() was called before set_segment. 
This should never happen.") .get() diff --git a/src/collector/top_collector.rs b/src/collector/top_collector.rs index 0ba9c86f1..8d2829b73 100644 --- a/src/collector/top_collector.rs +++ b/src/collector/top_collector.rs @@ -161,13 +161,11 @@ impl Collector for TopCollector { fn collect(&mut self, doc: DocId, score: Score) { if self.at_capacity() { // It's ok to unwrap as long as a limit of 0 is forbidden. - let limit_doc: GlobalScoredDoc = *self - .heap + let limit_doc: GlobalScoredDoc = *self.heap .peek() .expect("Top collector with size 0 is forbidden"); if limit_doc.score < score { - let mut mut_head = self - .heap + let mut mut_head = self.heap .peek_mut() .expect("Top collector with size 0 is forbidden"); mut_head.score = score; diff --git a/src/common/composite_file.rs b/src/common/composite_file.rs index 6a41268fa..257e2b579 100644 --- a/src/common/composite_file.rs +++ b/src/common/composite_file.rs @@ -72,8 +72,7 @@ impl CompositeWrite { let footer_offset = self.write.written_bytes(); VInt(self.offsets.len() as u64).serialize(&mut self.write)?; - let mut offset_fields: Vec<_> = self - .offsets + let mut offset_fields: Vec<_> = self.offsets .iter() .map(|(file_addr, offset)| (*offset, *file_addr)) .collect(); diff --git a/src/compression/mod.rs b/src/compression/mod.rs index 47681e358..0e6a1899f 100644 --- a/src/compression/mod.rs +++ b/src/compression/mod.rs @@ -34,8 +34,7 @@ impl BlockEncoder { let num_bits = self.bitpacker.num_bits_sorted(offset, block); self.output[0] = num_bits; let written_size = - 1 + self - .bitpacker + 1 + self.bitpacker .compress_sorted(offset, block, &mut self.output[1..], num_bits); &self.output[..written_size] } @@ -43,8 +42,7 @@ impl BlockEncoder { pub fn compress_block_unsorted(&mut self, block: &[u32]) -> &[u8] { let num_bits = self.bitpacker.num_bits(block); self.output[0] = num_bits; - let written_size = 1 + self - .bitpacker + let written_size = 1 + self.bitpacker .compress(block, &mut self.output[1..], num_bits); &self.output[..written_size] } @@ -85,8 +83,7 @@ impl BlockDecoder { pub fn uncompress_block_unsorted<'a>(&mut self, compressed_data: &'a [u8]) -> usize { let num_bits = compressed_data[0]; self.output_len = COMPRESSION_BLOCK_SIZE; - 1 + self - .bitpacker + 1 + self.bitpacker .decompress(&compressed_data[1..], &mut self.output, num_bits) } diff --git a/src/compression/stream.rs b/src/compression/stream.rs index 7cf99baef..762792a9b 100644 --- a/src/compression/stream.rs +++ b/src/compression/stream.rs @@ -42,8 +42,7 @@ impl CompressedIntStream { // no need to read. self.cached_next_addr } else { - let next_addr = addr + self - .block_decoder + let next_addr = addr + self.block_decoder .uncompress_block_unsorted(self.buffer.slice_from(addr)); self.cached_addr = addr; self.cached_next_addr = next_addr; diff --git a/src/core/index.rs b/src/core/index.rs index cb7c4ede7..86fa97d82 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -191,8 +191,7 @@ impl Index { /// Returns the list of segments that are searchable pub fn searchable_segments(&self) -> Result> { - Ok(self - .searchable_segment_metas()? + Ok(self.searchable_segment_metas()? .into_iter() .map(|segment_meta| self.segment(segment_meta)) .collect()) @@ -205,8 +204,8 @@ impl Index { /// Creates a new segment. 
pub fn new_segment(&self) -> Segment { - let segment_meta = SegmentMeta::new(SegmentId::generate_random()); - create_segment(self.clone(), segment_meta) + let segment_meta = SegmentMeta::new(SegmentId::generate_random(), 0); + self.segment(segment_meta) } /// Return a reference to the index directory. @@ -227,8 +226,7 @@ impl Index { /// Returns the list of segment ids that are searchable. pub fn searchable_segment_ids(&self) -> Result> { - Ok(self - .searchable_segment_metas()? + Ok(self.searchable_segment_metas()? .iter() .map(|segment_meta| segment_meta.id()) .collect()) diff --git a/src/core/pool.rs b/src/core/pool.rs index bc36c21f0..64a894d4c 100644 --- a/src/core/pool.rs +++ b/src/core/pool.rs @@ -87,8 +87,7 @@ impl Deref for LeasedItem { type Target = T; fn deref(&self) -> &T { - &self - .gen_item + &self.gen_item .as_ref() .expect("Unwrapping a leased item should never fail") .item // unwrap is safe here @@ -97,8 +96,7 @@ impl Deref for LeasedItem { impl DerefMut for LeasedItem { fn deref_mut(&mut self) -> &mut T { - &mut self - .gen_item + &mut self.gen_item .as_mut() .expect("Unwrapping a mut leased item should never fail") .item // unwrap is safe here diff --git a/src/core/searcher.rs b/src/core/searcher.rs index fe1a13796..8f36b58ea 100644 --- a/src/core/searcher.rs +++ b/src/core/searcher.rs @@ -78,8 +78,7 @@ impl Searcher { /// Return the field searcher associated to a `Field`. pub fn field(&self, field: Field) -> FieldSearcher { - let inv_index_readers = self - .segment_readers + let inv_index_readers = self.segment_readers .iter() .map(|segment_reader| segment_reader.inverted_index(field)) .collect::>(); @@ -99,8 +98,7 @@ impl FieldSearcher { /// Returns a Stream over all of the sorted unique terms of /// for the given field. pub fn terms(&self) -> TermMerger { - let term_streamers: Vec<_> = self - .inv_index_readers + let term_streamers: Vec<_> = self.inv_index_readers .iter() .map(|inverted_index| inverted_index.terms().stream()) .collect(); @@ -110,8 +108,7 @@ impl FieldSearcher { impl fmt::Debug for Searcher { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let segment_ids = self - .segment_readers + let segment_ids = self.segment_readers .iter() .map(|segment_reader| segment_reader.segment_id()) .collect::>(); diff --git a/src/core/segment.rs b/src/core/segment.rs index b0a7e48c7..a747cfaec 100644 --- a/src/core/segment.rs +++ b/src/core/segment.rs @@ -4,7 +4,7 @@ use core::SegmentId; use core::SegmentMeta; use directory::error::{OpenReadError, OpenWriteError}; use directory::Directory; -use directory::{FileProtection, ReadOnlySource, WritePtr}; +use directory::{ReadOnlySource, WritePtr}; use indexer::segment_serializer::SegmentSerializer; use schema::Schema; use std::fmt; @@ -28,6 +28,7 @@ impl fmt::Debug for Segment { /// Creates a new segment given an `Index` and a `SegmentId` /// /// The function is here to make it private outside `tantivy`. +/// #[doc(hidden)] pub fn create_segment(index: Index, meta: SegmentMeta) -> Segment { Segment { index, meta } } @@ -49,8 +50,11 @@ impl Segment { } #[doc(hidden)] - pub fn set_delete_meta(&mut self, num_deleted_docs: u32, opstamp: u64) { - self.meta.set_delete_meta(num_deleted_docs, opstamp); + pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: u64) -> Segment { + Segment { + index: self.index, + meta: self.meta.with_delete_meta(num_deleted_docs, opstamp), + } } /// Returns the segment's id. 
@@ -66,16 +70,6 @@ impl Segment { self.meta.relative_path(component) } - /// Protects a specific component file from being deleted. - /// - /// Returns a FileProtection object. The file is guaranteed - /// to not be garbage collected as long as this `FileProtection` object - /// lives. - pub fn protect_from_delete(&self, component: SegmentComponent) -> FileProtection { - let path = self.relative_path(component); - self.index.directory().protect_file_from_delete(&path) - } - /// Open one of the component file for a *regular* read. pub fn open_read( &self, @@ -105,35 +99,3 @@ pub trait SerializableSegment { /// The number of documents in the segment. fn write(&self, serializer: SegmentSerializer) -> Result; } - -#[cfg(test)] -mod tests { - - use core::SegmentComponent; - use directory::Directory; - use schema::SchemaBuilder; - use std::collections::HashSet; - use Index; - - #[test] - fn test_segment_protect_component() { - let mut index = Index::create_in_ram(SchemaBuilder::new().build()); - let segment = index.new_segment(); - let path = segment.relative_path(SegmentComponent::POSTINGS); - - let directory = index.directory_mut(); - directory.atomic_write(&*path, &vec![0u8]).unwrap(); - - let living_files = HashSet::new(); - { - let _file_protection = segment.protect_from_delete(SegmentComponent::POSTINGS); - assert!(directory.exists(&*path)); - directory.garbage_collect(|| living_files.clone()); - assert!(directory.exists(&*path)); - } - - directory.garbage_collect(|| living_files); - assert!(!directory.exists(&*path)); - } - -} diff --git a/src/core/segment_meta.rs b/src/core/segment_meta.rs index c8d50046a..5a8f429f0 100644 --- a/src/core/segment_meta.rs +++ b/src/core/segment_meta.rs @@ -1,8 +1,15 @@ use super::SegmentComponent; +use census::{Inventory, TrackedObject}; use core::SegmentId; +use serde; use std::collections::HashSet; +use std::fmt; use std::path::PathBuf; +lazy_static! { + static ref INVENTORY: Inventory = { Inventory::new() }; +} + #[derive(Clone, Debug, Serialize, Deserialize)] struct DeleteMeta { num_deleted_docs: u32, @@ -13,32 +20,72 @@ struct DeleteMeta { /// /// For instance the number of docs it contains, /// how many are deleted, etc. -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone)] pub struct SegmentMeta { - segment_id: SegmentId, - max_doc: u32, - deletes: Option, + tracked: TrackedObject, +} + +impl fmt::Debug for SegmentMeta { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + self.tracked.fmt(f) + } +} + +impl serde::Serialize for SegmentMeta { + fn serialize( + &self, + serializer: S, + ) -> Result<::Ok, ::Error> + where + S: serde::Serializer, + { + self.tracked.serialize(serializer) + } +} + +impl<'a> serde::Deserialize<'a> for SegmentMeta { + fn deserialize(deserializer: D) -> Result>::Error> + where + D: serde::Deserializer<'a>, + { + let inner = InnerSegmentMeta::deserialize(deserializer)?; + let tracked = INVENTORY.track(inner); + Ok(SegmentMeta { tracked: tracked }) + } } impl SegmentMeta { - /// Creates a new segment meta for - /// a segment with no deletes and no documents. - pub fn new(segment_id: SegmentId) -> SegmentMeta { - SegmentMeta { + /// Lists all living `SegmentMeta` object at the time of the call. + pub fn all() -> Vec { + INVENTORY + .list() + .into_iter() + .map(|inner| SegmentMeta { tracked: inner }) + .collect::>() + } + + /// Creates a new `SegmentMeta` object. 
+ #[doc(hidden)] + pub fn new(segment_id: SegmentId, max_doc: u32) -> SegmentMeta { + let inner = InnerSegmentMeta { segment_id, - max_doc: 0, + max_doc, deletes: None, + }; + SegmentMeta { + tracked: INVENTORY.track(inner), } } /// Returns the segment id. pub fn id(&self) -> SegmentId { - self.segment_id + self.tracked.segment_id } /// Returns the number of deleted documents. pub fn num_deleted_docs(&self) -> u32 { - self.deletes + self.tracked + .deletes .as_ref() .map(|delete_meta| delete_meta.num_deleted_docs) .unwrap_or(0u32) @@ -80,7 +127,7 @@ impl SegmentMeta { /// and all the doc ids contains in this segment /// are exactly (0..max_doc). pub fn max_doc(&self) -> u32 { - self.max_doc + self.tracked.max_doc } /// Return the number of documents in the segment. @@ -91,25 +138,36 @@ impl SegmentMeta { /// Returns the opstamp of the last delete operation /// taken in account in this segment. pub fn delete_opstamp(&self) -> Option { - self.deletes.as_ref().map(|delete_meta| delete_meta.opstamp) + self.tracked + .deletes + .as_ref() + .map(|delete_meta| delete_meta.opstamp) } /// Returns true iff the segment meta contains /// delete information. pub fn has_deletes(&self) -> bool { - self.deletes.is_some() + self.num_deleted_docs() > 0 } #[doc(hidden)] - pub fn set_max_doc(&mut self, max_doc: u32) { - self.max_doc = max_doc; - } - - #[doc(hidden)] - pub fn set_delete_meta(&mut self, num_deleted_docs: u32, opstamp: u64) { - self.deletes = Some(DeleteMeta { + pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: u64) -> SegmentMeta { + let delete_meta = DeleteMeta { num_deleted_docs, opstamp, + }; + let tracked = self.tracked.map(move |inner_meta| InnerSegmentMeta { + segment_id: inner_meta.segment_id, + max_doc: inner_meta.max_doc, + deletes: Some(delete_meta), }); + SegmentMeta { tracked } } } + +#[derive(Clone, Debug, Serialize, Deserialize)] +struct InnerSegmentMeta { + segment_id: SegmentId, + max_doc: u32, + deletes: Option, +} diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs index 2ba5610f4..ef8fd38aa 100644 --- a/src/core/segment_reader.rs +++ b/src/core/segment_reader.rs @@ -156,13 +156,11 @@ impl SegmentReader { &FieldType::Bytes => {} _ => return Err(FastFieldNotAvailableError::new(field_entry)), } - let idx_reader = self - .fast_fields_composite + let idx_reader = self.fast_fields_composite .open_read_with_idx(field, 0) .ok_or_else(|| FastFieldNotAvailableError::new(field_entry)) .map(FastFieldReader::open)?; - let values = self - .fast_fields_composite + let values = self.fast_fields_composite .open_read_with_idx(field, 1) .ok_or_else(|| FastFieldNotAvailableError::new(field_entry))?; Ok(BytesFastFieldReader::open(idx_reader, values)) @@ -274,8 +272,7 @@ impl SegmentReader { /// term dictionary associated to a specific field, /// and opening the posting list associated to any term. pub fn inverted_index(&self, field: Field) -> Arc { - if let Some(inv_idx_reader) = self - .inv_idx_reader_cache + if let Some(inv_idx_reader) = self.inv_idx_reader_cache .read() .expect("Lock poisoned. This should never happen") .get(&field) @@ -304,13 +301,11 @@ impl SegmentReader { let postings_source = postings_source_opt.unwrap(); - let termdict_source = self - .termdict_composite + let termdict_source = self.termdict_composite .open_read(field) .expect("Failed to open field term dictionary in composite file. 
Is the field indexed"); - let positions_source = self - .positions_composite + let positions_source = self.positions_composite .open_read(field) .expect("Index corrupted. Failed to open field positions in composite file."); diff --git a/src/directory/error.rs b/src/directory/error.rs index 12145e8a4..8d4b5e572 100644 --- a/src/directory/error.rs +++ b/src/directory/error.rs @@ -173,9 +173,6 @@ pub enum DeleteError { /// Any kind of IO error that happens when /// interacting with the underlying IO device. IOError(IOError), - /// The file may not be deleted because it is - /// protected. - FileProtected(PathBuf), } impl From for DeleteError { @@ -190,9 +187,6 @@ impl fmt::Display for DeleteError { DeleteError::FileDoesNotExist(ref path) => { write!(f, "the file '{:?}' does not exist", path) } - DeleteError::FileProtected(ref path) => { - write!(f, "the file '{:?}' is protected and can't be deleted", path) - } DeleteError::IOError(ref err) => { write!(f, "an io error occurred while deleting a file: '{}'", err) } @@ -207,7 +201,7 @@ impl StdError for DeleteError { fn cause(&self) -> Option<&StdError> { match *self { - DeleteError::FileDoesNotExist(_) | DeleteError::FileProtected(_) => None, + DeleteError::FileDoesNotExist(_) => None, DeleteError::IOError(ref err) => Some(err), } } diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs index 5ff1047df..bbcb49674 100644 --- a/src/directory/managed_directory.rs +++ b/src/directory/managed_directory.rs @@ -3,9 +3,7 @@ use directory::error::{DeleteError, IOError, OpenReadError, OpenWriteError}; use directory::{ReadOnlySource, WritePtr}; use error::{ErrorKind, Result, ResultExt}; use serde_json; -use std::collections::HashMap; use std::collections::HashSet; -use std::fmt; use std::io; use std::io::Write; use std::path::{Path, PathBuf}; @@ -32,37 +30,6 @@ pub struct ManagedDirectory { #[derive(Debug, Default)] struct MetaInformation { managed_paths: HashSet, - protected_files: HashMap, -} - -/// A `FileProtection` prevents the garbage collection of a file. -/// -/// See `ManagedDirectory.protect_file_from_delete`. -pub struct FileProtection { - directory: ManagedDirectory, - path: PathBuf, -} - -fn unprotect_file_from_delete(directory: &ManagedDirectory, path: &Path) { - let mut meta_informations_wlock = directory - .meta_informations - .write() - .expect("Managed file lock poisoned"); - if let Some(counter_ref_mut) = meta_informations_wlock.protected_files.get_mut(path) { - (*counter_ref_mut) -= 1; - } -} - -impl fmt::Debug for FileProtection { - fn fmt(&self, formatter: &mut fmt::Formatter) -> result::Result<(), fmt::Error> { - write!(formatter, "FileProtectionFor({:?})", self.path) - } -} - -impl Drop for FileProtection { - fn drop(&mut self) { - unprotect_file_from_delete(&self.directory, &*self.path); - } } /// Saves the file containing the list of existing files @@ -89,7 +56,6 @@ impl ManagedDirectory { directory: Box::new(directory), meta_informations: Arc::new(RwLock::new(MetaInformation { managed_paths: managed_files, - protected_files: HashMap::default(), })), }) } @@ -117,8 +83,7 @@ impl ManagedDirectory { let mut files_to_delete = vec![]; { // releasing the lock as .delete() will use it too. 
- let meta_informations_rlock = self - .meta_informations + let meta_informations_rlock = self.meta_informations .read() .expect("Managed directory rlock poisoned in garbage collect."); @@ -159,9 +124,6 @@ impl ManagedDirectory { error!("Failed to delete {:?}", file_to_delete); } } - DeleteError::FileProtected(_) => { - // this is expected. - } } } } @@ -171,8 +133,7 @@ impl ManagedDirectory { if !deleted_files.is_empty() { // update the list of managed files by removing // the file that were removed. - let mut meta_informations_wlock = self - .meta_informations + let mut meta_informations_wlock = self.meta_informations .write() .expect("Managed directory wlock poisoned (2)."); { @@ -187,29 +148,6 @@ impl ManagedDirectory { } } - /// Protects a file from being garbage collected. - /// - /// The method returns a `FileProtection` object. - /// The file will not be garbage collected as long as the - /// `FileProtection` object is kept alive. - pub fn protect_file_from_delete(&self, path: &Path) -> FileProtection { - let pathbuf = path.to_owned(); - { - let mut meta_informations_wlock = self - .meta_informations - .write() - .expect("Managed file lock poisoned on protect"); - *meta_informations_wlock - .protected_files - .entry(pathbuf.clone()) - .or_insert(0) += 1; - } - FileProtection { - directory: self.clone(), - path: pathbuf.clone(), - } - } - /// Registers a file as managed /// /// This method must be called before the file is @@ -218,8 +156,7 @@ impl ManagedDirectory { /// will not lead to garbage files that will /// never get removed. fn register_file_as_managed(&mut self, filepath: &Path) -> io::Result<()> { - let mut meta_wlock = self - .meta_informations + let mut meta_wlock = self.meta_informations .write() .expect("Managed file lock poisoned"); let has_changed = meta_wlock.managed_paths.insert(filepath.to_owned()); @@ -251,17 +188,6 @@ impl Directory for ManagedDirectory { } fn delete(&self, path: &Path) -> result::Result<(), DeleteError> { - { - let metas_rlock = self - .meta_informations - .read() - .expect("poisoned lock in managed directory meta"); - if let Some(counter) = metas_rlock.protected_files.get(path) { - if *counter > 0 { - return Err(DeleteError::FileProtected(path.to_owned())); - } - } - } self.directory.delete(path) } @@ -377,28 +303,4 @@ mod tests { } } - #[test] - #[cfg(feature = "mmap")] - fn test_managed_directory_protect() { - let tempdir = TempDir::new("index").unwrap(); - let tempdir_path = PathBuf::from(tempdir.path()); - let living_files = HashSet::new(); - - let mmap_directory = MmapDirectory::open(&tempdir_path).unwrap(); - let mut managed_directory = ManagedDirectory::new(mmap_directory).unwrap(); - managed_directory - .atomic_write(*TEST_PATH1, &vec![0u8, 1u8]) - .unwrap(); - assert!(managed_directory.exists(*TEST_PATH1)); - - { - let _file_protection = managed_directory.protect_file_from_delete(*TEST_PATH1); - managed_directory.garbage_collect(|| living_files.clone()); - assert!(managed_directory.exists(*TEST_PATH1)); - } - - managed_directory.garbage_collect(|| living_files.clone()); - assert!(!managed_directory.exists(*TEST_PATH1)); - } - } diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs index 920c9d817..5ffdc35d4 100644 --- a/src/directory/mmap_directory.rs +++ b/src/directory/mmap_directory.rs @@ -32,8 +32,7 @@ fn open_mmap(full_path: &Path) -> result::Result, OpenReadE } })?; - let meta_data = file - .metadata() + let meta_data = file.metadata() .map_err(|e| IOError::with_path(full_path.to_owned(), e))?; if 
meta_data.len() == 0 { // if the file size is 0, it will not be possible @@ -310,8 +309,7 @@ impl Directory for MmapDirectory { // when the last reference is gone. mmap_cache.cache.remove(&full_path); match fs::remove_file(&full_path) { - Ok(_) => self - .sync_directory() + Ok(_) => self.sync_directory() .map_err(|e| IOError::with_path(path.to_owned(), e).into()), Err(e) => { if e.kind() == io::ErrorKind::NotFound { diff --git a/src/directory/mod.rs b/src/directory/mod.rs index 736c31e73..72301430b 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -25,7 +25,7 @@ pub use self::read_only_source::ReadOnlySource; #[cfg(feature = "mmap")] pub use self::mmap_directory::MmapDirectory; -pub(crate) use self::managed_directory::{FileProtection, ManagedDirectory}; +pub(crate) use self::managed_directory::ManagedDirectory; pub(crate) use self::read_only_source::SourceRead; /// Synonym of Seek + Write diff --git a/src/directory/ram_directory.rs b/src/directory/ram_directory.rs index dcc215f9b..383643836 100644 --- a/src/directory/ram_directory.rs +++ b/src/directory/ram_directory.rs @@ -170,8 +170,7 @@ impl Directory for RAMDirectory { let path_buf = PathBuf::from(path); let vec_writer = VecWriter::new(path_buf.clone(), self.fs.clone()); - let exists = self - .fs + let exists = self.fs .write(path_buf.clone(), &Vec::new()) .map_err(|err| IOError::with_path(path.to_owned(), err))?; diff --git a/src/fastfield/delete.rs b/src/fastfield/delete.rs index 15ed658ce..3f8a0eb5b 100644 --- a/src/fastfield/delete.rs +++ b/src/fastfield/delete.rs @@ -41,8 +41,7 @@ pub struct DeleteBitSet { impl DeleteBitSet { /// Opens a delete bitset given its data source. pub fn open(data: ReadOnlySource) -> DeleteBitSet { - let num_deleted: usize = data - .as_slice() + let num_deleted: usize = data.as_slice() .iter() .map(|b| b.count_ones() as usize) .sum(); diff --git a/src/fastfield/facet_reader.rs b/src/fastfield/facet_reader.rs index 92a917089..182b17989 100644 --- a/src/fastfield/facet_reader.rs +++ b/src/fastfield/facet_reader.rs @@ -56,8 +56,7 @@ impl FacetReader { /// Given a term ordinal returns the term associated to it. pub fn facet_from_ord(&self, facet_ord: TermOrdinal, output: &mut Facet) { - let found_term = self - .term_dict + let found_term = self.term_dict .ord_to_term(facet_ord as u64, output.inner_buffer_mut()); assert!(found_term, "Term ordinal {} no found.", facet_ord); } diff --git a/src/indexer/delete_queue.rs b/src/indexer/delete_queue.rs index f921b7523..4c2597fbb 100644 --- a/src/indexer/delete_queue.rs +++ b/src/indexer/delete_queue.rs @@ -52,8 +52,7 @@ impl DeleteQueue { // // Past delete operations are not accessible. pub fn cursor(&self) -> DeleteCursor { - let last_block = self - .inner + let last_block = self.inner .read() .expect("Read lock poisoned when opening delete queue cursor") .last_block @@ -93,8 +92,7 @@ impl DeleteQueue { // be some unflushed operations. 
// fn flush(&self) -> Option> { - let mut self_wlock = self - .inner + let mut self_wlock = self.inner .write() .expect("Failed to acquire write lock on delete queue writer"); @@ -134,8 +132,7 @@ impl From for NextBlock { impl NextBlock { fn next_block(&self) -> Option> { { - let next_read_lock = self - .0 + let next_read_lock = self.0 .read() .expect("Failed to acquire write lock in delete queue"); if let InnerNextBlock::Closed(ref block) = *next_read_lock { @@ -144,8 +141,7 @@ impl NextBlock { } let next_block; { - let mut next_write_lock = self - .0 + let mut next_write_lock = self.0 .write() .expect("Failed to acquire write lock in delete queue"); match *next_write_lock { diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 28e566e8c..1f372a535 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -9,7 +9,6 @@ use core::SegmentComponent; use core::SegmentId; use core::SegmentMeta; use core::SegmentReader; -use directory::FileProtection; use docset::DocSet; use error::{Error, ErrorKind, Result, ResultExt}; use fastfield::write_delete_bitset; @@ -216,15 +215,13 @@ pub fn advance_deletes( mut segment: Segment, segment_entry: &mut SegmentEntry, target_opstamp: u64, -) -> Result> { - let mut file_protect: Option = None; +) -> Result<()> { { - if let Some(previous_opstamp) = segment_entry.meta().delete_opstamp() { + if segment_entry.meta().delete_opstamp() == Some(target_opstamp) { // We are already up-to-date here. - if target_opstamp == previous_opstamp { - return Ok(file_protect); - } + return Ok(()); } + let segment_reader = SegmentReader::open(&segment)?; let max_doc = segment_reader.max_doc(); @@ -243,6 +240,7 @@ pub fn advance_deletes( target_opstamp, )?; + // TODO optimize for doc in 0u32..max_doc { if segment_reader.is_deleted(doc) { delete_bitset.insert(doc as usize); @@ -251,14 +249,13 @@ pub fn advance_deletes( let num_deleted_docs = delete_bitset.len(); if num_deleted_docs > 0 { - segment.set_delete_meta(num_deleted_docs as u32, target_opstamp); - file_protect = Some(segment.protect_from_delete(SegmentComponent::DELETE)); + segment = segment.with_delete_meta(num_deleted_docs as u32, target_opstamp); let mut delete_file = segment.open_write(SegmentComponent::DELETE)?; write_delete_bitset(&delete_bitset, &mut delete_file)?; } } - segment_entry.set_meta(segment.meta().clone()); - Ok(file_protect) + segment_entry.set_meta((*segment.meta()).clone()); + Ok(()) } fn index_documents( @@ -299,8 +296,7 @@ fn index_documents( let doc_opstamps: Vec = segment_writer.finalize()?; - let mut segment_meta = SegmentMeta::new(segment_id); - segment_meta.set_max_doc(num_docs); + let segment_meta = SegmentMeta::new(segment_id, num_docs); let last_docstamp: u64 = *(doc_opstamps.last().unwrap()); @@ -342,8 +338,7 @@ impl IndexWriter { } drop(self.workers_join_handle); - let result = self - .segment_updater + let result = self.segment_updater .wait_merging_thread() .chain_err(|| ErrorKind::ErrorInThread("Failed to join merging thread.".into())); @@ -448,7 +443,9 @@ impl IndexWriter { } /// Merges a given list of segments - pub fn merge(&mut self, segment_ids: &[SegmentId]) -> Receiver { + /// + /// `segment_ids` is required to be non-empty. + pub fn merge(&mut self, segment_ids: &[SegmentId]) -> Result> { self.segment_updater.start_merge(segment_ids) } @@ -488,8 +485,7 @@ impl IndexWriter { let document_receiver = self.document_receiver.clone(); // take the directory lock to create a new index_writer. 
- let directory_lock = self - ._directory_lock + let directory_lock = self._directory_lock .take() .expect("The IndexWriter does not have any lock. This is a bug, please report."); diff --git a/src/indexer/log_merge_policy.rs b/src/indexer/log_merge_policy.rs index 9e5edf5e8..e653b641f 100644 --- a/src/indexer/log_merge_policy.rs +++ b/src/indexer/log_merge_policy.rs @@ -116,15 +116,17 @@ mod tests { assert!(result_list.is_empty()); } - fn seg_meta(num_docs: u32) -> SegmentMeta { - let mut segment_metas = SegmentMeta::new(SegmentId::generate_random()); - segment_metas.set_max_doc(num_docs); - segment_metas + fn create_random_segment_meta(num_docs: u32) -> SegmentMeta { + SegmentMeta::new(SegmentId::generate_random(), num_docs) } #[test] fn test_log_merge_policy_pair() { - let test_input = vec![seg_meta(10), seg_meta(10), seg_meta(10)]; + let test_input = vec![ + create_random_segment_meta(10), + create_random_segment_meta(10), + create_random_segment_meta(10), + ]; let result_list = test_merge_policy().compute_merge_candidates(&test_input); assert_eq!(result_list.len(), 1); } @@ -137,17 +139,17 @@ mod tests { // * one with the 3 * 1000-docs segments // no MergeCandidate expected for the 2 * 10_000-docs segments as min_merge_size=3 let test_input = vec![ - seg_meta(10), - seg_meta(10), - seg_meta(10), - seg_meta(1000), - seg_meta(1000), - seg_meta(1000), - seg_meta(10000), - seg_meta(10000), - seg_meta(10), - seg_meta(10), - seg_meta(10), + create_random_segment_meta(10), + create_random_segment_meta(10), + create_random_segment_meta(10), + create_random_segment_meta(1000), + create_random_segment_meta(1000), + create_random_segment_meta(1000), + create_random_segment_meta(10000), + create_random_segment_meta(10000), + create_random_segment_meta(10), + create_random_segment_meta(10), + create_random_segment_meta(10), ]; let result_list = test_merge_policy().compute_merge_candidates(&test_input); assert_eq!(result_list.len(), 2); @@ -157,12 +159,12 @@ mod tests { fn test_log_merge_policy_within_levels() { // multiple levels all get merged correctly let test_input = vec![ - seg_meta(10), // log2(10) = ~3.32 (> 3.58 - 0.75) - seg_meta(11), // log2(11) = ~3.46 - seg_meta(12), // log2(12) = ~3.58 - seg_meta(800), // log2(800) = ~9.64 (> 9.97 - 0.75) - seg_meta(1000), // log2(1000) = ~9.97 - seg_meta(1000), + create_random_segment_meta(10), // log2(10) = ~3.32 (> 3.58 - 0.75) + create_random_segment_meta(11), // log2(11) = ~3.46 + create_random_segment_meta(12), // log2(12) = ~3.58 + create_random_segment_meta(800), // log2(800) = ~9.64 (> 9.97 - 0.75) + create_random_segment_meta(1000), // log2(1000) = ~9.97 + create_random_segment_meta(1000), ]; // log2(1000) = ~9.97 let result_list = test_merge_policy().compute_merge_candidates(&test_input); assert_eq!(result_list.len(), 2); @@ -171,12 +173,12 @@ mod tests { fn test_log_merge_policy_small_segments() { // segments under min_layer_size are merged together let test_input = vec![ - seg_meta(1), - seg_meta(1), - seg_meta(1), - seg_meta(2), - seg_meta(2), - seg_meta(2), + create_random_segment_meta(1), + create_random_segment_meta(1), + create_random_segment_meta(1), + create_random_segment_meta(2), + create_random_segment_meta(2), + create_random_segment_meta(2), ]; let result_list = test_merge_policy().compute_merge_candidates(&test_input); assert_eq!(result_list.len(), 1); diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index b04c7c9d4..1a5d4c026 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -440,8 +440,7 @@ impl 
IndexMerger { ) -> Result> { let mut positions_buffer: Vec = Vec::with_capacity(1_000); let mut delta_computer = DeltaComputer::new(); - let field_readers = self - .readers + let field_readers = self.readers .iter() .map(|reader| reader.inverted_index(indexed_field)) .collect::>(); @@ -737,6 +736,7 @@ mod tests { let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap(); index_writer .merge(&segment_ids) + .expect("Failed to initiate merge") .wait() .expect("Merging failed"); index_writer.wait_merging_threads().unwrap(); @@ -980,6 +980,7 @@ mod tests { .expect("Searchable segments failed."); index_writer .merge(&segment_ids) + .expect("Failed to initiate merge") .wait() .expect("Merging failed"); index.load_searchers().unwrap(); @@ -1076,6 +1077,7 @@ mod tests { .expect("Searchable segments failed."); index_writer .merge(&segment_ids) + .expect("Failed to initiate merge") .wait() .expect("Merging failed"); index.load_searchers().unwrap(); @@ -1129,6 +1131,7 @@ mod tests { .expect("Searchable segments failed."); index_writer .merge(&segment_ids) + .expect("Failed to initiate merge") .wait() .expect("Merging failed"); index.load_searchers().unwrap(); @@ -1219,6 +1222,7 @@ mod tests { let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); index_writer .merge(&segment_ids) + .expect("Failed to initiate merge") .wait() .expect("Merging failed"); index_writer.wait_merging_threads().unwrap(); @@ -1290,6 +1294,7 @@ mod tests { let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); index_writer .merge(&segment_ids) + .expect("Failed to initiate merge") .wait() .expect("Merging failed"); index_writer.wait_merging_threads().unwrap(); @@ -1392,6 +1397,7 @@ mod tests { let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); index_writer .merge(&segment_ids) + .expect("Failed to initiate merge") .wait() .expect("Merging failed"); index_writer.wait_merging_threads().unwrap(); diff --git a/src/indexer/segment_manager.rs b/src/indexer/segment_manager.rs index 7e53a4d00..b82af0823 100644 --- a/src/indexer/segment_manager.rs +++ b/src/indexer/segment_manager.rs @@ -2,6 +2,8 @@ use super::segment_register::SegmentRegister; use core::SegmentId; use core::SegmentMeta; use core::{LOCKFILE_FILEPATH, META_FILEPATH}; +use error::ErrorKind; +use error::Result as TantivyResult; use indexer::delete_queue::DeleteCursor; use indexer::SegmentEntry; use std::collections::hash_set::HashSet; @@ -64,8 +66,9 @@ impl SegmentManager { /// Returns all of the segment entries (committed or uncommitted) pub fn segment_entries(&self) -> Vec { - let mut segment_entries = self.read().uncommitted.segment_entries(); - segment_entries.extend(self.read().committed.segment_entries()); + let registers_lock = self.read(); + let mut segment_entries = registers_lock.uncommitted.segment_entries(); + segment_entries.extend(registers_lock.committed.segment_entries()); segment_entries } @@ -76,32 +79,15 @@ impl SegmentManager { } pub fn list_files(&self) -> HashSet { - let registers_lock = self.read(); let mut files = HashSet::new(); files.insert(META_FILEPATH.clone()); files.insert(LOCKFILE_FILEPATH.clone()); - - let segment_metas: Vec = registers_lock - .committed - .get_all_segments() - .into_iter() - .chain(registers_lock.uncommitted.get_all_segments().into_iter()) - .chain(registers_lock.writing.iter().cloned().map(SegmentMeta::new)) - .collect(); - for segment_meta in segment_metas { + for segment_meta in SegmentMeta::all() { 
files.extend(segment_meta.list_files()); } files } - pub fn segment_entry(&self, segment_id: &SegmentId) -> Option { - let registers = self.read(); - registers - .committed - .segment_entry(segment_id) - .or_else(|| registers.uncommitted.segment_entry(segment_id)) - } - // Lock poisoning should never happen : // The lock is acquired and released within this class, // and the operations cannot panic. @@ -126,19 +112,38 @@ impl SegmentManager { } } - pub fn start_merge(&self, segment_ids: &[SegmentId]) { + /// Marks a list of segments as in merge. + /// + /// Returns an error if some segments are missing, or if + /// the `segment_ids` are not either all committed or all + /// uncommitted. + pub fn start_merge(&self, segment_ids: &[SegmentId]) -> TantivyResult> { let mut registers_lock = self.write(); + let mut segment_entries = vec![]; if registers_lock.uncommitted.contains_all(segment_ids) { for segment_id in segment_ids { - registers_lock.uncommitted.start_merge(segment_id); + let segment_entry = registers_lock.uncommitted + .start_merge(segment_id) + .expect("Segment id not found {}. Should never happen because of the contains all if-block."); + segment_entries.push(segment_entry); } } else if registers_lock.committed.contains_all(segment_ids) { + for segment_id in segment_ids { + let segment_entry = registers_lock.committed + .start_merge(segment_id) + .expect("Segment id not found {}. Should never happen because of the contains all if-block."); + segment_entries.push(segment_entry); + } for segment_id in segment_ids { registers_lock.committed.start_merge(segment_id); } } else { - error!("Merge operation sent for segments that are not all uncommited or commited."); + let error_msg = "Merge operation sent for segments that are not \ + all uncommited or commited." 
+ .to_string(); + bail!(ErrorKind::InvalidArgument(error_msg)) } + Ok(segment_entries) } pub fn cancel_merge( diff --git a/src/indexer/segment_register.rs b/src/indexer/segment_register.rs index 0c2d7cc43..c455d3091 100644 --- a/src/indexer/segment_register.rs +++ b/src/indexer/segment_register.rs @@ -3,8 +3,7 @@ use core::SegmentMeta; use indexer::delete_queue::DeleteCursor; use indexer::segment_entry::SegmentEntry; use std::collections::HashMap; -use std::fmt; -use std::fmt::{Debug, Formatter}; +use std::fmt::{self, Debug, Formatter}; /// The segment register keeps track /// of the list of segment, their size as well @@ -39,13 +38,6 @@ impl SegmentRegister { self.segment_states.len() } - pub fn get_all_segments(&self) -> Vec { - self.segment_states - .values() - .map(|segment_entry| segment_entry.meta().clone()) - .collect() - } - pub fn get_mergeable_segments(&self) -> Vec { self.segment_states .values() @@ -59,8 +51,7 @@ impl SegmentRegister { } pub fn segment_metas(&self) -> Vec { - let mut segment_ids: Vec = self - .segment_states + let mut segment_ids: Vec = self.segment_states .values() .map(|segment_entry| segment_entry.meta().clone()) .collect(); @@ -68,10 +59,6 @@ impl SegmentRegister { segment_ids } - pub fn segment_entry(&self, segment_id: &SegmentId) -> Option { - self.segment_states.get(segment_id).cloned() - } - pub fn contains_all(&mut self, segment_ids: &[SegmentId]) -> bool { segment_ids .iter() @@ -94,11 +81,13 @@ impl SegmentRegister { .cancel_merge(); } - pub fn start_merge(&mut self, segment_id: &SegmentId) { - self.segment_states - .get_mut(segment_id) - .expect("Received a merge notification for a segment that is not registered") - .start_merge(); + pub fn start_merge(&mut self, segment_id: &SegmentId) -> Option { + if let Some(segment_entry) = self.segment_states.get_mut(segment_id) { + segment_entry.start_merge(); + Some(segment_entry.clone()) + } else { + None + } } pub fn new(segment_metas: Vec, delete_cursor: &DeleteCursor) -> SegmentRegister { @@ -110,6 +99,11 @@ impl SegmentRegister { } SegmentRegister { segment_states } } + + #[cfg(test)] + pub fn segment_entry(&self, segment_id: &SegmentId) -> Option { + self.segment_states.get(segment_id).cloned() + } } #[cfg(test)] @@ -138,7 +132,7 @@ mod tests { let segment_id_merged = SegmentId::generate_random(); { - let segment_meta = SegmentMeta::new(segment_id_a); + let segment_meta = SegmentMeta::new(segment_id_a, 0u32); let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None); segment_register.add_segment_entry(segment_entry); } @@ -151,7 +145,7 @@ mod tests { ); assert_eq!(segment_ids(&segment_register), vec![segment_id_a]); { - let segment_meta = SegmentMeta::new(segment_id_b); + let segment_meta = SegmentMeta::new(segment_id_b, 0u32); let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None); segment_register.add_segment_entry(segment_entry); } @@ -181,7 +175,7 @@ mod tests { segment_register.remove_segment(&segment_id_a); segment_register.remove_segment(&segment_id_b); { - let segment_meta_merged = SegmentMeta::new(segment_id_merged); + let segment_meta_merged = SegmentMeta::new(segment_id_merged, 0u32); let segment_entry = SegmentEntry::new(segment_meta_merged, delete_queue.cursor(), None); segment_register.add_segment_entry(segment_entry); } diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index 7d4598660..60ae86ba4 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -7,11 +7,11 @@ use 
core::SegmentMeta; use core::SerializableSegment; use core::META_FILEPATH; use directory::Directory; -use directory::FileProtection; -use error::{Error, ErrorKind, Result}; +use error::{Error, ErrorKind, Result, ResultExt}; use futures::oneshot; use futures::sync::oneshot::Receiver; use futures::Future; +use futures_cpupool::Builder as CpuPoolBuilder; use futures_cpupool::CpuFuture; use futures_cpupool::CpuPool; use indexer::delete_queue::DeleteCursor; @@ -29,8 +29,7 @@ use std::collections::HashMap; use std::io::Write; use std::mem; use std::ops::DerefMut; -use std::sync::atomic::Ordering; -use std::sync::atomic::{AtomicBool, AtomicUsize}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use std::sync::RwLock; use std::thread; @@ -87,38 +86,19 @@ pub fn save_metas( pub struct SegmentUpdater(Arc); fn perform_merge( - segment_ids: &[SegmentId], - segment_updater: &SegmentUpdater, + index: &Index, + mut segment_entries: Vec, mut merged_segment: Segment, target_opstamp: u64, ) -> Result { // first we need to apply deletes to our segment. - info!("Start merge: {:?}", segment_ids); - let index = &segment_updater.0.index; + // TODO add logging let schema = index.schema(); - let mut segment_entries = vec![]; - let mut file_protections: Vec = vec![]; - - for segment_id in segment_ids { - if let Some(mut segment_entry) = segment_updater.0.segment_manager.segment_entry(segment_id) - { - let segment = index.segment(segment_entry.meta().clone()); - if let Some(file_protection) = - advance_deletes(segment, &mut segment_entry, target_opstamp)? - { - file_protections.push(file_protection); - } - segment_entries.push(segment_entry); - } else { - error!("Error, had to abort merge as some of the segment is not managed anymore."); - let msg = format!( - "Segment {:?} requested for merge is not managed.", - segment_id - ); - bail!(ErrorKind::InvalidArgument(msg)); - } + for segment_entry in &mut segment_entries { + let segment = index.segment(segment_entry.meta().clone()); + advance_deletes(segment, segment_entry, target_opstamp)?; } let delete_cursor = segment_entries[0].delete_cursor().clone(); @@ -135,13 +115,13 @@ fn perform_merge( // to merge the two segments. 
let segment_serializer = SegmentSerializer::for_segment(&mut merged_segment) - .expect("Creating index serializer failed"); + .chain_err(|| "Creating index serializer failed")?; let num_docs = merger .write(segment_serializer) - .expect("Serializing merged index failed"); - let mut segment_meta = SegmentMeta::new(merged_segment.id()); - segment_meta.set_max_doc(num_docs); + .chain_err(|| "Serializing merged index failed")?; + + let segment_meta = SegmentMeta::new(merged_segment.id(), num_docs); let after_merge_segment_entry = SegmentEntry::new(segment_meta.clone(), delete_cursor, None); Ok(after_merge_segment_entry) @@ -167,8 +147,12 @@ impl SegmentUpdater { ) -> Result { let segments = index.searchable_segment_metas()?; let segment_manager = SegmentManager::from_segments(segments, delete_cursor); + let pool = CpuPoolBuilder::new() + .name_prefix("segment_updater") + .pool_size(1) + .create(); Ok(SegmentUpdater(Arc::new(InnerSegmentUpdater { - pool: CpuPool::new(1), + pool, index, segment_manager, merge_policy: RwLock::new(Box::new(DefaultMergePolicy::default())), @@ -283,69 +267,85 @@ impl SegmentUpdater { }).wait() } - pub fn start_merge(&self, segment_ids: &[SegmentId]) -> Receiver { - self.0.segment_manager.start_merge(segment_ids); + pub fn start_merge(&self, segment_ids: &[SegmentId]) -> Result> { + //let future_merged_segment = */ + let segment_ids_vec = segment_ids.to_vec(); + self.run_async(move |segment_updater| { + segment_updater.start_merge_impl(&segment_ids_vec[..]) + }).wait()? + } + + // `segment_ids` is required to be non-empty. + fn start_merge_impl(&self, segment_ids: &[SegmentId]) -> Result> { + assert!(!segment_ids.is_empty(), "Segment_ids cannot be empty."); + let segment_updater_clone = self.clone(); + let segment_entries: Vec = self.0.segment_manager.start_merge(segment_ids)?; let segment_ids_vec = segment_ids.to_vec(); let merging_thread_id = self.get_merging_thread_id(); + info!( + "Starting merge thread #{} - {:?}", + merging_thread_id, segment_ids + ); let (merging_future_send, merging_future_recv) = oneshot(); - if segment_ids.is_empty() { - return merging_future_recv; - } - let target_opstamp = self.0.stamper.stamp(); - let merging_join_handle = thread::spawn(move || { - // first we need to apply deletes to our segment. - let merged_segment = segment_updater_clone.new_segment(); - let merged_segment_id = merged_segment.id(); - let merge_result = perform_merge( - &segment_ids_vec, - &segment_updater_clone, - merged_segment, - target_opstamp, - ); - match merge_result { - Ok(after_merge_segment_entry) => { - let merged_segment_meta = after_merge_segment_entry.meta().clone(); - segment_updater_clone - .end_merge(segment_ids_vec, after_merge_segment_entry) - .expect("Segment updater thread is corrupted."); + // first we need to apply deletes to our segment. + let merging_join_handle = thread::Builder::new() + .name(format!("mergingthread-{}", merging_thread_id)) + .spawn(move || { + // first we need to apply deletes to our segment. + let merged_segment = segment_updater_clone.new_segment(); + let merged_segment_id = merged_segment.id(); + let merge_result = perform_merge( + &segment_updater_clone.0.index, + segment_entries, + merged_segment, + target_opstamp, + ); - // the future may fail if the listener of the oneshot future - // has been destroyed. - // - // This is not a problem here, so we just ignore any - // possible error. 
- let _merging_future_res = merging_future_send.send(merged_segment_meta); - } - Err(e) => { - error!("Merge of {:?} was cancelled: {:?}", segment_ids_vec, e); - // ... cancel merge - if cfg!(test) { - panic!("Merge failed."); + match merge_result { + Ok(after_merge_segment_entry) => { + let merged_segment_meta = after_merge_segment_entry.meta().clone(); + segment_updater_clone + .end_merge(segment_ids_vec, after_merge_segment_entry) + .expect("Segment updater thread is corrupted."); + + // the future may fail if the listener of the oneshot future + // has been destroyed. + // + // This is not a problem here, so we just ignore any + // possible error. + let _merging_future_res = merging_future_send.send(merged_segment_meta); + } + Err(e) => { + warn!("Merge of {:?} was cancelled: {:?}", segment_ids_vec, e); + // ... cancel merge + if cfg!(test) { + panic!("Merge failed."); + } + segment_updater_clone.cancel_merge(&segment_ids_vec, merged_segment_id); + // merging_future_send will be dropped, sending an error to the future. } - segment_updater_clone.cancel_merge(&segment_ids_vec, merged_segment_id); - // merging_future_send will be dropped, sending an error to the future. } - } - segment_updater_clone - .0 - .merging_threads - .write() - .unwrap() - .remove(&merging_thread_id); - Ok(()) - }); + segment_updater_clone + .0 + .merging_threads + .write() + .unwrap() + .remove(&merging_thread_id); + Ok(()) + }) + .expect("Failed to spawn a thread."); self.0 .merging_threads .write() .unwrap() .insert(merging_thread_id, merging_join_handle); - merging_future_recv + Ok(merging_future_recv) } fn consider_merge_options(&self) { @@ -358,8 +358,18 @@ impl SegmentUpdater { let committed_merge_candidates = merge_policy.compute_merge_candidates(&committed_segments); merge_candidates.extend_from_slice(&committed_merge_candidates[..]); for MergeCandidate(segment_metas) in merge_candidates { - if let Err(e) = self.start_merge(&segment_metas).fuse().poll() { - error!("The merge task failed quickly after starting: {:?}", e); + match self.start_merge_impl(&segment_metas) { + Ok(merge_future) => { + if let Err(e) = merge_future.fuse().poll() { + error!("The merge task failed quickly after starting: {:?}", e); + } + } + Err(err) => { + warn!( + "Starting the merge failed for the following reason. This is not fatal. {}", + err + ); + } } } } @@ -382,7 +392,6 @@ impl SegmentUpdater { self.run_async(move |segment_updater| { info!("End merge {:?}", after_merge_segment_entry.meta()); let mut delete_cursor = after_merge_segment_entry.delete_cursor().clone(); - let mut _file_protection_opt = None; if let Some(delete_operation) = delete_cursor.get() { let committed_opstamp = segment_updater .0 @@ -393,29 +402,22 @@ impl SegmentUpdater { if delete_operation.opstamp < committed_opstamp { let index = &segment_updater.0.index; let segment = index.segment(after_merge_segment_entry.meta().clone()); - match advance_deletes( - segment, - &mut after_merge_segment_entry, - committed_opstamp, - ) { - Ok(file_protection_opt_res) => { - _file_protection_opt = file_protection_opt_res; - } - Err(e) => { - error!( - "Merge of {:?} was cancelled (advancing deletes failed): {:?}", - before_merge_segment_ids, e - ); - // ... 
cancel merge - if cfg!(test) { - panic!("Merge failed."); - } - segment_updater.cancel_merge( - &before_merge_segment_ids, - after_merge_segment_entry.segment_id(), - ); - return; + if let Err(e) = + advance_deletes(segment, &mut after_merge_segment_entry, committed_opstamp) + { + error!( + "Merge of {:?} was cancelled (advancing deletes failed): {:?}", + before_merge_segment_ids, e + ); + // ... cancel merge + if cfg!(test) { + panic!("Merge failed."); } + segment_updater.cancel_merge( + &before_merge_segment_ids, + after_merge_segment_entry.segment_id(), + ); + return; } } } diff --git a/src/lib.rs b/src/lib.rs index a23209708..cf94a1dad 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -180,6 +180,8 @@ mod macros; pub use error::{Error, ErrorKind, ResultExt}; +extern crate census; + /// Tantivy result. pub type Result = std::result::Result; diff --git a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs index 5967a4827..fe56795e7 100644 --- a/src/postings/postings_writer.rs +++ b/src/postings/postings_writer.rs @@ -94,8 +94,7 @@ impl MultiFieldPostingsWriter { &self, serializer: &mut InvertedIndexSerializer, ) -> Result>> { - let mut term_offsets: Vec<(&[u8], Addr, UnorderedTermId)> = self - .term_index + let mut term_offsets: Vec<(&[u8], Addr, UnorderedTermId)> = self.term_index .iter() .map(|(term_bytes, addr, bucket_id)| (term_bytes, addr, bucket_id as UnorderedTermId)) .collect(); diff --git a/src/postings/recorder.rs b/src/postings/recorder.rs index c355a78ba..e787ba5e9 100644 --- a/src/postings/recorder.rs +++ b/src/postings/recorder.rs @@ -107,8 +107,7 @@ impl Recorder for TermFrequencyRecorder { fn serialize(&self, serializer: &mut FieldSerializer, heap: &MemoryArena) -> io::Result<()> { // the last document has not been closed... // its term freq is self.current_tf. - let mut doc_iter = self - .stack + let mut doc_iter = self.stack .iter(heap) .chain(Some(self.current_tf).into_iter()); diff --git a/src/postings/segment_postings.rs b/src/postings/segment_postings.rs index 578a256b6..185732451 100644 --- a/src/postings/segment_postings.rs +++ b/src/postings/segment_postings.rs @@ -399,8 +399,7 @@ impl BlockSegmentPostings { /// Returns false iff there was no remaining blocks. 
pub fn advance(&mut self) -> bool { if self.num_bitpacked_blocks > 0 { - let num_consumed_bytes = self - .doc_decoder + let num_consumed_bytes = self.doc_decoder .uncompress_block_sorted(self.remaining_data.as_ref(), self.doc_offset); self.remaining_data.advance(num_consumed_bytes); match self.freq_reading_option { @@ -410,8 +409,7 @@ impl BlockSegmentPostings { self.remaining_data.advance(num_bytes_to_skip); } FreqReadingOption::ReadFreq => { - let num_consumed_bytes = self - .freq_decoder + let num_consumed_bytes = self.freq_decoder .uncompress_block_unsorted(self.remaining_data.as_ref()); self.remaining_data.advance(num_consumed_bytes); } diff --git a/src/postings/serializer.rs b/src/postings/serializer.rs index 24765816f..5368fc38f 100644 --- a/src/postings/serializer.rs +++ b/src/postings/serializer.rs @@ -160,8 +160,7 @@ impl<'a> FieldSerializer<'a> { } fn current_term_info(&self) -> TermInfo { - let (filepos, offset) = self - .positions_serializer_opt + let (filepos, offset) = self.positions_serializer_opt .as_ref() .map(|positions_serializer| positions_serializer.addr()) .unwrap_or((0u64, 0u8)); @@ -273,8 +272,7 @@ impl PostingsSerializer { if self.doc_ids.len() == COMPRESSION_BLOCK_SIZE { { // encode the doc ids - let block_encoded: &[u8] = self - .block_encoder + let block_encoded: &[u8] = self.block_encoder .compress_block_sorted(&self.doc_ids, self.last_doc_id_encoded); self.last_doc_id_encoded = self.doc_ids[self.doc_ids.len() - 1]; self.postings_write.write_all(block_encoded)?; @@ -300,16 +298,14 @@ impl PostingsSerializer { // In that case, the remaining part is encoded // using variable int encoding. { - let block_encoded = self - .block_encoder + let block_encoded = self.block_encoder .compress_vint_sorted(&self.doc_ids, self.last_doc_id_encoded); self.postings_write.write_all(block_encoded)?; self.doc_ids.clear(); } // ... Idem for term frequencies if self.termfreq_enabled { - let block_encoded = self - .block_encoder + let block_encoded = self.block_encoder .compress_vint_unsorted(&self.term_freqs[..]); self.postings_write.write_all(block_encoded)?; self.term_freqs.clear(); diff --git a/src/query/boolean_query/boolean_query.rs b/src/query/boolean_query/boolean_query.rs index 2a0c1f113..286d9f449 100644 --- a/src/query/boolean_query/boolean_query.rs +++ b/src/query/boolean_query/boolean_query.rs @@ -41,8 +41,7 @@ impl From)>> for BooleanQuery { impl Query for BooleanQuery { fn weight(&self, searcher: &Searcher, scoring_enabled: bool) -> Result> { - let sub_weights = self - .subqueries + let sub_weights = self.subqueries .iter() .map(|&(ref occur, ref subquery)| { Ok((*occur, subquery.weight(searcher, scoring_enabled)?)) diff --git a/src/query/query_parser/logical_ast.rs b/src/query/query_parser/logical_ast.rs index 1897880ac..eefb9f7c5 100644 --- a/src/query/query_parser/logical_ast.rs +++ b/src/query/query_parser/logical_ast.rs @@ -1,15 +1,20 @@ use query::Occur; use schema::Field; use schema::Term; +use schema::Type; use std::fmt; use std::ops::Bound; -use schema::Type; #[derive(Clone)] pub enum LogicalLiteral { Term(Term), Phrase(Vec), - Range { field: Field, value_type: Type, lower: Bound, upper: Bound }, + Range { + field: Field, + value_type: Type, + lower: Bound, + upper: Bound, + }, All, } @@ -59,7 +64,11 @@ impl fmt::Debug for LogicalLiteral { match *self { LogicalLiteral::Term(ref term) => write!(formatter, "{:?}", term), LogicalLiteral::Phrase(ref terms) => write!(formatter, "\"{:?}\"", terms), - LogicalLiteral::Range { ref lower, ref upper, .. 
} => write!(formatter, "({:?} TO {:?})", lower, upper), + LogicalLiteral::Range { + ref lower, + ref upper, + .. + } => write!(formatter, "({:?} TO {:?})", lower, upper), LogicalLiteral::All => write!(formatter, "*"), } } diff --git a/src/query/query_parser/query_grammar.rs b/src/query/query_parser/query_grammar.rs index 1eceece1c..29ad158df 100644 --- a/src/query/query_parser/query_grammar.rs +++ b/src/query/query_parser/query_grammar.rs @@ -4,15 +4,16 @@ use combine::*; use query::query_parser::user_input_ast::UserInputBound; fn field>() -> impl Parser { - (letter(), many(satisfy(|c: char| c.is_alphanumeric() || c == '_'))) - .map(|(s1, s2): (char, String)| format!("{}{}", s1, s2)) + ( + letter(), + many(satisfy(|c: char| c.is_alphanumeric() || c == '_')), + ).map(|(s1, s2): (char, String)| format!("{}{}", s1, s2)) } fn word>() -> impl Parser { many1(satisfy(|c: char| c.is_alphanumeric())) } - fn negative_number>() -> impl Parser { (char('-'), many1(satisfy(|c: char| c.is_numeric()))) .map(|(s1, s2): (char, String)| format!("{}{}", s1, s2)) @@ -45,9 +46,7 @@ where } fn range>(input: I) -> ParseResult { - let term_val = || { - word().or(negative_number()) - }; + let term_val = || word().or(negative_number()); let lower_bound = { let excl = (char('{'), term_val()).map(|(_, w)| UserInputBound::Exclusive(w)); let incl = (char('['), term_val()).map(|(_, w)| UserInputBound::Inclusive(w)); @@ -59,8 +58,18 @@ fn range>(input: I) -> ParseResult { // TODO: this backtracking should be unnecessary try(excl).or(incl) }; - (optional((field(), char(':')).map(|x| x.0)), lower_bound, spaces(), string("TO"), spaces(), upper_bound) - .map(|(field, lower, _, _, _, upper)| UserInputAST::Range { field, lower, upper }) + ( + optional((field(), char(':')).map(|x| x.0)), + lower_bound, + spaces(), + string("TO"), + spaces(), + upper_bound, + ).map(|(field, lower, _, _, _, upper)| UserInputAST::Range { + field, + lower, + upper, + }) .parse_stream(input) } diff --git a/src/query/query_parser/query_parser.rs b/src/query/query_parser/query_parser.rs index 55e0845c1..43d11000f 100644 --- a/src/query/query_parser/query_parser.rs +++ b/src/query/query_parser/query_parser.rs @@ -2,21 +2,21 @@ use super::logical_ast::*; use super::query_grammar::parse_to_ast; use super::user_input_ast::*; use core::Index; +use query::AllQuery; use query::BooleanQuery; use query::Occur; use query::PhraseQuery; use query::Query; +use query::RangeQuery; use query::TermQuery; use schema::IndexRecordOption; use schema::{Field, Schema}; use schema::{FieldType, Term}; +use std::borrow::Cow; use std::num::ParseIntError; +use std::ops::Bound; use std::str::FromStr; use tokenizer::TokenizerManager; -use std::ops::Bound; -use query::RangeQuery; -use query::AllQuery; -use std::borrow::Cow; /// Possible error that may happen when parsing a query. 
#[derive(Debug, PartialEq, Eq)] @@ -177,7 +177,7 @@ impl QueryParser { fn compute_terms_for_string( &self, field: Field, - phrase: &str + phrase: &str, ) -> Result, QueryParserError> { let field_entry = self.schema.get_field_entry(field); let field_type = field_entry.field_type(); @@ -240,9 +240,7 @@ impl QueryParser { )) } } - FieldType::HierarchicalFacet => { - Ok(vec![Term::from_field_text(field, phrase)]) - } + FieldType::HierarchicalFacet => Ok(vec![Term::from_field_text(field, phrase)]), FieldType::Bytes => { let field_name = self.schema.get_field_name(field).to_string(); Err(QueryParserError::FieldNotIndexed(field_name)) @@ -258,7 +256,9 @@ impl QueryParser { let terms = self.compute_terms_for_string(field, phrase)?; match terms.len() { 0 => Ok(None), - 1 => Ok(Some(LogicalLiteral::Term(terms.into_iter().next().unwrap()))), + 1 => Ok(Some(LogicalLiteral::Term( + terms.into_iter().next().unwrap(), + ))), _ => Ok(Some(LogicalLiteral::Phrase(terms))), } } @@ -271,10 +271,14 @@ impl QueryParser { } } - fn resolve_bound(&self, field: Field, bound: &UserInputBound) -> Result, QueryParserError> { + fn resolve_bound( + &self, + field: Field, + bound: &UserInputBound, + ) -> Result, QueryParserError> { let terms = self.compute_terms_for_string(field, bound.term_str())?; if terms.len() != 1 { - return Err(QueryParserError::RangeMustNotHavePhrase) + return Err(QueryParserError::RangeMustNotHavePhrase); } let term = terms.into_iter().next().unwrap(); match *bound { @@ -283,7 +287,10 @@ impl QueryParser { } } - fn resolved_fields(&self, given_field: &Option) -> Result, QueryParserError> { + fn resolved_fields( + &self, + given_field: &Option, + ) -> Result, QueryParserError> { match *given_field { None => { if self.default_fields.is_empty() { @@ -291,7 +298,7 @@ impl QueryParser { } else { Ok(Cow::from(&self.default_fields[..])) } - }, + } Some(ref field) => Ok(Cow::from(vec![self.resolve_field_name(&*field)?])), } } @@ -319,28 +326,41 @@ impl QueryParser { let (occur, logical_sub_queries) = self.compute_logical_ast_with_occur(*subquery)?; Ok((compose_occur(Occur::Must, occur), logical_sub_queries)) } - UserInputAST::Range { field, lower, upper } => { + UserInputAST::Range { + field, + lower, + upper, + } => { let fields = self.resolved_fields(&field)?; - let mut clauses = fields.iter().map(|&field| { - let field_entry = self.schema.get_field_entry(field); - let value_type = field_entry.field_type().value_type(); - Ok(LogicalAST::Leaf(Box::new(LogicalLiteral::Range { - field, - value_type, - lower: self.resolve_bound(field, &lower)?, - upper: self.resolve_bound(field, &upper)?, - }))) - }).collect::, QueryParserError>>()?; + let mut clauses = fields + .iter() + .map(|&field| { + let field_entry = self.schema.get_field_entry(field); + let value_type = field_entry.field_type().value_type(); + Ok(LogicalAST::Leaf(Box::new(LogicalLiteral::Range { + field, + value_type, + lower: self.resolve_bound(field, &lower)?, + upper: self.resolve_bound(field, &upper)?, + }))) + }) + .collect::, QueryParserError>>()?; let result_ast = if clauses.len() == 1 { clauses.pop().unwrap() } else { - LogicalAST::Clause(clauses.into_iter().map(|clause| (Occur::Should, clause)).collect()) + LogicalAST::Clause( + clauses + .into_iter() + .map(|clause| (Occur::Should, clause)) + .collect(), + ) }; Ok((Occur::Should, result_ast)) } - UserInputAST::All => { - Ok((Occur::Should, LogicalAST::Leaf(Box::new(LogicalLiteral::All)))) - } + UserInputAST::All => Ok(( + Occur::Should, + LogicalAST::Leaf(Box::new(LogicalLiteral::All)), + 
)), UserInputAST::Leaf(literal) => { let term_phrases: Vec<(Field, String)> = match literal.field_name { Some(ref field_name) => { @@ -403,9 +423,12 @@ fn convert_literal_to_query(logical_literal: LogicalLiteral) -> Box { match logical_literal { LogicalLiteral::Term(term) => Box::new(TermQuery::new(term, IndexRecordOption::WithFreqs)), LogicalLiteral::Phrase(terms) => Box::new(PhraseQuery::new(terms)), - LogicalLiteral::Range { field, value_type, lower, upper } => { - Box::new(RangeQuery::new_term_bounds(field, value_type, lower, upper)) - }, + LogicalLiteral::Range { + field, + value_type, + lower, + upper, + } => Box::new(RangeQuery::new_term_bounds(field, value_type, lower, upper)), LogicalLiteral::All => Box::new(AllQuery), } } @@ -611,11 +634,7 @@ mod test { Excluded(Term([0, 0, 0, 0, 116, 111, 116, 111])))", false, ); - test_parse_query_to_logical_ast_helper( - "*", - "*", - false, - ); + test_parse_query_to_logical_ast_helper("*", "*", false); } #[test] diff --git a/src/query/query_parser/user_input_ast.rs b/src/query/query_parser/user_input_ast.rs index 21138e978..96606915d 100644 --- a/src/query/query_parser/user_input_ast.rs +++ b/src/query/query_parser/user_input_ast.rs @@ -46,7 +46,11 @@ pub enum UserInputAST { Clause(Vec>), Not(Box), Must(Box), - Range { field: Option, lower: UserInputBound, upper: UserInputBound }, + Range { + field: Option, + lower: UserInputBound, + upper: UserInputBound, + }, All, Leaf(Box), } @@ -75,7 +79,11 @@ impl fmt::Debug for UserInputAST { Ok(()) } UserInputAST::Not(ref subquery) => write!(formatter, "-({:?})", subquery), - UserInputAST::Range { ref field, ref lower, ref upper } => { + UserInputAST::Range { + ref field, + ref lower, + ref upper, + } => { if let &Some(ref field) = field { write!(formatter, "{}:", field)?; } @@ -83,7 +91,7 @@ impl fmt::Debug for UserInputAST { write!(formatter, " TO ")?; upper.display_upper(formatter)?; Ok(()) - }, + } UserInputAST::All => write!(formatter, "*"), UserInputAST::Leaf(ref subquery) => write!(formatter, "{:?}", subquery), } diff --git a/src/query/range_query.rs b/src/query/range_query.rs index 90453e4e5..17d09657f 100644 --- a/src/query/range_query.rs +++ b/src/query/range_query.rs @@ -97,7 +97,7 @@ impl RangeQuery { field: Field, value_type: Type, left_bound: Bound, - right_bound: Bound + right_bound: Bound, ) -> RangeQuery { let verify_and_unwrap_term = |val: &Term| { assert_eq!(field, val.field()); diff --git a/src/store/skiplist/skiplist_builder.rs b/src/store/skiplist/skiplist_builder.rs index 8ffc57332..6a698a2c7 100644 --- a/src/store/skiplist/skiplist_builder.rs +++ b/src/store/skiplist/skiplist_builder.rs @@ -72,8 +72,7 @@ impl SkipListBuilder { let mut skip_pointer = self.data_layer.insert(key, dest)?; loop { skip_pointer = match skip_pointer { - Some((skip_doc_id, skip_offset)) => self - .get_skip_layer(layer_id) + Some((skip_doc_id, skip_offset)) => self.get_skip_layer(layer_id) .insert(skip_doc_id, &skip_offset)?, None => { return Ok(()); diff --git a/src/termdict/termdict.rs b/src/termdict/termdict.rs index 03738e694..f633211ef 100644 --- a/src/termdict/termdict.rs +++ b/src/termdict/termdict.rs @@ -164,8 +164,7 @@ impl TermDictionary { let fst = self.fst_index.as_fst(); let mut node = fst.root(); while ord != 0 || !node.is_final() { - if let Some(transition) = node - .transitions() + if let Some(transition) = node.transitions() .take_while(|transition| transition.out.value() <= ord) .last() {