No filelen problem.

This commit is contained in:
Paul Masurel
2020-11-20 16:17:08 +09:00
parent e9e6d141e9
commit 5a25c8dfd3
6 changed files with 61 additions and 31 deletions

View File

@@ -1,8 +1,8 @@
use crate::directory::directory_lock::Lock;
use crate::directory::error::LockError;
use crate::directory::error::{DeleteError, OpenReadError, OpenWriteError};
use crate::directory::WatchCallback;
use crate::directory::WatchHandle;
use crate::directory::{FileHandle, WatchCallback};
use crate::directory::{FileSlice, WritePtr};
use std::fmt;
use std::io;
@@ -108,10 +108,13 @@ fn retry_policy(is_blocking: bool) -> RetryPolicy {
/// should be your default choice.
/// - The [`RAMDirectory`](struct.RAMDirectory.html), which
/// should be used mostly for tests.
///
pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
/// Opens a virtual file for read.
/// Opens a file and returns a boxed `FileHandle`.
///
/// Users of `Directory` should typically call `Directory::open_read(...)`,
/// while `Directory` implementor should implement `get_file_handle()`.
fn get_file_handle(&self, path: &Path) -> Result<Box<dyn FileHandle>, OpenReadError>;
/// Once a virtual file is open, its data may not
/// change.
///
@@ -119,7 +122,10 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
/// have no effect on the returned `FileSlice` object.
///
/// You should only use this to read files create with [Directory::open_write].
fn open_read(&self, path: &Path) -> Result<FileSlice, OpenReadError>;
fn open_read(&self, path: &Path) -> Result<FileSlice, OpenReadError> {
let file_handle = self.get_file_handle(path)?;
Ok(FileSlice::new(file_handle))
}
/// Removes a file
///

View File

@@ -40,7 +40,7 @@ where
B: StableDeref + Deref<Target = [u8]> + 'static + Send + Sync,
{
fn from(bytes: B) -> FileSlice {
FileSlice::new(OwnedBytes::new(bytes))
FileSlice::new(Box::new(OwnedBytes::new(bytes)))
}
}
@@ -50,22 +50,25 @@ where
///
#[derive(Clone)]
pub struct FileSlice {
data: Arc<Box<dyn FileHandle>>,
data: Arc<dyn FileHandle>,
start: usize,
stop: usize,
}
impl FileSlice {
/// Wraps a FileHandle.
pub fn new<D>(data: D) -> Self
where
D: FileHandle,
{
let len = data.len();
pub fn new(file_handle: Box<dyn FileHandle>) -> Self {
let num_bytes = file_handle.len();
FileSlice::new_with_num_bytes(file_handle, num_bytes)
}
/// Wraps a FileHandle.
#[doc(hidden)]
pub fn new_with_num_bytes(file_handle: Box<dyn FileHandle>, num_bytes: usize) -> Self {
FileSlice {
data: Arc::new(Box::new(data)),
data: Arc::from(file_handle),
start: 0,
stop: len,
stop: num_bytes,
}
}
@@ -146,6 +149,12 @@ impl FileSlice {
}
}
impl FileHandle for FileSlice {
fn read_bytes(&self, from: usize, to: usize) -> io::Result<OwnedBytes> {
self.read_bytes_slice(from, to)
}
}
impl HasLen for FileSlice {
fn len(&self) -> usize {
self.stop - self.start
@@ -160,7 +169,7 @@ mod tests {
#[test]
fn test_file_slice() -> io::Result<()> {
let file_slice = FileSlice::new(b"abcdef".as_ref());
let file_slice = FileSlice::new(Box::new(b"abcdef".as_ref()));
assert_eq!(file_slice.len(), 6);
assert_eq!(file_slice.slice_from(2).read_bytes()?.as_slice(), b"cdef");
assert_eq!(file_slice.slice_to(2).read_bytes()?.as_slice(), b"ab");
@@ -204,7 +213,7 @@ mod tests {
#[test]
fn test_slice_simple_read() -> io::Result<()> {
let slice = FileSlice::new(&b"abcdef"[..]);
let slice = FileSlice::new(Box::new(&b"abcdef"[..]));
assert_eq!(slice.len(), 6);
assert_eq!(slice.read_bytes()?.as_ref(), b"abcdef");
assert_eq!(slice.slice(1, 4).read_bytes()?.as_ref(), b"bcd");
@@ -213,7 +222,7 @@ mod tests {
#[test]
fn test_slice_read_slice() -> io::Result<()> {
let slice_deref = FileSlice::new(&b"abcdef"[..]);
let slice_deref = FileSlice::new(Box::new(&b"abcdef"[..]));
assert_eq!(slice_deref.read_bytes_slice(1, 4)?.as_ref(), b"bcd");
Ok(())
}
@@ -221,14 +230,14 @@ mod tests {
#[test]
#[should_panic(expected = "assertion failed: from <= to")]
fn test_slice_read_slice_invalid_range() {
let slice_deref = FileSlice::new(&b"abcdef"[..]);
let slice_deref = FileSlice::new(Box::new(&b"abcdef"[..]));
assert_eq!(slice_deref.read_bytes_slice(1, 0).unwrap().as_ref(), b"bcd");
}
#[test]
#[should_panic(expected = "`to` exceeds the fileslice length")]
fn test_slice_read_slice_invalid_range_exceeds() {
let slice_deref = FileSlice::new(&b"abcdef"[..]);
let slice_deref = FileSlice::new(Box::new(&b"abcdef"[..]));
assert_eq!(
slice_deref.read_bytes_slice(0, 10).unwrap().as_ref(),
b"bcd"

View File

@@ -1,10 +1,10 @@
use crate::core::{MANAGED_FILEPATH, META_FILEPATH};
use crate::directory::error::{DeleteError, LockError, OpenReadError, OpenWriteError};
use crate::directory::footer::{Footer, FooterProxy};
use crate::directory::DirectoryLock;
use crate::directory::GarbageCollectionResult;
use crate::directory::Lock;
use crate::directory::META_LOCK;
use crate::directory::{DirectoryLock, FileHandle};
use crate::directory::{FileSlice, WritePtr};
use crate::directory::{WatchCallback, WatchHandle};
use crate::error::DataCorruption;
@@ -274,6 +274,11 @@ impl ManagedDirectory {
}
impl Directory for ManagedDirectory {
fn get_file_handle(&self, path: &Path) -> Result<Box<dyn FileHandle>, OpenReadError> {
let file_slice = self.open_read(path)?;
Ok(Box::new(file_slice))
}
fn open_read(&self, path: &Path) -> result::Result<FileSlice, OpenReadError> {
let file_slice = self.directory.open_read(path)?;
let (footer, reader) = Footer::extract_footer(file_slice)

View File

@@ -2,14 +2,13 @@ use crate::core::META_FILEPATH;
use crate::directory::error::LockError;
use crate::directory::error::{DeleteError, OpenDirectoryError, OpenReadError, OpenWriteError};
use crate::directory::file_watcher::FileWatcher;
use crate::directory::AntiCallToken;
use crate::directory::BoxedData;
use crate::directory::Directory;
use crate::directory::DirectoryLock;
use crate::directory::FileSlice;
use crate::directory::Lock;
use crate::directory::WatchCallback;
use crate::directory::WatchHandle;
use crate::directory::{AntiCallToken, FileHandle, OwnedBytes};
use crate::directory::{TerminatingWrite, WritePtr};
use fs2::FileExt;
use memmap::Mmap;
@@ -161,7 +160,7 @@ impl MmapDirectoryInner {
mmap_cache: Default::default(),
_temp_directory: temp_directory,
watcher: FileWatcher::new(&root_path.join(*META_FILEPATH)),
root_path: root_path,
root_path,
}
}
@@ -346,7 +345,7 @@ pub(crate) fn atomic_write(path: &Path, content: &[u8]) -> io::Result<()> {
}
impl Directory for MmapDirectory {
fn open_read(&self, path: &Path) -> result::Result<FileSlice, OpenReadError> {
fn get_file_handle(&self, path: &Path) -> result::Result<Box<dyn FileHandle>, OpenReadError> {
debug!("Open Read {:?}", path);
let full_path = self.resolve_path(path);
@@ -359,11 +358,16 @@ impl Directory for MmapDirectory {
let io_err = make_io_err(msg);
OpenReadError::wrap_io_error(io_err, path.to_path_buf())
})?;
if let Some(mmap_arc) = mmap_cache.get_mmap(&full_path)? {
Ok(FileSlice::from(MmapArc(mmap_arc)))
} else {
Ok(FileSlice::empty())
}
let owned_bytes = mmap_cache
.get_mmap(&full_path)?
.map(|mmap_arc| {
let mmap_arc_obj = MmapArc(mmap_arc);
OwnedBytes::new(mmap_arc_obj)
})
.unwrap_or_else(OwnedBytes::empty);
Ok(Box::new(owned_bytes))
}
/// Any entry associated to the path in the mmap will be

View File

@@ -12,6 +12,8 @@ use std::path::{Path, PathBuf};
use std::result;
use std::sync::{Arc, RwLock};
use super::FileHandle;
/// Writer associated with the `RAMDirectory`
///
/// The Writer just writes a buffer.
@@ -163,6 +165,11 @@ impl RAMDirectory {
}
impl Directory for RAMDirectory {
fn get_file_handle(&self, path: &Path) -> Result<Box<dyn FileHandle>, OpenReadError> {
let file_slice = self.open_read(path)?;
Ok(Box::new(file_slice))
}
fn open_read(&self, path: &Path) -> result::Result<FileSlice, OpenReadError> {
self.fs.read().unwrap().open_read(path)
}

View File

@@ -19,7 +19,7 @@ impl<'a> Iterator for LayerCursor<'a> {
return None;
}
let (block_mut, remaining_mut) = (&mut self.block, &mut self.remaining);
if let Err(_) = block_mut.deserialize(remaining_mut) {
if block_mut.deserialize(remaining_mut).is_err() {
return None;
}
self.cursor = 0;
@@ -50,8 +50,7 @@ impl Layer {
fn seek_start_at_offset(&self, target: DocId, offset: u64) -> Option<Checkpoint> {
self.cursor_at_offset(offset)
.filter(|checkpoint| checkpoint.end_doc > target)
.next()
.find(|checkpoint| checkpoint.end_doc > target)
}
}