Added ReadOnlyDirectory and implemented Bundle Directory

This commit is contained in:
Paul Masurel
2019-12-26 10:33:38 +09:00
parent 7ed6bc8718
commit cd7484c035
14 changed files with 222 additions and 120 deletions

View File

@@ -186,7 +186,7 @@ mod test {
use super::{CompositeFile, CompositeWrite};
use crate::common::BinarySerializable;
use crate::common::VInt;
use crate::directory::{Directory, RAMDirectory};
use crate::directory::{Directory, RAMDirectory, ReadOnlyDirectory};
use crate::schema::Field;
use std::io::Write;
use std::path::Path;

View File

@@ -4,7 +4,7 @@ use crate::core::SegmentId;
use crate::core::SegmentMeta;
use crate::directory::error::{OpenReadError, OpenWriteError};
use crate::directory::Directory;
use crate::directory::{ReadOnlySource, WritePtr};
use crate::directory::{ReadOnlyDirectory, ReadOnlySource, WritePtr};
use crate::indexer::segment_serializer::SegmentSerializer;
use crate::schema::Schema;
use crate::Opstamp;

View File

@@ -0,0 +1,97 @@
use crate::directory::directory::ReadOnlyDirectory;
use crate::directory::error::OpenReadError;
use crate::directory::ReadOnlySource;
use crate::error::DataCorruption;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
/// Read-only directory backed by a single "bundle" source.
///
/// Each path listed in the bundle's index is associated with the slice of the
/// bundle body holding that file's content. The map sits behind an `Arc`, so
/// cloning a `BundleDirectory` shares the map instead of copying it.
#[derive(Clone)]
struct BundleDirectory {
// Path of each bundled file -> the slice of the bundle holding its content.
source_map: Arc<HashMap<PathBuf, ReadOnlySource>>,
}
impl BundleDirectory {
    /// Opens a bundle, i.e. several files packed into one `ReadOnlySource`.
    ///
    /// The expected layout is, in order:
    /// - the body: the content of the packed files,
    /// - a JSON index mapping each path to the `(start, stop)` byte range of
    ///   its content within the body,
    /// - a footer: the offset at which the index starts, encoded as a
    ///   little-endian `u64` (8 bytes).
    ///
    /// # Errors
    ///
    /// Returns a `DataCorruption` error if the source is too short to hold
    /// the footer, if the index offset or one of the file ranges falls outside
    /// of the data it refers to, or if the index cannot be parsed as JSON.
    pub fn from_source(source: ReadOnlySource) -> Result<BundleDirectory, DataCorruption> {
        // Size in bytes of the trailing little-endian `u64` index offset.
        const FOOTER_LEN: usize = 8;

        // The sizes below come from the (untrusted) bundle itself: validate
        // them up front instead of handing invalid values to the
        // splitting/slicing helpers.
        let total_len = source.as_slice().len();
        if total_len < FOOTER_LEN {
            let msg = format!(
                "Bundle is too short to contain a footer: {} bytes, expected at least {}.",
                total_len, FOOTER_LEN
            );
            return Err(DataCorruption::comment_only(msg));
        }

        let (body_idx, footer_offset) = source.split_from_end(FOOTER_LEN);
        let mut index_offset_buf = [0u8; FOOTER_LEN];
        index_offset_buf.copy_from_slice(footer_offset.as_slice());
        let offset = u64::from_le_bytes(index_offset_buf);

        let body_idx_len = body_idx.as_slice().len() as u64;
        if offset > body_idx_len {
            let msg = format!(
                "Bundle index offset ({}) is out of bounds ({} bytes available).",
                offset, body_idx_len
            );
            return Err(DataCorruption::comment_only(msg));
        }
        // `offset` was just checked against a length that fits in a `usize`,
        // so this cast cannot truncate.
        let (body_source, idx_source) = body_idx.split(offset as usize);

        let idx: HashMap<PathBuf, (u64, u64)> = serde_json::from_slice(idx_source.as_slice())
            .map_err(|err| {
                let msg = format!("Failed to read index from bundle. {:?}", err);
                DataCorruption::comment_only(msg)
            })?;

        let body_len = body_source.as_slice().len() as u64;
        let source_map = idx
            .into_iter()
            .map(|(path, (start, stop))| {
                if start > stop || stop > body_len {
                    let msg = format!(
                        "Invalid byte range {}..{} for file {:?} in bundle \
                         (body is {} bytes long).",
                        start, stop, path, body_len
                    );
                    return Err(DataCorruption::comment_only(msg));
                }
                // Both bounds are at most `body_len`, hence fit in a `usize`.
                let file_source = body_source.slice(start as usize, stop as usize);
                Ok((path, file_source))
            })
            .collect::<Result<HashMap<PathBuf, ReadOnlySource>, DataCorruption>>()?;

        Ok(BundleDirectory {
            source_map: Arc::new(source_map),
        })
    }
}
impl ReadOnlyDirectory for BundleDirectory {
    /// Returns a clone of the source registered for `path`, or
    /// `OpenReadError::FileDoesNotExist` if the bundle has no such entry.
    fn open_read(&self, path: &Path) -> Result<ReadOnlySource, OpenReadError> {
        match self.source_map.get(path) {
            Some(file_source) => Ok(file_source.clone()),
            None => Err(OpenReadError::FileDoesNotExist(path.to_path_buf())),
        }
    }

    /// Returns true iff the bundle has an entry for `path`.
    fn exists(&self, path: &Path) -> bool {
        self.source_map.contains_key(path)
    }

    /// Copies the whole content registered for `path` into a new `Vec<u8>`,
    /// or returns `OpenReadError::FileDoesNotExist` if there is no such entry.
    fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
        match self.source_map.get(path) {
            Some(file_source) => Ok(file_source.as_slice().to_vec()),
            None => Err(OpenReadError::FileDoesNotExist(path.to_path_buf())),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::BundleDirectory;
    use crate::directory::{RAMDirectory, ReadOnlyDirectory, TerminatingWrite};
    use crate::Directory;
    use std::io::Write;
    use std::path::Path;

    /// Packs a `RAMDirectory` holding two files into a bundle, reopens that
    /// bundle as a `BundleDirectory` and checks that both files read back
    /// with their original content.
    #[test]
    fn test_bundle_directory() {
        let atomic_path = Path::new("testpath_atomic");
        let written_path = Path::new("testpath_wrt");

        // Source directory: one file created through `atomic_write`, one
        // through a regular writer.
        let mut src_directory = RAMDirectory::default();
        assert!(src_directory.atomic_write(atomic_path, b"titi").is_ok());
        {
            let mut file_writer = src_directory.open_write(written_path).unwrap();
            assert!(file_writer.write_all(b"toto").is_ok());
            assert!(file_writer.terminate().is_ok());
        }

        // Serialize the source directory as a single "bundle" file stored in
        // a second directory.
        let bundle_path = Path::new("bundle");
        let mut dest_directory = RAMDirectory::default();
        let mut bundle_writer = dest_directory.open_write(bundle_path).unwrap();
        assert!(src_directory.serialize_bundle(&mut bundle_writer).is_ok());
        assert!(bundle_writer.terminate().is_ok());

        // Reopen the bundle and read both files back.
        let bundle_source = dest_directory.open_read(bundle_path).unwrap();
        let bundle_directory = BundleDirectory::from_source(bundle_source).unwrap();
        assert_eq!(
            &bundle_directory.atomic_read(atomic_path).unwrap()[..],
            b"titi"
        );
        assert_eq!(
            &bundle_directory.open_read(written_path).unwrap()[..],
            b"toto"
        );
    }
}

View File

@@ -100,17 +100,7 @@ fn retry_policy(is_blocking: bool) -> RetryPolicy {
}
}
/// Write-once read many (WORM) abstraction for where
/// tantivy's data should be stored.
///
/// There are currently two implementations of `Directory`
///
/// - The [`MMapDirectory`](struct.MmapDirectory.html), this
/// should be your default choice.
/// - The [`RAMDirectory`](struct.RAMDirectory.html), which
/// should be used mostly for tests.
///
pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
pub trait ReadOnlyDirectory {
/// Opens a virtual file for read.
///
/// Once a virtual file is open, its data may not
@@ -122,6 +112,31 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
/// You should only use this to read files created with [Directory::open_write].
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError>;
/// Returns true iff the file exists
fn exists(&self, path: &Path) -> bool;
/// Reads the full content file that has been written using
/// atomic_write.
///
/// This should only be used for small files.
///
/// You should only use this to read files created with [Directory::atomic_write].
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError>;
}
/// Write-once read many (WORM) abstraction for where
/// tantivy's data should be stored.
///
/// There are currently two implementations of `Directory`
///
/// - The [`MmapDirectory`](struct.MmapDirectory.html), this
/// should be your default choice.
/// - The [`RAMDirectory`](struct.RAMDirectory.html), which
/// should be used mostly for tests.
///
pub trait Directory:
DirectoryClone + ReadOnlyDirectory + fmt::Debug + Send + Sync + 'static
{
/// Removes a file
///
/// Removing a file will not affect an eventual
@@ -131,9 +146,6 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
/// `DeleteError::DoesNotExist`.
fn delete(&self, path: &Path) -> result::Result<(), DeleteError>;
/// Returns true iff the file exists
fn exists(&self, path: &Path) -> bool;
/// Opens a writer for the *virtual file* associated with
/// a Path.
///
@@ -155,14 +167,6 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
/// The file may not previously exist.
fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError>;
/// Reads the full content file that has been written using
/// atomic_write.
///
/// This should only be used for small files.
///
/// You should only use this to read files create with [Directory::atomic_write].
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError>;
/// Atomically replace the content of a file with data.
///
/// This call ensures that reads can never *observe*

View File

@@ -10,6 +10,7 @@ use crate::directory::{WatchCallback, WatchHandle};
use crate::error::DataCorruption;
use crate::Directory;
use crate::directory::directory::ReadOnlyDirectory;
use crc32fast::Hasher;
use serde_json;
use std::collections::HashSet;
@@ -264,14 +265,6 @@ impl ManagedDirectory {
}
impl Directory for ManagedDirectory {
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
let read_only_source = self.directory.open_read(path)?;
let (footer, reader) = Footer::extract_footer(read_only_source)
.map_err(|err| IOError::with_path(path.to_path_buf(), err))?;
footer.is_compatible()?;
Ok(reader)
}
fn open_write(&mut self, path: &Path) -> result::Result<WritePtr, OpenWriteError> {
self.register_file_as_managed(path)
.map_err(|e| IOError::with_path(path.to_owned(), e))?;
@@ -289,18 +282,10 @@ impl Directory for ManagedDirectory {
self.directory.atomic_write(path, data)
}
fn atomic_read(&self, path: &Path) -> result::Result<Vec<u8>, OpenReadError> {
self.directory.atomic_read(path)
}
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
self.directory.delete(path)
}
fn exists(&self, path: &Path) -> bool {
self.directory.exists(path)
}
fn acquire_lock(&self, lock: &Lock) -> result::Result<DirectoryLock, LockError> {
self.directory.acquire_lock(lock)
}
@@ -310,6 +295,24 @@ impl Directory for ManagedDirectory {
}
}
impl ReadOnlyDirectory for ManagedDirectory {
    /// Opens `path` in the wrapped directory and separates the footer from
    /// the rest of the file.
    ///
    /// A failure to extract the footer is reported as an I/O error attached
    /// to `path`. The remaining data is returned only if the footer passes
    /// its `is_compatible` check.
    fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
        let raw_source = self.directory.open_read(path)?;
        let (footer, payload) = match Footer::extract_footer(raw_source) {
            Ok(footer_and_payload) => footer_and_payload,
            Err(io_err) => return Err(IOError::with_path(path.to_path_buf(), io_err).into()),
        };
        footer.is_compatible()?;
        Ok(payload)
    }

    /// Delegates to the wrapped directory.
    fn exists(&self, path: &Path) -> bool {
        let wrapped = &self.directory;
        wrapped.exists(path)
    }

    /// Delegates to the wrapped directory; no footer handling is done here.
    fn atomic_read(&self, path: &Path) -> result::Result<Vec<u8>, OpenReadError> {
        let wrapped = &self.directory;
        wrapped.atomic_read(path)
    }
}
impl Clone for ManagedDirectory {
fn clone(&self) -> ManagedDirectory {
ManagedDirectory {
@@ -323,7 +326,9 @@ impl Clone for ManagedDirectory {
#[cfg(test)]
mod tests_mmap_specific {
use crate::directory::{Directory, ManagedDirectory, MmapDirectory, TerminatingWrite};
use crate::directory::{
Directory, ManagedDirectory, MmapDirectory, ReadOnlyDirectory, TerminatingWrite,
};
use std::collections::HashSet;
use std::fs::OpenOptions;
use std::io::Write;

View File

@@ -6,6 +6,7 @@ use self::notify::RawEvent;
use self::notify::RecursiveMode;
use self::notify::Watcher;
use crate::core::META_FILEPATH;
use crate::directory::directory::ReadOnlyDirectory;
use crate::directory::error::LockError;
use crate::directory::error::{
DeleteError, IOError, OpenDirectoryError, OpenReadError, OpenWriteError,
@@ -407,24 +408,6 @@ impl TerminatingWrite for SafeFileWriter {
}
impl Directory for MmapDirectory {
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
debug!("Open Read {:?}", path);
let full_path = self.resolve_path(path);
let mut mmap_cache = self.inner.mmap_cache.write().map_err(|_| {
let msg = format!(
"Failed to acquired write lock \
on mmap cache while reading {:?}",
path
);
IOError::with_path(path.to_owned(), make_io_err(msg))
})?;
Ok(mmap_cache
.get_mmap(&full_path)?
.map(ReadOnlySource::from)
.unwrap_or_else(ReadOnlySource::empty))
}
/// Any entry associated to the path in the mmap will be
/// removed before the file is deleted.
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
@@ -443,11 +426,6 @@ impl Directory for MmapDirectory {
}
}
fn exists(&self, path: &Path) -> bool {
let full_path = self.resolve_path(path);
full_path.exists()
}
fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
debug!("Open Write {:?}", path);
let full_path = self.resolve_path(path);
@@ -478,25 +456,6 @@ impl Directory for MmapDirectory {
Ok(BufWriter::new(Box::new(writer)))
}
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
let full_path = self.resolve_path(path);
let mut buffer = Vec::new();
match File::open(&full_path) {
Ok(mut file) => {
file.read_to_end(&mut buffer)
.map_err(|e| IOError::with_path(path.to_owned(), e))?;
Ok(buffer)
}
Err(e) => {
if e.kind() == io::ErrorKind::NotFound {
Err(OpenReadError::FileDoesNotExist(path.to_owned()))
} else {
Err(IOError::with_path(path.to_owned(), e).into())
}
}
}
}
fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> {
debug!("Atomic Write {:?}", path);
let full_path = self.resolve_path(path);
@@ -530,6 +489,50 @@ impl Directory for MmapDirectory {
}
}
impl ReadOnlyDirectory for MmapDirectory {
    /// Returns the mmap of the file at `path` (resolved against this
    /// directory) as a `ReadOnlySource`.
    ///
    /// The mmap is obtained through the mmap cache, which is write-locked for
    /// the duration of the lookup. When the cache yields no mmap for the
    /// file, an empty source is returned.
    fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
        debug!("Open Read {:?}", path);
        let full_path = self.resolve_path(path);
        let mut mmap_cache = match self.inner.mmap_cache.write() {
            Ok(cache_guard) => cache_guard,
            Err(_) => {
                let msg = format!(
                    "Failed to acquired write lock \
                     on mmap cache while reading {:?}",
                    path
                );
                return Err(IOError::with_path(path.to_owned(), make_io_err(msg)).into());
            }
        };
        let source = match mmap_cache.get_mmap(&full_path)? {
            Some(mmap) => ReadOnlySource::from(mmap),
            None => ReadOnlySource::empty(),
        };
        Ok(source)
    }

    /// Returns true iff the resolved path exists on the file system.
    fn exists(&self, path: &Path) -> bool {
        self.resolve_path(path).exists()
    }

    /// Reads the whole file at `path` into memory.
    ///
    /// A missing file is reported as `OpenReadError::FileDoesNotExist`; any
    /// other failure to open or read the file is reported as an I/O error
    /// attached to `path`.
    fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
        let full_path = self.resolve_path(path);
        let mut file = File::open(&full_path).map_err(|open_err| {
            if open_err.kind() == io::ErrorKind::NotFound {
                OpenReadError::FileDoesNotExist(path.to_owned())
            } else {
                IOError::with_path(path.to_owned(), open_err).into()
            }
        })?;
        let mut content = Vec::new();
        file.read_to_end(&mut content)
            .map_err(|read_err| IOError::with_path(path.to_owned(), read_err))?;
        Ok(content)
    }
}
#[cfg(test)]
mod tests {

View File

@@ -7,6 +7,7 @@ WORM directory abstraction.
#[cfg(feature = "mmap")]
mod mmap_directory;
mod bundle_directory;
mod directory;
mod directory_lock;
mod footer;
@@ -19,7 +20,7 @@ mod watch_event_router;
pub mod error;
pub use self::directory::DirectoryLock;
pub use self::directory::{Directory, DirectoryClone};
pub use self::directory::{Directory, DirectoryClone, ReadOnlyDirectory};
pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK};
pub use self::ram_directory::RAMDirectory;
pub use self::read_only_source::ReadOnlySource;

View File

@@ -1,5 +1,6 @@
use crate::common::CountingWriter;
use crate::core::META_FILEPATH;
use crate::directory::directory::ReadOnlyDirectory;
use crate::directory::error::{DeleteError, OpenReadError, OpenWriteError};
use crate::directory::AntiCallToken;
use crate::directory::WatchCallbackList;
@@ -117,7 +118,7 @@ impl InnerDirectory {
self.fs.values().map(|f| f.len()).sum()
}
fn serialize_bundle(self, wrt: &mut WritePtr) -> io::Result<()> {
fn serialize_bundle(&self, wrt: &mut WritePtr) -> io::Result<()> {
let mut counting_writer = CountingWriter::wrap(wrt);
let mut file_index: HashMap<PathBuf, (u64, u64)> = HashMap::default();
for (path, source) in &self.fs {
@@ -126,8 +127,8 @@ impl InnerDirectory {
let stop = counting_writer.written_bytes();
file_index.insert(path.to_path_buf(), (start, stop));
}
serde_json::to_writer(&mut counting_writer, &file_index)?;
let index_offset = counting_writer.written_bytes();
serde_json::to_writer(&mut counting_writer, &file_index)?;
let index_offset_buffer = index_offset.to_le_bytes();
counting_writer.write_all(&index_offset_buffer[..])?;
Ok(())
@@ -167,29 +168,12 @@ impl RAMDirectory {
/// The directory is only read-locked while it is serialized, so other
/// clones of this directory may exist when this method is called.
pub fn serialize_bundle(self, wrt: &mut WritePtr) -> io::Result<()> {
let inner_directory = self.try_unwrap().map_err(|_| {
io::Error::new(
io::ErrorKind::Other,
"Serialize bundle requires that \
there are no other existing copy of the directory."
.to_string(),
)
})?;
inner_directory.serialize_bundle(wrt)
}
fn try_unwrap(self) -> Result<InnerDirectory, ()> {
let inner_directory_lock = Arc::try_unwrap(self.fs).map_err(|_| ())?;
let inner_directory = inner_directory_lock.into_inner().map_err(|_| ())?;
Ok(inner_directory)
let inner_directory_rlock = self.fs.read().unwrap();
inner_directory_rlock.serialize_bundle(wrt)
}
}
impl Directory for RAMDirectory {
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
self.fs.read().unwrap().open_read(path)
}
fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
fail_point!("RAMDirectory::delete", |_| {
use crate::directory::error::IOError;
@@ -199,10 +183,6 @@ impl Directory for RAMDirectory {
self.fs.write().unwrap().delete(path)
}
fn exists(&self, path: &Path) -> bool {
self.fs.read().unwrap().exists(path)
}
fn open_write(&mut self, path: &Path) -> Result<WritePtr, OpenWriteError> {
let mut fs = self.fs.write().unwrap();
let path_buf = PathBuf::from(path);
@@ -216,10 +196,6 @@ impl Directory for RAMDirectory {
}
}
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
Ok(self.open_read(path)?.as_slice().to_owned())
}
fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> {
fail_point!("RAMDirectory::atomic_write", |msg| Err(io::Error::new(
io::ErrorKind::Other,
@@ -243,3 +219,17 @@ impl Directory for RAMDirectory {
Ok(self.fs.write().unwrap().watch(watch_callback))
}
}
impl ReadOnlyDirectory for RAMDirectory {
    /// Opens `path` from the in-memory file system, under a read lock.
    ///
    /// Panics if the lock is poisoned.
    fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenReadError> {
        let inner_directory = self.fs.read().unwrap();
        inner_directory.open_read(path)
    }

    /// Returns true iff the in-memory file system reports that `path` exists.
    ///
    /// Panics if the lock is poisoned.
    fn exists(&self, path: &Path) -> bool {
        let inner_directory = self.fs.read().unwrap();
        inner_directory.exists(path)
    }

    /// Opens `path` and copies its whole content into a new `Vec<u8>`.
    fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
        let source = self.open_read(path)?;
        Ok(source.as_slice().to_vec())
    }
}

View File

@@ -25,10 +25,10 @@ impl DataCorruption {
}
}
pub fn comment_only(comment: String) -> DataCorruption {
pub fn comment_only<TS: ToString>(comment: TS) -> DataCorruption {
DataCorruption {
filepath: None,
comment,
comment: comment.to_string(),
}
}
}

View File

@@ -179,7 +179,7 @@ mod tests {
use super::*;
use crate::common::CompositeFile;
use crate::directory::{Directory, RAMDirectory, WritePtr};
use crate::directory::{Directory, RAMDirectory, ReadOnlyDirectory, WritePtr};
use crate::fastfield::FastFieldReader;
use crate::merge_policy::NoMergePolicy;
use crate::schema::Field;

View File

@@ -4,7 +4,7 @@ use crate::common::compute_num_bits;
use crate::common::BinarySerializable;
use crate::common::CompositeFile;
use crate::directory::ReadOnlySource;
use crate::directory::{Directory, RAMDirectory, WritePtr};
use crate::directory::{Directory, RAMDirectory, ReadOnlyDirectory, WritePtr};
use crate::fastfield::{FastFieldSerializer, FastFieldsWriter};
use crate::schema::Schema;
use crate::schema::FAST;

View File

@@ -57,7 +57,7 @@ use self::compression_snap::{compress, decompress};
pub mod tests {
use super::*;
use crate::directory::{Directory, RAMDirectory, WritePtr};
use crate::directory::{Directory, RAMDirectory, ReadOnlyDirectory, WritePtr};
use crate::schema::Document;
use crate::schema::FieldValue;
use crate::schema::Schema;

View File

@@ -36,7 +36,7 @@ pub use self::termdict::{TermDictionary, TermDictionaryBuilder};
mod tests {
use super::{TermDictionary, TermDictionaryBuilder, TermStreamer};
use crate::core::Index;
use crate::directory::{Directory, RAMDirectory, ReadOnlySource};
use crate::directory::{Directory, RAMDirectory, ReadOnlyDirectory, ReadOnlySource};
use crate::postings::TermInfo;
use crate::schema::{Document, FieldType, Schema, TEXT};
use std::path::PathBuf;

View File

@@ -1,6 +1,8 @@
use fail;
use std::path::Path;
use tantivy::directory::{Directory, ManagedDirectory, RAMDirectory, TerminatingWrite};
use tantivy::directory::{
Directory, ManagedDirectory, RAMDirectory, ReadOnlyDirectory, TerminatingWrite,
};
use tantivy::doc;
use tantivy::schema::{Schema, TEXT};
use tantivy::{Index, Term};