Removing data copy in the RAMDirectory

The fst crate recently added support for sliced `Arc<Vec<u8>>`.
This called for a rewrite of tantivy's RAMDirectory.
Previously every single read was copying data.

In addition:
- RAMDirectory's Write object panics if someone does not flush it
right before the destruction of the object.
- In the same spirit, the postings serializer panics if someone
opens a term without closing the previous one.

Closes #16
This commit is contained in:
Paul Masurel
2016-08-17 23:41:58 +09:00
parent cecc9f928b
commit 0972a1c6a0
12 changed files with 213 additions and 72 deletions

View File

@@ -43,7 +43,7 @@ impl fmt::Debug for Index {
}
}
type DirectoryPtr = Box<Directory>;
pub type DirectoryPtr = Box<Directory>;
#[derive(Clone)]
pub struct Index {
@@ -127,7 +127,7 @@ impl Index {
pub fn publish_segment(&mut self, segment: &Segment) -> Result<()> {
{
let mut meta_write = self.metas.write().unwrap();
meta_write.segments.push(segment.segment_id.clone());
meta_write.segments.push(segment.segment_id);
}
self.save_metas()
}
@@ -148,7 +148,6 @@ impl Index {
}
meta_write.segments.push(merged_segment.id());
}
// TODO use logs
self.save_metas()
}
@@ -201,10 +200,11 @@ impl Index {
pub fn save_metas(&mut self,) -> Result<()> {
let mut w = Vec::new();
{
let metas_lock = self.metas.read().unwrap() ;
let metas_lock = self.metas.read().unwrap();
try!(write!(&mut w, "{}\n", json::as_pretty_json(&*metas_lock)));
};
try!(self.rw_directory())
try!(self
.rw_directory())
.atomic_write(&META_FILEPATH, &w[..])
}
}

View File

