From 4c846b1202fd9b6fb48e1eb65fd6feede299d273 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Fri, 5 Apr 2019 10:07:29 +0900 Subject: [PATCH] Added NRT directory kinda working --- src/directory/directory.rs | 2 + src/directory/mmap_directory.rs | 2 +- src/directory/mod.rs | 1 + src/directory/nrt_directory.rs | 195 ++++++++++++++++++++++++++++++++ src/directory/ram_directory.rs | 16 +-- 5 files changed, 207 insertions(+), 9 deletions(-) create mode 100644 src/directory/nrt_directory.rs diff --git a/src/directory/directory.rs b/src/directory/directory.rs index 9c2af62e2..277dfd379 100644 --- a/src/directory/directory.rs +++ b/src/directory/directory.rs @@ -210,6 +210,8 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static { /// /// In order to make Near Real Time efficient, tantivy introduced the notion of soft_commit vs /// commit. Commit will call `.flush()`, while softcommit won't. + /// + /// `meta.json` should be the last file to be flushed. fn flush(&self) -> io::Result<()> { Ok(()) } diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs index f8f0810d0..ae70e5746 100644 --- a/src/directory/mmap_directory.rs +++ b/src/directory/mmap_directory.rs @@ -368,7 +368,7 @@ impl Drop for ReleaseLockFile { /// This Write wraps a File, but has the specificity of /// call `sync_all` on flush. -struct SafeFileWriter(File); +pub struct SafeFileWriter(File); impl SafeFileWriter { fn new(file: File) -> SafeFileWriter { diff --git a/src/directory/mod.rs b/src/directory/mod.rs index 8d880b0f9..bbeaa5a10 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -13,6 +13,7 @@ mod managed_directory; mod ram_directory; mod read_only_source; mod watch_event_router; +mod nrt_directory; /// Errors specific to the directory module. pub mod error; diff --git a/src/directory/nrt_directory.rs b/src/directory/nrt_directory.rs new file mode 100644 index 000000000..e087bd1be --- /dev/null +++ b/src/directory/nrt_directory.rs @@ -0,0 +1,195 @@ +use directory::Directory; +use std::path::{PathBuf, Path}; +use directory::ReadOnlySource; +use directory::error::OpenReadError; +use directory::error::DeleteError; +use std::io::{BufWriter, Cursor}; +use directory::SeekableWrite; +use directory::error::OpenWriteError; +use directory::WatchHandle; +use directory::ram_directory::InnerRamDirectory; +use std::sync::RwLock; +use std::sync::Arc; +use directory::WatchCallback; +use std::fmt; +use std::io; +use std::io::{Seek, Write}; +use directory::DirectoryClone; + + +const BUFFER_LEN: usize = 1_000_000; + + +pub enum NRTWriter { + InRam { + buffer: Cursor>, + path: PathBuf, + nrt_directory: NRTDirectory + }, + UnderlyingFile(BufWriter>) +} + +impl NRTWriter { + pub fn new(path: PathBuf, nrt_directory: NRTDirectory) -> NRTWriter { + NRTWriter::InRam { + buffer: Cursor::new(Vec::with_capacity(BUFFER_LEN)), + path, + nrt_directory, + } + } +} + +impl io::Seek for NRTWriter { + fn seek(&mut self, pos: io::SeekFrom) -> io::Result { + match self { + NRTWriter::InRam { buffer, path, nrt_directory } => { + buffer.seek(pos) + } + NRTWriter::UnderlyingFile(file) => { + file.seek(pos) + } + } + } +} + +impl io::Write for NRTWriter { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.write_all(buf)?; + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + match self { + NRTWriter::InRam { buffer, path, nrt_directory } => { + let mut cache_wlock = nrt_directory.cache.write().unwrap(); + cache_wlock.write(path.clone(), buffer.get_ref()); + Ok(()) + } + NRTWriter::UnderlyingFile(file) => { + file.flush() + } + } + } + + fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { + // Working around the borrow checker. + let mut underlying_write_opt: Option>> = None; + if let NRTWriter::InRam { buffer, path, nrt_directory } = self { + if buffer.get_ref().len() + buf.len() > BUFFER_LEN { + // We can't keep this in RAM. Let's move it to the underlying directory. + underlying_write_opt = Some(nrt_directory.open_write(path) + .map_err(|open_err| { + io::Error::new(io::ErrorKind::Other, open_err) + })?); + + } + } + if let Some(underlying_write) = underlying_write_opt { + *self = NRTWriter::UnderlyingFile(underlying_write); + } + match self { + NRTWriter::InRam { buffer, path, nrt_directory } => { + assert!(buffer.get_ref().len() + buf.len() <= BUFFER_LEN); + buffer.write_all(buf) + } + NRTWriter::UnderlyingFile(file) => { + file.write_all(buf) + } + } + } +} + +pub struct NRTDirectory { + underlying: Box, + cache: Arc>, +} + + +impl Clone for NRTDirectory { + fn clone(&self) -> Self { + NRTDirectory { + underlying: self.underlying.box_clone(), + cache: self.cache.clone() + } + } +} + +impl NRTDirectory { + fn wrap(underlying: Box) -> NRTDirectory { + NRTDirectory { + underlying, + cache: Default::default() + } + } +} + +impl fmt::Debug for NRTDirectory { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "NRTDirectory({:?})", self.underlying) + } +} + +impl Directory for NRTDirectory { + fn open_read(&self, path: &Path) -> Result { + unimplemented!() + } + + fn delete(&self, path: &Path) -> Result<(), DeleteError> { + // We explicitly release the lock, to prevent a panic on the underlying directory + // to poison the lock. + // + // File can only go from cache to underlying so the result does not lead to + // any inconsistency. + { + let mut cache_wlock = self.cache.write().unwrap(); + if cache_wlock.exists(path) { + return cache_wlock.delete(path); + } + } + self.underlying.delete(path) + } + + fn exists(&self, path: &Path) -> bool { + // We explicitly release the lock, to prevent a panic on the underlying directory + // to poison the lock. + // + // File can only go from cache to underlying so the result does not lead to + // any inconsistency. + { + let rlock_cache = self.cache.read().unwrap(); + if rlock_cache.exists(path) { + return true; + } + } + self.underlying.exists(path) + } + + fn open_write(&mut self, path: &Path) -> Result>, OpenWriteError> { + let mut cache_wlock = self.cache.write().unwrap(); + // TODO might poison our lock. I don't know have a sound solution yet. + let path_buf = path.to_owned(); + if self.underlying.exists(path) { + return Err(OpenWriteError::FileAlreadyExists(path_buf)); + } + let exists = cache_wlock.write(path_buf.clone(), &[]); + // force the creation of the file to mimic the MMap directory. + if exists { + Err(OpenWriteError::FileAlreadyExists(path_buf)) + } else { + let vec_writer = NRTWriter::new(path_buf.clone(), self.clone()); + Ok(BufWriter::new(Box::new(vec_writer))) + } + } + + fn atomic_read(&self, path: &Path) -> Result, OpenReadError> { + self.underlying.atomic_read(path) + } + + fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> { + self.underlying.atomic_write(path, data) + } + + fn watch(&self, watch_callback: WatchCallback) -> WatchHandle { + self.underlying.watch(watch_callback) + } +} \ No newline at end of file diff --git a/src/directory/ram_directory.rs b/src/directory/ram_directory.rs index 985117740..d0f7a7ec1 100644 --- a/src/directory/ram_directory.rs +++ b/src/directory/ram_directory.rs @@ -71,36 +71,36 @@ impl Write for VecWriter { } #[derive(Default)] -struct InnerDirectory { +pub(crate) struct InnerRamDirectory { fs: HashMap, watch_router: WatchCallbackList, } -impl InnerDirectory { - fn write(&mut self, path: PathBuf, data: &[u8]) -> bool { +impl InnerRamDirectory { + pub fn write(&mut self, path: PathBuf, data: &[u8]) -> bool { let data = ReadOnlySource::new(Vec::from(data)); self.fs.insert(path, data).is_some() } - fn open_read(&self, path: &Path) -> Result { + pub fn open_read(&self, path: &Path) -> Result { self.fs .get(path) .ok_or_else(|| OpenReadError::FileDoesNotExist(PathBuf::from(path))) .map(|el| el.clone()) } - fn delete(&mut self, path: &Path) -> result::Result<(), DeleteError> { + pub fn delete(&mut self, path: &Path) -> result::Result<(), DeleteError> { match self.fs.remove(path) { Some(_) => Ok(()), None => Err(DeleteError::FileDoesNotExist(PathBuf::from(path))), } } - fn exists(&self, path: &Path) -> bool { + pub fn exists(&self, path: &Path) -> bool { self.fs.contains_key(path) } - fn watch(&mut self, watch_handle: WatchCallback) -> WatchHandle { + pub fn watch(&mut self, watch_handle: WatchCallback) -> WatchHandle { self.watch_router.subscribe(watch_handle) } } @@ -118,7 +118,7 @@ impl fmt::Debug for RAMDirectory { /// #[derive(Clone, Default)] pub struct RAMDirectory { - fs: Arc>, + fs: Arc>, } impl RAMDirectory {