diff --git a/src/common/composite_file.rs b/src/common/composite_file.rs index 34cfe2a59..c4573a7f7 100644 --- a/src/common/composite_file.rs +++ b/src/common/composite_file.rs @@ -186,7 +186,7 @@ mod test { use super::{CompositeFile, CompositeWrite}; use crate::common::BinarySerializable; use crate::common::VInt; - use crate::directory::{Directory, RAMDirectory}; + use crate::directory::{Directory, RAMDirectory, ReadOnlyDirectory}; use crate::schema::Field; use std::io::Write; use std::path::Path; diff --git a/src/core/segment.rs b/src/core/segment.rs index 41dc6c91e..910ce1371 100644 --- a/src/core/segment.rs +++ b/src/core/segment.rs @@ -4,7 +4,7 @@ use crate::core::SegmentId; use crate::core::SegmentMeta; use crate::directory::error::{OpenReadError, OpenWriteError}; use crate::directory::Directory; -use crate::directory::{ReadOnlySource, WritePtr}; +use crate::directory::{ReadOnlyDirectory, ReadOnlySource, WritePtr}; use crate::indexer::segment_serializer::SegmentSerializer; use crate::schema::Schema; use crate::Opstamp; diff --git a/src/directory/bundle_directory.rs b/src/directory/bundle_directory.rs new file mode 100644 index 000000000..a2f1776d4 --- /dev/null +++ b/src/directory/bundle_directory.rs @@ -0,0 +1,97 @@ +use crate::directory::directory::ReadOnlyDirectory; +use crate::directory::error::OpenReadError; +use crate::directory::ReadOnlySource; +use crate::error::DataCorruption; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +#[derive(Clone)] +struct BundleDirectory { + source_map: Arc>, +} + +impl BundleDirectory { + pub fn from_source(source: ReadOnlySource) -> Result { + let mut index_offset_buf = [0u8; 8]; + let (body_idx, footer_offset) = source.split_from_end(8); + index_offset_buf.copy_from_slice(footer_offset.as_slice()); + let offset = u64::from_le_bytes(index_offset_buf); + let (body_source, idx_source) = body_idx.split(offset as usize); + let idx: HashMap = serde_json::from_slice(idx_source.as_slice()) + .map_err(|err| { + let msg = format!("Failed to read index from bundle. {:?}", err); + DataCorruption::comment_only(msg) + })?; + let source_map: HashMap = idx + .into_iter() + .map(|(path, (start, stop))| { + let source = body_source.slice(start as usize, stop as usize); + (path, source) + }) + .collect(); + Ok(BundleDirectory { + source_map: Arc::new(source_map), + }) + } +} + +impl ReadOnlyDirectory for BundleDirectory { + fn open_read(&self, path: &Path) -> Result { + self.source_map + .get(path) + .cloned() + .ok_or_else(|| OpenReadError::FileDoesNotExist(path.to_path_buf())) + } + + fn exists(&self, path: &Path) -> bool { + self.source_map.contains_key(path) + } + + fn atomic_read(&self, path: &Path) -> Result, OpenReadError> { + let source = self + .source_map + .get(path) + .ok_or_else(|| OpenReadError::FileDoesNotExist(path.to_path_buf()))?; + Ok(source.as_slice().to_vec()) + } +} + +#[cfg(test)] +mod tests { + use super::BundleDirectory; + use crate::directory::{RAMDirectory, ReadOnlyDirectory, TerminatingWrite}; + use crate::Directory; + use std::io::Write; + use std::path::Path; + + #[test] + fn test_bundle_directory() { + let mut ram_directory = RAMDirectory::default(); + let test_path_atomic = Path::new("testpath_atomic"); + let test_path_wrt = Path::new("testpath_wrt"); + assert!(ram_directory + .atomic_write(test_path_atomic, b"titi") + .is_ok()); + { + let mut test_wrt = ram_directory.open_write(test_path_wrt).unwrap(); + assert!(test_wrt.write_all(b"toto").is_ok()); + assert!(test_wrt.terminate().is_ok()); + } + let mut dest_directory = RAMDirectory::default(); + let bundle_path = Path::new("bundle"); + let mut wrt = dest_directory.open_write(bundle_path).unwrap(); + assert!(ram_directory.serialize_bundle(&mut wrt).is_ok()); + assert!(wrt.terminate().is_ok()); + let source = dest_directory.open_read(bundle_path).unwrap(); + let bundle_directory = BundleDirectory::from_source(source).unwrap(); + assert_eq!( + &bundle_directory.atomic_read(test_path_atomic).unwrap()[..], + b"titi" + ); + assert_eq!( + &bundle_directory.open_read(test_path_wrt).unwrap()[..], + b"toto" + ); + } +} diff --git a/src/directory/directory.rs b/src/directory/directory.rs index 78642a00d..0c922a338 100644 --- a/src/directory/directory.rs +++ b/src/directory/directory.rs @@ -100,17 +100,7 @@ fn retry_policy(is_blocking: bool) -> RetryPolicy { } } -/// Write-once read many (WORM) abstraction for where -/// tantivy's data should be stored. -/// -/// There are currently two implementations of `Directory` -/// -/// - The [`MMapDirectory`](struct.MmapDirectory.html), this -/// should be your default choice. -/// - The [`RAMDirectory`](struct.RAMDirectory.html), which -/// should be used mostly for tests. -/// -pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static { +pub trait ReadOnlyDirectory { /// Opens a virtual file for read. /// /// Once a virtual file is open, its data may not @@ -122,6 +112,31 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static { /// You should only use this to read files create with [Directory::open_write]. fn open_read(&self, path: &Path) -> result::Result; + /// Returns true iff the file exists + fn exists(&self, path: &Path) -> bool; + + /// Reads the full content file that has been written using + /// atomic_write. + /// + /// This should only be used for small files. + /// + /// You should only use this to read files create with [Directory::atomic_write]. + fn atomic_read(&self, path: &Path) -> Result, OpenReadError>; +} + +/// Write-once read many (WORM) abstraction for where +/// tantivy's data should be stored. +/// +/// There are currently two implementations of `Directory` +/// +/// - The [`MMapDirectory`](struct.MmapDirectory.html), this +/// should be your default choice. +/// - The [`RAMDirectory`](struct.RAMDirectory.html), which +/// should be used mostly for tests. +/// +pub trait Directory: + DirectoryClone + ReadOnlyDirectory + fmt::Debug + Send + Sync + 'static +{ /// Removes a file /// /// Removing a file will not affect an eventual @@ -131,9 +146,6 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static { /// `DeleteError::DoesNotExist`. fn delete(&self, path: &Path) -> result::Result<(), DeleteError>; - /// Returns true iff the file exists - fn exists(&self, path: &Path) -> bool; - /// Opens a writer for the *virtual file* associated with /// a Path. /// @@ -155,14 +167,6 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static { /// The file may not previously exist. fn open_write(&mut self, path: &Path) -> Result; - /// Reads the full content file that has been written using - /// atomic_write. - /// - /// This should only be used for small files. - /// - /// You should only use this to read files create with [Directory::atomic_write]. - fn atomic_read(&self, path: &Path) -> Result, OpenReadError>; - /// Atomically replace the content of a file with data. /// /// This calls ensure that reads can never *observe* diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs index 1874119d5..821b8c04b 100644 --- a/src/directory/managed_directory.rs +++ b/src/directory/managed_directory.rs @@ -10,6 +10,7 @@ use crate::directory::{WatchCallback, WatchHandle}; use crate::error::DataCorruption; use crate::Directory; +use crate::directory::directory::ReadOnlyDirectory; use crc32fast::Hasher; use serde_json; use std::collections::HashSet; @@ -264,14 +265,6 @@ impl ManagedDirectory { } impl Directory for ManagedDirectory { - fn open_read(&self, path: &Path) -> result::Result { - let read_only_source = self.directory.open_read(path)?; - let (footer, reader) = Footer::extract_footer(read_only_source) - .map_err(|err| IOError::with_path(path.to_path_buf(), err))?; - footer.is_compatible()?; - Ok(reader) - } - fn open_write(&mut self, path: &Path) -> result::Result { self.register_file_as_managed(path) .map_err(|e| IOError::with_path(path.to_owned(), e))?; @@ -289,18 +282,10 @@ impl Directory for ManagedDirectory { self.directory.atomic_write(path, data) } - fn atomic_read(&self, path: &Path) -> result::Result, OpenReadError> { - self.directory.atomic_read(path) - } - fn delete(&self, path: &Path) -> result::Result<(), DeleteError> { self.directory.delete(path) } - fn exists(&self, path: &Path) -> bool { - self.directory.exists(path) - } - fn acquire_lock(&self, lock: &Lock) -> result::Result { self.directory.acquire_lock(lock) } @@ -310,6 +295,24 @@ impl Directory for ManagedDirectory { } } +impl ReadOnlyDirectory for ManagedDirectory { + fn open_read(&self, path: &Path) -> result::Result { + let read_only_source = self.directory.open_read(path)?; + let (footer, reader) = Footer::extract_footer(read_only_source) + .map_err(|err| IOError::with_path(path.to_path_buf(), err))?; + footer.is_compatible()?; + Ok(reader) + } + + fn exists(&self, path: &Path) -> bool { + self.directory.exists(path) + } + + fn atomic_read(&self, path: &Path) -> result::Result, OpenReadError> { + self.directory.atomic_read(path) + } +} + impl Clone for ManagedDirectory { fn clone(&self) -> ManagedDirectory { ManagedDirectory { @@ -323,7 +326,9 @@ impl Clone for ManagedDirectory { #[cfg(test)] mod tests_mmap_specific { - use crate::directory::{Directory, ManagedDirectory, MmapDirectory, TerminatingWrite}; + use crate::directory::{ + Directory, ManagedDirectory, MmapDirectory, ReadOnlyDirectory, TerminatingWrite, + }; use std::collections::HashSet; use std::fs::OpenOptions; use std::io::Write; diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs index 0c48a0775..166964e80 100644 --- a/src/directory/mmap_directory.rs +++ b/src/directory/mmap_directory.rs @@ -6,6 +6,7 @@ use self::notify::RawEvent; use self::notify::RecursiveMode; use self::notify::Watcher; use crate::core::META_FILEPATH; +use crate::directory::directory::ReadOnlyDirectory; use crate::directory::error::LockError; use crate::directory::error::{ DeleteError, IOError, OpenDirectoryError, OpenReadError, OpenWriteError, @@ -407,24 +408,6 @@ impl TerminatingWrite for SafeFileWriter { } impl Directory for MmapDirectory { - fn open_read(&self, path: &Path) -> result::Result { - debug!("Open Read {:?}", path); - let full_path = self.resolve_path(path); - - let mut mmap_cache = self.inner.mmap_cache.write().map_err(|_| { - let msg = format!( - "Failed to acquired write lock \ - on mmap cache while reading {:?}", - path - ); - IOError::with_path(path.to_owned(), make_io_err(msg)) - })?; - Ok(mmap_cache - .get_mmap(&full_path)? - .map(ReadOnlySource::from) - .unwrap_or_else(ReadOnlySource::empty)) - } - /// Any entry associated to the path in the mmap will be /// removed before the file is deleted. fn delete(&self, path: &Path) -> result::Result<(), DeleteError> { @@ -443,11 +426,6 @@ impl Directory for MmapDirectory { } } - fn exists(&self, path: &Path) -> bool { - let full_path = self.resolve_path(path); - full_path.exists() - } - fn open_write(&mut self, path: &Path) -> Result { debug!("Open Write {:?}", path); let full_path = self.resolve_path(path); @@ -478,25 +456,6 @@ impl Directory for MmapDirectory { Ok(BufWriter::new(Box::new(writer))) } - fn atomic_read(&self, path: &Path) -> Result, OpenReadError> { - let full_path = self.resolve_path(path); - let mut buffer = Vec::new(); - match File::open(&full_path) { - Ok(mut file) => { - file.read_to_end(&mut buffer) - .map_err(|e| IOError::with_path(path.to_owned(), e))?; - Ok(buffer) - } - Err(e) => { - if e.kind() == io::ErrorKind::NotFound { - Err(OpenReadError::FileDoesNotExist(path.to_owned())) - } else { - Err(IOError::with_path(path.to_owned(), e).into()) - } - } - } - } - fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> { debug!("Atomic Write {:?}", path); let full_path = self.resolve_path(path); @@ -530,6 +489,50 @@ impl Directory for MmapDirectory { } } +impl ReadOnlyDirectory for MmapDirectory { + fn open_read(&self, path: &Path) -> result::Result { + debug!("Open Read {:?}", path); + let full_path = self.resolve_path(path); + + let mut mmap_cache = self.inner.mmap_cache.write().map_err(|_| { + let msg = format!( + "Failed to acquired write lock \ + on mmap cache while reading {:?}", + path + ); + IOError::with_path(path.to_owned(), make_io_err(msg)) + })?; + Ok(mmap_cache + .get_mmap(&full_path)? + .map(ReadOnlySource::from) + .unwrap_or_else(ReadOnlySource::empty)) + } + + fn exists(&self, path: &Path) -> bool { + let full_path = self.resolve_path(path); + full_path.exists() + } + + fn atomic_read(&self, path: &Path) -> Result, OpenReadError> { + let full_path = self.resolve_path(path); + let mut buffer = Vec::new(); + match File::open(&full_path) { + Ok(mut file) => { + file.read_to_end(&mut buffer) + .map_err(|e| IOError::with_path(path.to_owned(), e))?; + Ok(buffer) + } + Err(e) => { + if e.kind() == io::ErrorKind::NotFound { + Err(OpenReadError::FileDoesNotExist(path.to_owned())) + } else { + Err(IOError::with_path(path.to_owned(), e).into()) + } + } + } + } +} + #[cfg(test)] mod tests { diff --git a/src/directory/mod.rs b/src/directory/mod.rs index df5e55d81..5e7baae85 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -7,6 +7,7 @@ WORM directory abstraction. #[cfg(feature = "mmap")] mod mmap_directory; +mod bundle_directory; mod directory; mod directory_lock; mod footer; @@ -19,7 +20,7 @@ mod watch_event_router; pub mod error; pub use self::directory::DirectoryLock; -pub use self::directory::{Directory, DirectoryClone}; +pub use self::directory::{Directory, DirectoryClone, ReadOnlyDirectory}; pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK}; pub use self::ram_directory::RAMDirectory; pub use self::read_only_source::ReadOnlySource; diff --git a/src/directory/ram_directory.rs b/src/directory/ram_directory.rs index 88fc6e6bd..d4549aef5 100644 --- a/src/directory/ram_directory.rs +++ b/src/directory/ram_directory.rs @@ -1,5 +1,6 @@ use crate::common::CountingWriter; use crate::core::META_FILEPATH; +use crate::directory::directory::ReadOnlyDirectory; use crate::directory::error::{DeleteError, OpenReadError, OpenWriteError}; use crate::directory::AntiCallToken; use crate::directory::WatchCallbackList; @@ -117,7 +118,7 @@ impl InnerDirectory { self.fs.values().map(|f| f.len()).sum() } - fn serialize_bundle(self, wrt: &mut WritePtr) -> io::Result<()> { + fn serialize_bundle(&self, wrt: &mut WritePtr) -> io::Result<()> { let mut counting_writer = CountingWriter::wrap(wrt); let mut file_index: HashMap = HashMap::default(); for (path, source) in &self.fs { @@ -126,8 +127,8 @@ impl InnerDirectory { let stop = counting_writer.written_bytes(); file_index.insert(path.to_path_buf(), (start, stop)); } - serde_json::to_writer(&mut counting_writer, &file_index)?; let index_offset = counting_writer.written_bytes(); + serde_json::to_writer(&mut counting_writer, &file_index)?; let index_offset_buffer = index_offset.to_le_bytes(); counting_writer.write_all(&index_offset_buffer[..])?; Ok(()) @@ -167,29 +168,12 @@ impl RAMDirectory { /// This method will fail, write nothing, and return an error if a /// clone of this repository exists. pub fn serialize_bundle(self, wrt: &mut WritePtr) -> io::Result<()> { - let inner_directory = self.try_unwrap().map_err(|_| { - io::Error::new( - io::ErrorKind::Other, - "Serialize bundle requires that \ - there are no other existing copy of the directory." - .to_string(), - ) - })?; - inner_directory.serialize_bundle(wrt) - } - - fn try_unwrap(self) -> Result { - let inner_directory_lock = Arc::try_unwrap(self.fs).map_err(|_| ())?; - let inner_directory = inner_directory_lock.into_inner().map_err(|_| ())?; - Ok(inner_directory) + let inner_directory_rlock = self.fs.read().unwrap(); + inner_directory_rlock.serialize_bundle(wrt) } } impl Directory for RAMDirectory { - fn open_read(&self, path: &Path) -> result::Result { - self.fs.read().unwrap().open_read(path) - } - fn delete(&self, path: &Path) -> result::Result<(), DeleteError> { fail_point!("RAMDirectory::delete", |_| { use crate::directory::error::IOError; @@ -199,10 +183,6 @@ impl Directory for RAMDirectory { self.fs.write().unwrap().delete(path) } - fn exists(&self, path: &Path) -> bool { - self.fs.read().unwrap().exists(path) - } - fn open_write(&mut self, path: &Path) -> Result { let mut fs = self.fs.write().unwrap(); let path_buf = PathBuf::from(path); @@ -216,10 +196,6 @@ impl Directory for RAMDirectory { } } - fn atomic_read(&self, path: &Path) -> Result, OpenReadError> { - Ok(self.open_read(path)?.as_slice().to_owned()) - } - fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> { fail_point!("RAMDirectory::atomic_write", |msg| Err(io::Error::new( io::ErrorKind::Other, @@ -243,3 +219,17 @@ impl Directory for RAMDirectory { Ok(self.fs.write().unwrap().watch(watch_callback)) } } + +impl ReadOnlyDirectory for RAMDirectory { + fn open_read(&self, path: &Path) -> result::Result { + self.fs.read().unwrap().open_read(path) + } + + fn exists(&self, path: &Path) -> bool { + self.fs.read().unwrap().exists(path) + } + + fn atomic_read(&self, path: &Path) -> Result, OpenReadError> { + Ok(self.open_read(path)?.as_slice().to_owned()) + } +} diff --git a/src/error.rs b/src/error.rs index c4752141b..b93185bf5 100644 --- a/src/error.rs +++ b/src/error.rs @@ -25,10 +25,10 @@ impl DataCorruption { } } - pub fn comment_only(comment: String) -> DataCorruption { + pub fn comment_only(comment: TS) -> DataCorruption { DataCorruption { filepath: None, - comment, + comment: comment.to_string(), } } } diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs index 8c9a1c68b..a19a9fe4d 100644 --- a/src/fastfield/mod.rs +++ b/src/fastfield/mod.rs @@ -179,7 +179,7 @@ mod tests { use super::*; use crate::common::CompositeFile; - use crate::directory::{Directory, RAMDirectory, WritePtr}; + use crate::directory::{Directory, RAMDirectory, ReadOnlyDirectory, WritePtr}; use crate::fastfield::FastFieldReader; use crate::merge_policy::NoMergePolicy; use crate::schema::Field; diff --git a/src/fastfield/reader.rs b/src/fastfield/reader.rs index 867163b99..d08455dc0 100644 --- a/src/fastfield/reader.rs +++ b/src/fastfield/reader.rs @@ -4,7 +4,7 @@ use crate::common::compute_num_bits; use crate::common::BinarySerializable; use crate::common::CompositeFile; use crate::directory::ReadOnlySource; -use crate::directory::{Directory, RAMDirectory, WritePtr}; +use crate::directory::{Directory, RAMDirectory, ReadOnlyDirectory, WritePtr}; use crate::fastfield::{FastFieldSerializer, FastFieldsWriter}; use crate::schema::Schema; use crate::schema::FAST; diff --git a/src/store/mod.rs b/src/store/mod.rs index bb15301b7..983b34f86 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -57,7 +57,7 @@ use self::compression_snap::{compress, decompress}; pub mod tests { use super::*; - use crate::directory::{Directory, RAMDirectory, WritePtr}; + use crate::directory::{Directory, RAMDirectory, ReadOnlyDirectory, WritePtr}; use crate::schema::Document; use crate::schema::FieldValue; use crate::schema::Schema; diff --git a/src/termdict/mod.rs b/src/termdict/mod.rs index 9ada12708..12d4a5672 100644 --- a/src/termdict/mod.rs +++ b/src/termdict/mod.rs @@ -36,7 +36,7 @@ pub use self::termdict::{TermDictionary, TermDictionaryBuilder}; mod tests { use super::{TermDictionary, TermDictionaryBuilder, TermStreamer}; use crate::core::Index; - use crate::directory::{Directory, RAMDirectory, ReadOnlySource}; + use crate::directory::{Directory, RAMDirectory, ReadOnlyDirectory, ReadOnlySource}; use crate::postings::TermInfo; use crate::schema::{Document, FieldType, Schema, TEXT}; use std::path::PathBuf; diff --git a/tests/failpoints/mod.rs b/tests/failpoints/mod.rs index 658fadbc1..b63c70c68 100644 --- a/tests/failpoints/mod.rs +++ b/tests/failpoints/mod.rs @@ -1,6 +1,8 @@ use fail; use std::path::Path; -use tantivy::directory::{Directory, ManagedDirectory, RAMDirectory, TerminatingWrite}; +use tantivy::directory::{ + Directory, ManagedDirectory, RAMDirectory, ReadOnlyDirectory, TerminatingWrite, +}; use tantivy::doc; use tantivy::schema::{Schema, TEXT}; use tantivy::{Index, Term};