@@ -107,7 +107,7 @@ impl SegmentReader {
let positions_data = segment
.open_read(SegmentComponent::POSITIONS)
.unwrap_or(ReadOnlySource::Anonymous(Vec::new()));
.unwrap_or(ReadOnlySource::empty());
let schema = segment.schema();
Ok(SegmentReader {

View File

@@ -61,7 +61,7 @@ pub struct FstMap<V: BinarySerializable> {
fn open_fst_index(source: ReadOnlySource) -> io::Result<fst::Map> {
Ok(fst::Map::from(match source {
ReadOnlySource::Anonymous(data) => try!(Fst::from_bytes(data).map_err(convert_fst_error)),
ReadOnlySource::Anonymous(data) => try!(Fst::from_shared_bytes(data.data, data.start, data.len).map_err(convert_fst_error)),
ReadOnlySource::Mmap(mmap_readonly) => try!(Fst::from_mmap(mmap_readonly).map_err(convert_fst_error)),
}))
}
@@ -141,6 +141,7 @@ mod tests {
assert_eq!(keys.next().unwrap(), "abc".as_bytes());
assert_eq!(keys.next().unwrap(), "abcd".as_bytes());
assert_eq!(keys.next(), None);
}
}

View File

@@ -2,16 +2,18 @@ mod mmap_directory;
mod ram_directory;
mod directory;
mod read_only_source;
mod shared_vec_slice;
use std::io::{Seek, Write};
use std::io;
use std::path::PathBuf;
pub use self::shared_vec_slice::SharedVecSlice;
pub use self::read_only_source::ReadOnlySource;
pub use self::directory::Directory;
pub use self::ram_directory::RAMDirectory;
pub use self::mmap_directory::MmapDirectory;
pub use self::ram_directory::SharedVec;
////////////////////////////////////////
@@ -35,6 +37,7 @@ mod tests {
use super::*;
use std::path::Path;
use std::io::SeekFrom;
#[test]
fn test_ram_directory() {
@@ -48,12 +51,13 @@ mod tests {
test_directory(&mut mmap_directory);
}
fn test_directory(directory: &mut Directory) {
fn test_directory_simple(directory: &mut Directory) {
{
let mut write_file = directory.open_write(Path::new("toto")).unwrap();
write_file.write_all(&[4]).unwrap();
write_file.write_all(&[3]).unwrap();
write_file.write_all(&[7,3,5]).unwrap();
write_file.flush().unwrap();
}
let read_file = directory.open_read(Path::new("toto")).unwrap();
let data: &[u8] = &*read_file;
@@ -65,4 +69,30 @@ mod tests {
assert_eq!(data[4], 5);
}
/// Checks that seeking back to the start of a file and writing again
/// overwrites the leading bytes in place instead of appending them.
fn test_directory_seek(directory: &mut Directory) {
    let seek_path = Path::new("toto_seek");
    {
        // Scoped so that the writer is flushed and dropped before the file is read back.
        let mut writer = directory.open_write(seek_path).unwrap();
        writer.write_all(&[4]).unwrap();
        writer.write_all(&[3]).unwrap();
        writer.write_all(&[7,3,5]).unwrap();
        // Rewind and overwrite the first two bytes.
        writer.seek(SeekFrom::Start(0)).unwrap();
        writer.write_all(&[3,1]).unwrap();
        writer.flush().unwrap();
    }
    let source = directory.open_read(seek_path).unwrap();
    let bytes: &[u8] = &*source;
    assert_eq!(bytes.len(), 5);
    let expected: [u8; 5] = [3, 1, 7, 3, 5];
    for (position, expected_byte) in expected.iter().enumerate() {
        assert_eq!(bytes[position], *expected_byte);
    }
}
/// Runs every directory check (plain write/read, then seek-and-overwrite)
/// against the given directory implementation.
fn test_directory(directory: &mut Directory) {
test_directory_simple(directory);
test_directory_seek(directory);
}
}

View File

@@ -8,40 +8,81 @@ use std::path::{Path, PathBuf};
use directory::OpenError;
use directory::WritePtr;
use std::result;
use super::SharedVecSlice;
use Result;
#[derive(Clone)]
pub struct SharedVec(Arc<RwLock<Cursor<Vec<u8>>>>);
pub struct RAMDirectory {
fs: HashMap<PathBuf, SharedVec>,
struct VecWriter {
path: PathBuf,
shared_directory: InnerDirectory,
data: Cursor<Vec<u8>>,
is_flushed: bool,
}
impl SharedVec {
pub fn new() -> SharedVec {
SharedVec(Arc::new( RwLock::new(Cursor::new(Vec::new())) ))
}
pub fn copy_vec(&self,) -> Vec<u8> {
self.0.read().unwrap().clone().into_inner()
impl Drop for VecWriter {
    /// Enforces that the writer was flushed before being dropped.
    ///
    /// # Panics
    ///
    /// Panics if data was written since the last call to `flush`,
    /// unless the current thread is already unwinding from another panic.
    fn drop(&mut self) {
        // Panicking while the thread is already unwinding aborts the whole
        // process and hides the original failure, so the check is skipped
        // in that situation.
        if !self.is_flushed && !::std::thread::panicking() {
            panic!("You forgot to flush {:?} before its writter got Drop. Do not rely on drop.", self.path)
        }
    }
}
impl Write for SharedVec {
impl Seek for VecWriter {
/// Moves the write position by delegating to the underlying in-memory cursor.
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
self.data.seek(pos)
}
}
impl Write for VecWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
try!(self.0.write().unwrap().write(buf));
self.is_flushed = false;
try!(self.data.write(buf));
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
self.is_flushed = true;
self.shared_directory.write(self.path.clone(), self.data.get_ref())
}
}
impl Seek for SharedVec {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
self.0.write().unwrap().seek(pos)
/// Map from file path to file content, shared behind an `Arc<RwLock<..>>`.
///
/// Cloning an `InnerDirectory` yields another handle to the same map.
#[derive(Clone)]
struct InnerDirectory(Arc<RwLock<HashMap<PathBuf, Arc<Vec<u8>>>>>);

impl InnerDirectory {

    /// Creates a directory that contains no file.
    fn new() -> InnerDirectory {
        let files = HashMap::new();
        InnerDirectory(Arc::new(RwLock::new(files)))
    }

    /// Stores a copy of `data` under `path`, replacing any previous content.
    ///
    /// Returns an `io::Error` if the write lock cannot be acquired.
    fn write(&self, path: PathBuf, data: &Vec<u8>) -> io::Result<()> {
        match self.0.write() {
            Ok(mut files) => {
                files.insert(path, Arc::new(data.clone()));
                Ok(())
            }
            Err(_) => {
                Err(io::Error::new(io::ErrorKind::Other, format!("Failed to lock the directory, when trying to write {:?}", path)))
            }
        }
    }

    /// Returns a read-only view of the file stored under `path`.
    ///
    /// The returned source shares the stored buffer; the bytes are not copied.
    /// Fails with `OpenError::FileDoesNotExist` if nothing is stored under
    /// `path`, and with `OpenError::IOError` if the read lock cannot be acquired.
    fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenError> {
        let files = match self.0.read() {
            Ok(guard) => guard,
            Err(_) => {
                let io_err = io::Error::new(io::ErrorKind::Other, format!("Failed to read lock for the directory, when trying to read {:?}", path));
                return Err(OpenError::IOError(io_err));
            }
        };
        let shared_slice = files
            .get(path)
            .map(|data| SharedVecSlice::new(data.clone()));
        match shared_slice {
            Some(slice) => Ok(ReadOnlySource::Anonymous(slice)),
            None => Err(OpenError::FileDoesNotExist(PathBuf::from(path))),
        }
    }

}
impl fmt::Debug for RAMDirectory {
@@ -53,33 +94,37 @@ impl fmt::Debug for RAMDirectory {
impl RAMDirectory {
pub fn create() -> RAMDirectory {
RAMDirectory {
fs: HashMap::new()
fs: InnerDirectory::new()
}
}
}
pub struct RAMDirectory {
fs: InnerDirectory,
}
impl Directory for RAMDirectory {
fn open_read(&self, path: &Path) -> result::Result<ReadOnlySource, OpenError> {
match self.fs.get(path) {
Some(ref data) => {
let data_copy = data.copy_vec();
Ok(ReadOnlySource::Anonymous(data_copy))
},
None => {
Err(OpenError::FileDoesNotExist(PathBuf::from(path)))
}
}
self.fs.open_read(path)
}
fn open_write(&mut self, path: &Path) -> Result<WritePtr> {
let full_path = PathBuf::from(&path);
let data = SharedVec::new();
self.fs.insert(full_path, data.clone());
Ok(Box::new(data))
let mut vec_writer = VecWriter {
path: PathBuf::from(path),
data: Cursor::new(Vec::new()),
shared_directory: self.fs.clone(),
is_flushed: false,
};
// force the creation of the file to mimick the MMap directory.
try!(vec_writer.flush());
Ok(Box::new(vec_writer))
}
fn atomic_write(&mut self, path: &Path, data: &[u8]) -> Result<()> {
let mut write = try!(self.open_write(path));
try!(write.write_all(data));
try!(write.flush());
Ok(())
}

View File

@@ -1,6 +1,7 @@
use fst::raw::MmapReadOnly;
use std::ops::Deref;
use std::io::Cursor;
use super::SharedVecSlice;
////////////////////////////////////////
// Read only source.
@@ -8,7 +9,7 @@ use std::io::Cursor;
pub enum ReadOnlySource {
Mmap(MmapReadOnly),
Anonymous(Vec<u8>),
Anonymous(SharedVecSlice),
}
impl Deref for ReadOnlySource {
@@ -25,12 +26,18 @@ impl ReadOnlySource {
self.as_slice().len()
}
pub fn empty() -> ReadOnlySource {
ReadOnlySource::Anonymous(SharedVecSlice::empty())
}
pub fn as_slice(&self,) -> &[u8] {
match *self {
ReadOnlySource::Mmap(ref mmap_read_only) => unsafe {
mmap_read_only.as_slice()
},
ReadOnlySource::Anonymous(ref shared_vec) => shared_vec.as_slice(),
ReadOnlySource::Anonymous(ref shared_vec) => {
shared_vec.as_slice()
},
}
}
@@ -45,8 +52,7 @@ impl ReadOnlySource {
ReadOnlySource::Mmap(sliced_mmap)
}
ReadOnlySource::Anonymous(ref shared_vec) => {
let sliced_data: Vec<u8> = Vec::from(&shared_vec[from_offset..to_offset]);
ReadOnlySource::Anonymous(sliced_data)
ReadOnlySource::Anonymous(shared_vec.slice(from_offset, to_offset))
},
}
}

View File

@@ -0,0 +1,36 @@
use std::sync::Arc;
/// A window over a shared, reference-counted byte buffer.
///
/// Cloning or slicing only clones the `Arc`; the bytes themselves are never copied.
#[derive(Clone)]
pub struct SharedVecSlice {
    /// Underlying buffer, shared by every slice derived from it.
    pub data: Arc<Vec<u8>>,
    /// Offset in `data` of the first byte of the window.
    pub start: usize,
    /// Number of bytes in the window.
    pub len: usize
}

impl SharedVecSlice {

    /// Returns a slice containing no bytes, backed by a new empty buffer.
    pub fn empty() -> SharedVecSlice {
        SharedVecSlice::new(Arc::new(Vec::new()))
    }

    /// Returns a slice covering the whole of `data`.
    pub fn new(data: Arc<Vec<u8>>) -> SharedVecSlice {
        let data_len = data.len();
        SharedVecSlice {
            data: data,
            start: 0,
            len: data_len,
        }
    }

    /// Returns the bytes of the window.
    pub fn as_slice(&self,) -> &[u8] {
        &self.data[self.start..self.start + self.len]
    }

    /// Returns the sub-window `[from_offset, to_offset)`, with both offsets
    /// expressed relative to the start of this window.
    ///
    /// # Panics
    ///
    /// Panics if `from_offset > to_offset` or if `to_offset` exceeds the
    /// length of this window, like indexing a regular slice would.
    pub fn slice(&self, from_offset: usize, to_offset:usize) -> SharedVecSlice {
        // Without these checks a sub-slice could reach past the end of its
        // parent window and expose unrelated bytes of the shared buffer.
        assert!(from_offset <= to_offset,
                "slice start {} is greater than slice end {}", from_offset, to_offset);
        assert!(to_offset <= self.len,
                "slice end {} is out of range for a slice of length {}", to_offset, self.len);
        SharedVecSlice {
            data: self.data.clone(),
            start: self.start + from_offset,
            len: to_offset - from_offset,
        }
    }

}

View File

@@ -88,6 +88,7 @@ impl FastFieldSerializer {
self.written_size += try!(self.fields.serialize(&mut self.write));
try!(self.write.seek(SeekFrom::Start(0)));
try!((header_offset as u32).serialize(&mut self.write));
try!(self.write.flush());
Ok(self.written_size)
}
}

View File

@@ -52,11 +52,12 @@ mod tests {
let mut posting_serializer = PostingsSerializer::open(&segment).unwrap();
let term = Term::from_field_text(text_field, "abc");
posting_serializer.new_term(&term, 3).unwrap();
for _ in 0..3 {
let a = vec!(1,2,3,2);
posting_serializer.write_doc(0, 2, &a).unwrap();
for doc_id in 0u32..3u32 {
let positions = vec!(1,2,3,2);
posting_serializer.write_doc(doc_id, 2, &positions).unwrap();
}
posting_serializer.close_term().unwrap();
posting_serializer.close().unwrap();
let read = segment.open_read(SegmentComponent::POSITIONS).unwrap();
assert_eq!(read.len(), 13);
}

View File

@@ -31,6 +31,7 @@ pub struct PostingsSerializer {
position_deltas: Vec<u32>,
schema: Schema,
text_indexing_options: TextIndexingOptions,
term_open: bool,
}
impl PostingsSerializer {
@@ -55,6 +56,7 @@ impl PostingsSerializer {
position_deltas: Vec::new(),
schema: schema,
text_indexing_options: TextIndexingOptions::Unindexed,
term_open: false,
})
}
@@ -76,7 +78,10 @@ impl PostingsSerializer {
}
pub fn new_term(&mut self, term: &Term, doc_freq: DocId) -> io::Result<()> {
try!(self.close_term());
if self.term_open {
panic!("Called new_term, while the previous term was not closed.");
}
self.term_open = true;
self.load_indexing_options(term.get_field());
self.doc_ids.clear();
self.last_doc_id_encoded = 0;
@@ -92,14 +97,22 @@ impl PostingsSerializer {
}
pub fn close_term(&mut self,) -> io::Result<()> {
if !self.doc_ids.is_empty() {
{
let block_encoded = self.block_encoder.compress_vint_sorted(&self.doc_ids, self.last_doc_id_encoded);
self.written_bytes_postings += block_encoded.len();
try!(self.postings_write.write_all(block_encoded));
}
if self.text_indexing_options.is_termfreq_enabled() {
if self.term_open {
if !self.doc_ids.is_empty() {
// we have doc ids waiting to be written
// this happens when the number of doc ids is
// not a perfect multiple of our block size.
//
// In that case, the remaining part is encoded
// using variable int encoding.
{
let block_encoded = self.block_encoder.compress_vint_sorted(&self.doc_ids, self.last_doc_id_encoded);
self.written_bytes_postings += block_encoded.len();
try!(self.postings_write.write_all(block_encoded));
self.doc_ids.clear();
}
// ... Idem for term frequencies
if self.text_indexing_options.is_termfreq_enabled() {
let block_encoded = self.block_encoder.compress_vint_unsorted(&self.term_freqs[..]);
for num in block_encoded {
self.written_bytes_postings += try!(num.serialize(&mut self.postings_write));
@@ -107,14 +120,16 @@ impl PostingsSerializer {
self.term_freqs.clear();
}
}
self.doc_ids.clear();
}
if self.text_indexing_options.is_position_enabled() {
self.written_bytes_positions += try!(VInt(self.position_deltas.len() as u64).serialize(&mut self.positions_write));
let positions_encoded: &[u8] = self.positions_encoder.compress_unsorted(&self.position_deltas[..]);
try!(self.positions_write.write_all(positions_encoded));
self.written_bytes_positions += positions_encoded.len();
self.position_deltas.clear();
// On the other hand, positions are entirely buffered until the
// end of the term, at which point they are compressed and written.
if self.text_indexing_options.is_position_enabled() {
self.written_bytes_positions += try!(VInt(self.position_deltas.len() as u64).serialize(&mut self.positions_write));
let positions_encoded: &[u8] = self.positions_encoder.compress_unsorted(&self.position_deltas[..]);
try!(self.positions_write.write_all(positions_encoded));
self.written_bytes_positions += positions_encoded.len();
self.position_deltas.clear();
}
self.term_open = false;
}
Ok(())
}

View File

@@ -119,6 +119,7 @@ impl<Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<Rec>
let term_docfreq = term_postings_writer.doc_freq();
try!(serializer.new_term(term, term_docfreq));
try!(term_postings_writer.serialize(serializer));
try!(serializer.close_term());
}
Ok(())
}

View File

@@ -204,9 +204,10 @@ mod tests {
use postings::{DocSet, VecPostings};
use query::TfIdf;
use query::Scorer;
use directory::ReadOnlySource;
use directory::SharedVec;
use directory::Directory;
use directory::RAMDirectory;
use schema::Field;
use std::path::Path;
use query::Occur;
use fastfield::{U32FastFieldReader, U32FastFieldWriter, FastFieldSerializer};
@@ -216,12 +217,16 @@ mod tests {
for val in vals {
u32_field_writer.add_val(val);
}
let data = SharedVec::new();
let write: Box<SharedVec> = Box::new(data.clone());
let mut serializer = FastFieldSerializer::new(write).unwrap();
u32_field_writer.serialize(&mut serializer).unwrap();
serializer.close().unwrap();
U32FastFieldReader::open(ReadOnlySource::Anonymous(data.copy_vec())).unwrap()
let path = Path::new("some_path");
let mut directory = RAMDirectory::create();
{
let write = directory.open_write(&path).unwrap();
let mut serializer = FastFieldSerializer::new(write).unwrap();
u32_field_writer.serialize(&mut serializer).unwrap();
serializer.close().unwrap();
}
let read = directory.open_read(&path).unwrap();
U32FastFieldReader::open(read).unwrap()
}
fn abs_diff(left: f32, right: f32) -> f32 {