From 7ba771ed1ba2c009f4b695bf53a69d206f5ff291 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Tue, 18 May 2021 14:33:36 +0900 Subject: [PATCH] Replaced RawDocument by OwnedBytes (#1046) --- src/indexer/merger.rs | 23 ++++++++++------ src/indexer/segment_writer.rs | 6 ++--- src/store/mod.rs | 1 - src/store/reader.rs | 51 +++++++++-------------------------- 4 files changed, 29 insertions(+), 52 deletions(-) diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 9ac9ce81f..b65b5431a 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -1,4 +1,5 @@ use super::doc_id_mapping::DocIdMapping; +use crate::error::DataCorruption; use crate::fastfield::DeleteBitSet; use crate::fastfield::FastFieldReader; use crate::fastfield::FastFieldSerializer; @@ -927,19 +928,25 @@ impl IndexMerger { .collect(); if let Some(doc_id_mapping) = doc_id_mapping { for (old_doc_id, reader_with_ordinal) in doc_id_mapping { - let store_reader = &mut document_iterators[reader_with_ordinal.ordinal as usize]; - let raw_doc = store_reader.next().expect(&format!( - "unexpected missing document in docstore on merge, doc id {:?}", - old_doc_id - ))?; - store_writer.store_bytes(raw_doc.get_bytes())?; + let doc_bytes_it = &mut document_iterators[reader_with_ordinal.ordinal as usize]; + if let Some(doc_bytes_res) = doc_bytes_it.next() { + let doc_bytes = doc_bytes_res?; + store_writer.store_bytes(&doc_bytes)?; + } else { + return Err(DataCorruption::comment_only(&format!( + "unexpected missing document in docstore on merge, doc id {:?}", + old_doc_id + )) + .into()); + } } } else { for reader in &self.readers { let store_reader = reader.get_store_reader()?; if reader.num_deleted_docs() > 0 { - for raw_doc in store_reader.iter_raw(reader.delete_bitset()) { - store_writer.store_bytes(raw_doc?.get_bytes())?; + for doc_bytes_res in store_reader.iter_raw(reader.delete_bitset()) { + let doc_bytes = doc_bytes_res?; + store_writer.store_bytes(&doc_bytes)?; } } else { store_writer.stack(&store_reader)?; diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index 4ff505aa2..55b6ede7e 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -354,10 +354,8 @@ fn write( .open_read(SegmentComponent::TempStore)?, )?; for old_doc_id in doc_id_map.iter_old_doc_ids() { - let raw_doc = store_read.get_raw(*old_doc_id)?; - serializer - .get_store_writer() - .store_bytes(raw_doc.get_bytes())?; + let doc_bytes = store_read.get_document_bytes(*old_doc_id)?; + serializer.get_store_writer().store_bytes(&doc_bytes)?; } // TODO delete temp store } diff --git a/src/store/mod.rs b/src/store/mod.rs index d7b12a6e3..d9652c0af 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -36,7 +36,6 @@ and should rely on either mod index; mod reader; mod writer; -pub use self::reader::RawDocument; pub use self::reader::StoreReader; pub use self::writer::StoreWriter; diff --git a/src/store/reader.rs b/src/store/reader.rs index b6f25a0e8..e96d28233 100644 --- a/src/store/reader.rs +++ b/src/store/reader.rs @@ -15,7 +15,7 @@ use std::sync::{Arc, Mutex}; const LRU_CACHE_CAPACITY: usize = 100; -type Block = Arc>; +type Block = OwnedBytes; type BlockCache = Arc>>; @@ -74,7 +74,7 @@ impl StoreReader { let mut decompressed_block = vec![]; decompress(compressed_block.as_slice(), &mut decompressed_block)?; - let block = Arc::new(decompressed_block); + let block = OwnedBytes::new(decompressed_block); self.cache .lock() .unwrap() @@ -93,9 +93,8 @@ impl StoreReader { /// It should not be called to score documents /// for instance. pub fn get(&self, doc_id: DocId) -> crate::Result { - let raw_doc = self.get_raw(doc_id)?; - let mut cursor = raw_doc.get_bytes(); - Ok(Document::deserialize(&mut cursor)?) + let mut doc_bytes = self.get_document_bytes(doc_id)?; + Ok(Document::deserialize(&mut doc_bytes)?) } /// Reads raw bytes of a given document. Returns `RawDocument`, which contains the block of a document and its start and end @@ -106,7 +105,7 @@ impl StoreReader { /// so accessing docs from the same compressed block should be faster. /// For that reason a store reader should be kept and reused. /// - pub fn get_raw(&self, doc_id: DocId) -> crate::Result { + pub fn get_document_bytes(&self, doc_id: DocId) -> crate::Result { let checkpoint = self.block_checkpoint(doc_id).ok_or_else(|| { crate::TantivyError::InvalidArgument(format!("Failed to lookup Doc #{}.", doc_id)) })?; @@ -121,11 +120,7 @@ impl StoreReader { let doc_length = VInt::deserialize(&mut cursor)?.val() as usize; let start_pos = cursor_len_before - cursor.len(); let end_pos = cursor_len_before - cursor.len() + doc_length; - Ok(RawDocument { - block, - start_pos, - end_pos, - }) + Ok(block.slice(start_pos..end_pos)) } /// Iterator over all Documents in their order as they are stored in the doc store. @@ -135,10 +130,9 @@ impl StoreReader { &'b self, delete_bitset: Option<&'a DeleteBitSet>, ) -> impl Iterator> + 'b { - self.iter_raw(delete_bitset).map(|raw_doc| { - let raw_doc = raw_doc?; - let mut cursor = raw_doc.get_bytes(); - Ok(Document::deserialize(&mut cursor)?) + self.iter_raw(delete_bitset).map(|doc_bytes_res| { + let mut doc_bytes = doc_bytes_res?; + Ok(Document::deserialize(&mut doc_bytes)?) }) } @@ -148,7 +142,7 @@ impl StoreReader { pub(crate) fn iter_raw<'a: 'b, 'b>( &'b self, delete_bitset: Option<&'a DeleteBitSet>, - ) -> impl Iterator> + 'b { + ) -> impl Iterator> + 'b { let last_docid = self .block_checkpoints() .last() @@ -214,13 +208,9 @@ impl StoreReader { } }; let end_pos = block_start_pos + doc_length; - let raw_doc = RawDocument { - block, - start_pos: block_start_pos, - end_pos, - }; + let doc_bytes = block.slice(block_start_pos..end_pos); block_start_pos = end_pos; - Ok(raw_doc) + Ok(doc_bytes) }) } @@ -230,23 +220,6 @@ impl StoreReader { } } -/// Get the bytes of a serialized `Document` in a decompressed block. -pub struct RawDocument { - /// the block of data containing multiple documents - block: Arc>, - /// start position of the document in the block - start_pos: usize, - /// end position of the document in the block - end_pos: usize, -} - -impl RawDocument { - /// Get the bytes of a serialized `Document` in a decompressed block. - pub fn get_bytes(&self) -> &[u8] { - &self.block[self.start_pos..self.end_pos] - } -} - fn split_file(data: FileSlice) -> io::Result<(FileSlice, FileSlice)> { let (data, footer_len_bytes) = data.split_from_end(size_of::()); let serialized_offset: OwnedBytes = footer_len_bytes.read_bytes()?;