Moved postings and directory to a different module

This commit is contained in:
Paul Masurel
2016-04-30 18:45:44 +09:00
parent 0f863dcc1e
commit 26826ac4ea
27 changed files with 751 additions and 689 deletions

View File

@@ -1,13 +1,11 @@
extern crate argparse;
extern crate tantivy;
use argparse::{ArgumentParser, StoreTrue, Store};
use argparse::{ArgumentParser, Store};
use tantivy::Index;
use std::path::Path;
fn main() {
let mut verbose = false;
let mut directory = String::from(".");
{
let mut ap = ArgumentParser::new();
@@ -22,5 +20,5 @@ fn main() {
let mut index_writer = index.writer().unwrap();
let segments = index.segments();
println!("Merging {} segments", segments.len());
index_writer.merge(&segments);
index_writer.merge(&segments).unwrap();
}

View File

@@ -6,9 +6,10 @@ use core::index::SegmentInfo;
use core::index::SegmentComponent;
use core::fastfield::FastFieldSerializer;
use core::store::StoreWriter;
use core::postings::PostingsSerializer;
use core::convert_to_ioerror;
use postings::PostingsSerializer;
pub struct SegmentSerializer {
segment: Segment,
store_writer: StoreWriter,

View File

@@ -1,4 +1,4 @@
use core::schema::DocId;
use DocId;
use core::reader::SegmentReader;
use core::searcher::SegmentLocalId;
use core::searcher::DocAddress;

View File

@@ -1,305 +0,0 @@
use std::io::BufWriter;
use std::marker::Send;
use std::marker::Sync;
use std::io;
use std::io::Cursor;
use std::io::Write;
use std::io::Seek;
use std::io::SeekFrom;
use std::fs::File;
use std::fmt;
use std::collections::HashMap;
use std::collections::hash_map::Entry as HashMapEntry;
use fst::raw::MmapReadOnly;
use atomicwrites;
use std::sync::Arc;
use std::sync::RwLock;
use tempdir::TempDir;
use std::ops::Deref;
use std::path::{Path, PathBuf};
///////////////////////////////////////////////////////////////
pub enum ReadOnlySource {
Mmap(MmapReadOnly),
Anonymous(Vec<u8>),
}
impl Deref for ReadOnlySource {
type Target = [u8];
fn deref(&self) -> &[u8] {
self.as_slice()
}
}
impl ReadOnlySource {
pub fn len(&self,) -> usize {
self.as_slice().len()
}
pub fn as_slice(&self,) -> &[u8] {
match *self {
ReadOnlySource::Mmap(ref mmap_read_only) => unsafe { mmap_read_only.as_slice() },
ReadOnlySource::Anonymous(ref shared_vec) => shared_vec.as_slice(),
}
}
pub fn cursor<'a>(&'a self) -> Cursor<&'a [u8]> {
Cursor::new(&self.deref())
}
pub fn slice(&self, from_offset:usize, to_offset:usize) -> ReadOnlySource {
match *self {
ReadOnlySource::Mmap(ref mmap_read_only) => {
let sliced_mmap = mmap_read_only.range(from_offset, to_offset - from_offset);
ReadOnlySource::Mmap(sliced_mmap)
}
ReadOnlySource::Anonymous(ref shared_vec) => {
let sliced_data: Vec<u8> = Vec::from(&shared_vec[from_offset..to_offset]);
ReadOnlySource::Anonymous(sliced_data)
},
}
}
}
impl Clone for ReadOnlySource {
fn clone(&self) -> Self {
self.slice(0, self.len())
}
}
pub trait SeekableWrite: Seek + Write {}
impl<T: Seek + Write> SeekableWrite for T {}
pub type WritePtr = Box<SeekableWrite>;
//
// #[derive(Debug)]
// pub enum CreateError {
// RootDirectoryDoesNotExist,
// DirectoryAlreadyExists,
// CannotCreateTempDirectory(io::Error),
// }
pub trait Directory: fmt::Debug + Send + Sync {
fn open_read(&self, path: &Path) -> io::Result<ReadOnlySource>;
fn open_write(&mut self, path: &Path) -> io::Result<WritePtr>;
fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()>;
fn sync(&self, path: &Path) -> io::Result<()>;
fn sync_directory(&self,) -> io::Result<()>;
}
////////////////////////////////////////////////////////////////
// MmapDirectory
pub struct MmapDirectory {
root_path: PathBuf,
mmap_cache: RwLock<HashMap<PathBuf, MmapReadOnly>>,
_temp_directory: Option<TempDir>,
}
impl fmt::Debug for MmapDirectory {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "MmapDirectory({:?})", self.root_path)
}
}
impl MmapDirectory {
pub fn create_from_tempdir() -> io::Result<MmapDirectory> {
// TODO error management
let tempdir = try!(TempDir::new("index"));
let tempdir_path = PathBuf::from(tempdir.path());
let directory = MmapDirectory {
root_path: PathBuf::from(tempdir_path),
mmap_cache: RwLock::new(HashMap::new()),
_temp_directory: Some(tempdir)
};
Ok(directory)
}
pub fn create(filepath: &Path) -> io::Result<MmapDirectory> {
Ok(MmapDirectory {
root_path: PathBuf::from(filepath),
mmap_cache: RwLock::new(HashMap::new()),
_temp_directory: None
})
}
fn resolve_path(&self, relative_path: &Path) -> PathBuf {
self.root_path.join(relative_path)
}
}
impl Directory for MmapDirectory {
fn open_read(&self, path: &Path) -> io::Result<ReadOnlySource> {
let full_path = self.resolve_path(path);
let mut mmap_cache = self.mmap_cache.write().unwrap();
let mmap = match mmap_cache.entry(full_path.clone()) {
HashMapEntry::Occupied(e) => e.get().clone(),
HashMapEntry::Vacant(vacant_entry) => {
let new_mmap = try!(MmapReadOnly::open_path(full_path.clone()));
vacant_entry.insert(new_mmap.clone());
new_mmap
}
};
Ok(ReadOnlySource::Mmap(mmap))
}
fn open_write(&mut self, path: &Path) -> io::Result<WritePtr> {
let full_path = self.resolve_path(path);
let file = try!(File::create(full_path));
let buf_writer = BufWriter::new(file);
Ok(Box::new(buf_writer))
}
fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> {
let full_path = self.resolve_path(path);
let meta_file = atomicwrites::AtomicFile::new(full_path, atomicwrites::AllowOverwrite);
meta_file.write(|f| {
f.write_all(data)
})
}
fn sync(&self, path: &Path) -> io::Result<()> {
let full_path = self.resolve_path(path);
File::open(&full_path).and_then(|fd| fd.sync_all())
}
fn sync_directory(&self,) -> io::Result<()> {
File::open(&self.root_path).and_then(|fd| fd.sync_all())
}
}
////////////////////////////////////////////////////////////////
// RAMDirectory
#[derive(Clone)]
struct SharedVec(Arc<RwLock<Cursor<Vec<u8>>>>);
pub struct RAMDirectory {
fs: HashMap<PathBuf, SharedVec>,
}
impl SharedVec {
fn new() -> SharedVec {
SharedVec(Arc::new( RwLock::new(Cursor::new(Vec::new())) ))
}
}
impl Write for SharedVec {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
try!(self.0.write().unwrap().write(buf));
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl Seek for SharedVec {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
self.0.write().unwrap().seek(pos)
}
}
impl fmt::Debug for RAMDirectory {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "RAMDirectory")
}
}
impl RAMDirectory {
pub fn create() -> RAMDirectory {
RAMDirectory {
fs: HashMap::new()
}
}
}
impl Directory for RAMDirectory {
fn open_read(&self, path: &Path) -> io::Result<ReadOnlySource> {
match self.fs.get(path) {
Some(ref data) => {
let data_copy = (*data).0.read().unwrap().clone();
Ok(ReadOnlySource::Anonymous(data_copy.into_inner()))
},
None =>
Err(io::Error::new(io::ErrorKind::NotFound, format!("File has never been created. {:?}", path)))
}
}
fn open_write(&mut self, path: &Path) -> io::Result<WritePtr> {
let full_path = PathBuf::from(&path);
let data = SharedVec::new();
self.fs.insert(full_path, data.clone());
Ok(Box::new(data))
}
fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> {
let meta_file = atomicwrites::AtomicFile::new(PathBuf::from(path), atomicwrites::AllowOverwrite);
meta_file.write(|f| {
f.write_all(data)
})
}
fn sync(&self, _: &Path) -> io::Result<()> {
Ok(())
}
fn sync_directory(&self,) -> io::Result<()> {
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::path::Path;
#[test]
fn test_ram_directory() {
let mut ram_directory = RAMDirectory::create();
test_directory(&mut ram_directory);
}
#[test]
fn test_mmap_directory() {
let mut mmap_directory = MmapDirectory::create_from_tempdir().unwrap();
test_directory(&mut mmap_directory);
}
fn test_directory(directory: &mut Directory) {
{
let mut write_file = directory.open_write(Path::new("toto")).unwrap();
write_file.write_all(&[4]).unwrap();
write_file.write_all(&[3]).unwrap();
write_file.write_all(&[7,3,5]).unwrap();
}
let read_file = directory.open_read(Path::new("toto")).unwrap();
let data: &[u8] = &*read_file;
assert_eq!(data.len(), 5);
assert_eq!(data[0], 4);
assert_eq!(data[1], 3);
assert_eq!(data[2], 7);
assert_eq!(data[3], 3);
assert_eq!(data[4], 5);
}
}

View File

@@ -1,12 +1,9 @@
use std::io::Write;
use std::io;
use std::io::SeekFrom;
use std::io::Seek;
use core::directory::WritePtr;
use std::io::{SeekFrom, Seek, Write};
use directory::{WritePtr, ReadOnlySource};
use core::serialize::BinarySerializable;
use core::directory::ReadOnlySource;
use std::collections::HashMap;
use core::schema::DocId;
use DocId;
use core::schema::Schema;
use core::schema::Document;
use std::ops::Deref;
@@ -281,10 +278,8 @@ mod tests {
use super::U32FastFieldsWriter;
use core::schema::U32Field;
use std::path::Path;
use core::directory::WritePtr;
use core::directory::Directory;
use directory::{Directory, WritePtr, RAMDirectory};
use core::schema::Document;
use core::directory::RAMDirectory;
use core::schema::Schema;
use core::schema::FAST_U32;
use core::fastfield::FastFieldSerializer;

View File

@@ -1,14 +1,14 @@
use std::path::{PathBuf, Path};
use std::io;
use core::schema::Schema;
use core::schema::DocId;
use DocId;
use std::io::Write;
use std::sync::{Arc, RwLock, RwLockWriteGuard, RwLockReadGuard};
use std::fmt;
use rustc_serialize::json;
use std::io::Read;
use std::io::ErrorKind as IOErrorKind;
use core::directory::{Directory, MmapDirectory, RAMDirectory, ReadOnlySource, WritePtr};
use directory::{Directory, MmapDirectory, RAMDirectory, ReadOnlySource, WritePtr};
use core::writer::IndexWriter;
use core::searcher::Searcher;
use uuid::Uuid;

View File

@@ -1,13 +1,15 @@
use std::io;
use core::reader::SegmentReader;
use core::index::Segment;
use core::schema::DocId;
use DocId;
use core::index::SerializableSegment;
use core::codec::SegmentSerializer;
use core::postings::PostingsSerializer;
use core::postings::TermInfo;
use postings::PostingsSerializer;
use postings::TermInfo;
use std::collections::BinaryHeap;
use core::fstmap::FstMapIter;
use datastruct::FstMapIter;
use core::schema::Term;
use core::schema::Schema;
use core::fastfield::FastFieldSerializer;

View File

@@ -1,6 +1,4 @@
pub mod postings;
pub mod schema;
pub mod directory;
pub mod writer;
pub mod analyzer;
pub mod reader;
@@ -9,8 +7,6 @@ pub mod searcher;
pub mod collector;
pub mod serialize;
pub mod store;
pub mod simdcompression;
pub mod fstmap;
pub mod index;
pub mod fastfield;
pub mod fastdivide;

View File

@@ -1,327 +0,0 @@
use core::schema::DocId;
use std::ptr;
use std::collections::BTreeMap;
use core::schema::Term;
use core::fstmap::FstMapBuilder;
use core::index::Segment;
use core::directory::WritePtr;
use core::index::SegmentComponent;
use core::simdcompression;
use core::serialize::BinarySerializable;
use std::io::{Read, Write};
use std::io;
#[derive(Debug,Ord,PartialOrd,Eq,PartialEq,Clone)]
pub struct TermInfo {
pub doc_freq: u32,
pub postings_offset: u32,
}
impl BinarySerializable for TermInfo {
fn serialize(&self, writer: &mut Write) -> io::Result<usize> {
Ok(
try!(self.doc_freq.serialize(writer)) +
try!(self.postings_offset.serialize(writer))
)
}
fn deserialize(reader: &mut Read) -> io::Result<Self> {
let doc_freq = try!(u32::deserialize(reader));
let offset = try!(u32::deserialize(reader));
Ok(TermInfo {
doc_freq: doc_freq,
postings_offset: offset,
})
}
}
pub trait U32sRecorder {
fn new() -> Self;
fn record(&mut self, val: u32);
}
pub struct VecRecorder(Vec<u32>);
impl U32sRecorder for VecRecorder {
fn new() -> VecRecorder {
VecRecorder(Vec::new())
}
fn record(&mut self, val: u32) {
self.0.push(val);
}
}
pub struct ObliviousRecorder;
impl U32sRecorder for ObliviousRecorder {
fn new() -> ObliviousRecorder {
ObliviousRecorder
}
fn record(&mut self, val: u32) {
}
}
struct TermPostingsWriter<TermFreqsRec: U32sRecorder, PositionsRec: U32sRecorder> {
doc_ids: Vec<DocId>,
term_freqs: TermFreqsRec,
positions: PositionsRec,
current_position: u32,
current_freq: u32,
}
impl<TermFreqsRec: U32sRecorder, PositionsRec: U32sRecorder> TermPostingsWriter<TermFreqsRec, PositionsRec> {
pub fn new() -> TermPostingsWriter<TermFreqsRec, PositionsRec> {
TermPostingsWriter {
doc_ids: Vec::new(),
term_freqs: TermFreqsRec::new(),
positions: PositionsRec::new(),
current_position: 0u32,
current_freq: 0u32,
}
}
fn close_doc(&mut self,) {
self.term_freqs.record(self.current_freq);
self.current_freq = 0;
self.current_position = 0;
}
fn close(&mut self,) {
if self.current_freq > 0 {
self.close_doc();
}
}
fn is_new_doc(&self, doc: &DocId) -> bool {
match self.doc_ids.last() {
Some(&last_doc) => last_doc != *doc,
None => true,
}
}
pub fn doc_freq(&self) -> u32 {
self.doc_ids.len() as u32
}
pub fn suscribe(&mut self, doc: DocId, pos: u32) {
if self.is_new_doc(&doc) {
// this is the first time we meet this term for this document
// first close the previous document, and write its doc_freq.
self.close_doc();
self.doc_ids.push(doc);
}
self.current_freq += 1;
self.positions.record(pos - self.current_position);
self.current_position = pos;
}
}
pub struct PostingsWriter {
postings: Vec<TermPostingsWriter<ObliviousRecorder, ObliviousRecorder>>,
term_index: BTreeMap<Term, usize>,
}
impl PostingsWriter {
pub fn new() -> PostingsWriter {
PostingsWriter {
postings: Vec::new(),
term_index: BTreeMap::new(),
}
}
pub fn suscribe(&mut self, doc: DocId, pos: u32, term: Term) {
let doc_ids: &mut TermPostingsWriter<ObliviousRecorder, ObliviousRecorder> = self.get_term_postings(term);
doc_ids.suscribe(doc, pos);
}
fn get_term_postings(&mut self, term: Term) -> &mut TermPostingsWriter<ObliviousRecorder, ObliviousRecorder> {
match self.term_index.get(&term) {
Some(unord_id) => {
return &mut self.postings[*unord_id];
},
None => {}
}
let unord_id = self.term_index.len();
self.postings.push(TermPostingsWriter::new());
self.term_index.insert(term, unord_id.clone());
&mut self.postings[unord_id]
}
pub fn serialize(&self, serializer: &mut PostingsSerializer) -> io::Result<()> {
for (term, postings_id) in self.term_index.iter() {
let term_postings_writer = &self.postings[postings_id.clone()];
let term_docfreq = term_postings_writer.doc_freq();
try!(serializer.new_term(&term, term_docfreq));
for doc in term_postings_writer.doc_ids.iter() {
try!(serializer.write_doc(doc.clone(), None));
}
}
Ok(())
}
}
//////////////////////////////////
pub trait Postings: Iterator<Item=DocId> {
// after skipping position
// the iterator in such a way that the
// next call to next() will return a
// value greater or equal to target.
fn skip_next(&mut self, target: DocId) -> Option<DocId>;
}
pub struct PostingsSerializer {
terms_fst_builder: FstMapBuilder<WritePtr, TermInfo>, // TODO find an alternative to work around the "move"
postings_write: WritePtr,
positions_write: WritePtr,
written_bytes_postings: usize,
written_bytes_positions: usize,
encoder: simdcompression::Encoder,
doc_ids: Vec<DocId>,
}
impl PostingsSerializer {
pub fn open(segment: &Segment) -> io::Result<PostingsSerializer> {
let terms_write = try!(segment.open_write(SegmentComponent::TERMS));
let terms_fst_builder = try!(FstMapBuilder::new(terms_write));
let postings_write = try!(segment.open_write(SegmentComponent::POSTINGS));
let positions_write = try!(segment.open_write(SegmentComponent::POSITIONS));
Ok(PostingsSerializer {
terms_fst_builder: terms_fst_builder,
postings_write: postings_write,
positions_write: positions_write,
written_bytes_postings: 0,
written_bytes_positions: 0,
encoder: simdcompression::Encoder::new(),
doc_ids: Vec::new(),
})
}
pub fn new_term(&mut self, term: &Term, doc_freq: DocId) -> io::Result<()> {
try!(self.close_term());
self.doc_ids.clear();
let term_info = TermInfo {
doc_freq: doc_freq,
postings_offset: self.written_bytes_postings as u32,
};
self.terms_fst_builder
.insert(term.as_slice(), &term_info)
}
pub fn close_term(&mut self,) -> io::Result<()> {
if !self.doc_ids.is_empty() {
let docs_data = self.encoder.encode_sorted(&self.doc_ids);
self.written_bytes_postings += try!((docs_data.len() as u32).serialize(&mut self.postings_write));
for num in docs_data {
self.written_bytes_postings += try!(num.serialize(&mut self.postings_write));
}
}
Ok(())
}
pub fn write_doc(&mut self, doc_id: DocId, positions: Option<&[u32]>) -> io::Result<()> {
self.doc_ids.push(doc_id);
Ok(())
}
pub fn close(mut self,) -> io::Result<()> {
try!(self.close_term());
try!(self.terms_fst_builder.finish());
try!(self.postings_write.flush());
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use test::Bencher;
use core::schema::DocId;
#[derive(Debug)]
pub struct VecPostings {
doc_ids: Vec<DocId>,
cursor: usize,
}
impl VecPostings {
pub fn new(vals: Vec<DocId>) -> VecPostings {
VecPostings {
doc_ids: vals,
cursor: 0,
}
}
}
impl Postings for VecPostings {
// after skipping position
// the iterator in such a way that the
// next call to next() will return a
// value greater or equal to target.
fn skip_next(&mut self, target: DocId) -> Option<DocId> {
loop {
match Iterator::next(self) {
Some(val) if val >= target => {
return Some(val);
},
None => {
return None;
},
_ => {}
}
}
}
}
impl Iterator for VecPostings {
type Item = DocId;
fn next(&mut self,) -> Option<DocId> {
if self.cursor >= self.doc_ids.len() {
None
}
else {
self.cursor += 1;
Some(self.doc_ids[self.cursor - 1])
}
}
}
//
// #[test]
// fn test_intersection() {
// {
// let left = VecPostings::new(vec!(1, 3, 9));
// let right = VecPostings::new(vec!(3, 4, 9, 18));
// let inter = IntersectionPostings::from_postings(vec!(left, right));
// let vals: Vec<DocId> = inter.collect();
// assert_eq!(vals, vec!(3, 9));
// }
// {
// let a = VecPostings::new(vec!(1, 3, 9));
// let b = VecPostings::new(vec!(3, 4, 9, 18));
// let c = VecPostings::new(vec!(1, 5, 9, 111));
// let inter = IntersectionPostings::from_postings(vec!(a, b, c));
// let vals: Vec<DocId> = inter.collect();
// assert_eq!(vals, vec!(9));
// }
// }
//
// #[bench]
// fn bench_single_intersection(b: &mut Bencher) {
// b.iter(|| {
// let docs = VecPostings::new((0..1_000_000).collect());
// let intersection = IntersectionPostings::from_postings(vec!(docs));
// intersection.count()
// });
// }
}

View File

@@ -2,16 +2,14 @@ use core::index::{Segment, SegmentId};
use core::schema::Term;
use core::store::StoreReader;
use core::schema::Document;
use core::directory::ReadOnlySource;
use directory::ReadOnlySource;
use std::io::Cursor;
use core::schema::DocId;
use DocId;
use core::index::SegmentComponent;
use core::simdcompression::Decoder;
use std::io;
use std::iter;
use std::str;
use core::postings::TermInfo;
use core::fstmap::FstMap;
use postings::TermInfo;
use datastruct::FstMap;
use std::fmt;
use rustc_serialize::json;
use core::index::SegmentInfo;
@@ -21,7 +19,7 @@ use core::convert_to_ioerror;
use core::serialize::BinarySerializable;
use core::fastfield::U32FastFieldsReader;
use core::fastfield::U32FastFieldReader;
use core::simdcompression;
use compression;
use std::mem;
impl fmt::Debug for SegmentReader {
@@ -37,13 +35,13 @@ pub fn intersection(mut postings: Vec<SegmentPostings>) -> SegmentPostings {
.map(|v| v.len())
.min()
.unwrap();
let mut buffer: Vec<u32> = postings.pop().unwrap().0;
let buffer: Vec<u32> = postings.pop().unwrap().0;
let mut output: Vec<u32> = Vec::with_capacity(min_len);
unsafe { output.set_len(min_len); }
let mut pair = (output, buffer);
for posting in postings.iter() {
pair = (pair.1, pair.0);
let output_len = simdcompression::intersection(posting.0.as_slice(), pair.0.as_slice(), pair.1.as_mut_slice());
let output_len = compression::intersection(posting.0.as_slice(), pair.0.as_slice(), pair.1.as_mut_slice());
unsafe { pair.1.set_len(output_len); }
}
SegmentPostings(pair.1)
@@ -77,8 +75,8 @@ impl SegmentPostings {
let mut doc_ids: Vec<u32> = Vec::with_capacity(doc_freq as usize);
unsafe { doc_ids.set_len(doc_freq as usize); }
{
let decoder = Decoder::new();
let num_doc_ids = decoder.decode_sorted(&data_u32[1..(num_u32s+1) as usize], &mut doc_ids);
let decoder = compression::Decoder::new();
decoder.decode_sorted(&data_u32[1..(num_u32s+1) as usize], &mut doc_ids);
SegmentPostings(doc_ids)
}
}
@@ -212,7 +210,7 @@ impl SegmentReader {
for term in terms.iter() {
match self.get_term(term) {
Some(term_info) => {
let decode_one_timer = decode_timer.open("decode_one");
let _decode_one_timer = decode_timer.open("decode_one");
let segment_posting = self.read_postings(&term_info);
segment_postings.push(segment_posting);
}
@@ -224,7 +222,7 @@ impl SegmentReader {
}
}
{
let mut intersection_time = timer.open("intersection");
let _intersection_time = timer.open("intersection");
intersection(segment_postings)
}
}

View File

@@ -3,6 +3,7 @@ use std::collections::HashMap;
use std::slice;
use std::fmt;
use std::io;
use std::io::Read;
use core::serialize::BinarySerializable;
use rustc_serialize::Decodable;
@@ -12,10 +13,6 @@ use rustc_serialize::Encoder;
use std::ops::BitOr;
use std::borrow::Borrow;
/// u32 identifying a document within a segment.
/// Document gets their doc id assigned incrementally,
/// as they are added in the segment.
pub type DocId = u32;
#[derive(Clone,Debug,PartialEq,Eq, RustcDecodable, RustcEncodable)]
pub struct TextOptions {

View File

@@ -1,7 +1,7 @@
use core::reader::SegmentReader;
use core::index::Index;
use core::index::Segment;
use core::schema::DocId;
use DocId;
use core::schema::Document;
use core::collector::Collector;
use std::io;
@@ -56,12 +56,12 @@ impl Searcher {
for (segment_ord, segment) in self.segments.iter().enumerate() {
let mut segment_search_timer = search_timer.open("segment_search");
{
let set_segment_timer = segment_search_timer.open("set_segment");
let _ = segment_search_timer.open("set_segment");
try!(collector.set_segment(segment_ord as SegmentLocalId, &segment));
}
let postings = segment.search(terms, segment_search_timer.open("get_postings"));
{
let collection_timer = segment_search_timer.open("collection");
let _collection_timer = segment_search_timer.open("collection");
for doc_id in postings {
collector.collect(doc_id);
}

View File

@@ -1,10 +1,10 @@
use core::directory::WritePtr;
use directory::{WritePtr, ReadOnlySource};
use std::cell::RefCell;
use core::schema::DocId;
use DocId;
use core::schema::Document;
use core::schema::TextFieldValue;
use core::serialize::BinarySerializable;
use core::directory::ReadOnlySource;
use std::io::Write;
use std::io::Read;
use std::io::Cursor;
@@ -212,7 +212,7 @@ mod tests {
use core::schema::Schema;
use core::schema::TextOptions;
use core::schema::TextFieldValue;
use core::directory::{RAMDirectory, Directory, MmapDirectory, WritePtr};
use directory::{RAMDirectory, Directory, MmapDirectory, WritePtr};
fn write_lorem_ipsum_store(writer: WritePtr) -> Schema {
let mut schema = Schema::new();

View File

@@ -72,10 +72,10 @@ mod tests {
{
let mut ab = a.open("b");
{
let abc = ab.open("c");
let _abc = ab.open("c");
}
{
let abd = ab.open("d");
let _abd = ab.open("d");
}
}
}

View File

@@ -1,3 +1,4 @@
use DocId;
use core::schema::*;
use core::codec::*;
use core::index::Index;
@@ -6,7 +7,7 @@ use core::index::SerializableSegment;
use core::analyzer::StreamingIterator;
use core::index::Segment;
use core::index::SegmentInfo;
use core::postings::PostingsWriter;
use postings::PostingsWriter;
use core::fastfield::U32FastFieldsWriter;
use std::clone::Clone;
use std::sync::mpsc;

View File

@@ -5,7 +5,8 @@ use std::io::Cursor;
use fst;
use fst::raw::Fst;
use fst::Streamer;
use core::directory::ReadOnlySource;
use directory::ReadOnlySource;
use core::serialize::BinarySerializable;
use std::marker::PhantomData;
@@ -125,9 +126,8 @@ impl<V: BinarySerializable> FstMap<V> {
#[cfg(test)]
mod tests {
use super::*;
use core::directory::{RAMDirectory, Directory};
use directory::{RAMDirectory, Directory};
use std::path::PathBuf;
use fst::Streamer;
#[test]

4
src/datastruct/mod.rs Normal file
View File

@@ -0,0 +1,4 @@
mod fstmap;
pub use self::fstmap::FstMapBuilder;
pub use self::fstmap::FstMap;
pub use self::fstmap::FstMapIter;

View File

@@ -0,0 +1,23 @@
use std::marker::Send;
use std::marker::Sync;
use std::io;
use std::fmt;
use std::path::Path;
use directory::{ReadOnlySource, WritePtr};
///////////////////////////////////////////////////////////////
//
// #[derive(Debug)]
// pub enum CreateError {
// RootDirectoryDoesNotExist,
// DirectoryAlreadyExists,
// CannotCreateTempDirectory(io::Error),
// }
pub trait Directory: fmt::Debug + Send + Sync {
fn open_read(&self, path: &Path) -> io::Result<ReadOnlySource>;
fn open_write(&mut self, path: &Path) -> io::Result<WritePtr>;
fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()>;
fn sync(&self, path: &Path) -> io::Result<()>;
fn sync_directory(&self,) -> io::Result<()>;
}

View File

@@ -0,0 +1,99 @@
use std::path::{Path, PathBuf};
use tempdir::TempDir;
use std::collections::HashMap;
use std::collections::hash_map::Entry as HashMapEntry;
use fst::raw::MmapReadOnly;
use std::fs::File;
use atomicwrites;
use std::sync::RwLock;
use std::fmt;
use std::io::Write;
use std::io;
use directory::Directory;
use directory::ReadOnlySource;
use directory::WritePtr;
use std::io::BufWriter;
////////////////////////////////////////////////////////////////
// MmapDirectory
pub struct MmapDirectory {
root_path: PathBuf,
mmap_cache: RwLock<HashMap<PathBuf, MmapReadOnly>>,
_temp_directory: Option<TempDir>,
}
impl fmt::Debug for MmapDirectory {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "MmapDirectory({:?})", self.root_path)
}
}
impl MmapDirectory {
pub fn create_from_tempdir() -> io::Result<MmapDirectory> {
// TODO error management
let tempdir = try!(TempDir::new("index"));
let tempdir_path = PathBuf::from(tempdir.path());
let directory = MmapDirectory {
root_path: PathBuf::from(tempdir_path),
mmap_cache: RwLock::new(HashMap::new()),
_temp_directory: Some(tempdir)
};
Ok(directory)
}
pub fn create(filepath: &Path) -> io::Result<MmapDirectory> {
Ok(MmapDirectory {
root_path: PathBuf::from(filepath),
mmap_cache: RwLock::new(HashMap::new()),
_temp_directory: None
})
}
fn resolve_path(&self, relative_path: &Path) -> PathBuf {
self.root_path.join(relative_path)
}
}
impl Directory for MmapDirectory {
fn open_read(&self, path: &Path) -> io::Result<ReadOnlySource> {
let full_path = self.resolve_path(path);
let mut mmap_cache = self.mmap_cache.write().unwrap();
let mmap = match mmap_cache.entry(full_path.clone()) {
HashMapEntry::Occupied(e) => e.get().clone(),
HashMapEntry::Vacant(vacant_entry) => {
let new_mmap = try!(MmapReadOnly::open_path(full_path.clone()));
vacant_entry.insert(new_mmap.clone());
new_mmap
}
};
Ok(ReadOnlySource::Mmap(mmap))
}
fn open_write(&mut self, path: &Path) -> io::Result<WritePtr> {
let full_path = self.resolve_path(path);
let file = try!(File::create(full_path));
let buf_writer = BufWriter::new(file);
Ok(Box::new(buf_writer))
}
fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> {
let full_path = self.resolve_path(path);
let meta_file = atomicwrites::AtomicFile::new(full_path, atomicwrites::AllowOverwrite);
meta_file.write(|f| {
f.write_all(data)
})
}
fn sync(&self, path: &Path) -> io::Result<()> {
let full_path = self.resolve_path(path);
File::open(&full_path).and_then(|fd| fd.sync_all())
}
fn sync_directory(&self,) -> io::Result<()> {
File::open(&self.root_path).and_then(|fd| fd.sync_all())
}
}

113
src/directory/mod.rs Normal file
View File

@@ -0,0 +1,113 @@
mod mmap_directory;
mod ram_directory;
mod directory;
use std::ops::Deref;
use std::io::{Seek, Write, Cursor};
use fst::raw::MmapReadOnly;
pub use self::directory::Directory;
pub use self::ram_directory::RAMDirectory;
pub use self::mmap_directory::MmapDirectory;
////////////////////////////////////////
// WritePtr
pub trait SeekableWrite: Seek + Write {}
impl<T: Seek + Write> SeekableWrite for T {}
pub type WritePtr = Box<SeekableWrite>;
////////////////////////////////////////
// Read only source.
pub enum ReadOnlySource {
Mmap(MmapReadOnly),
Anonymous(Vec<u8>),
}
impl Deref for ReadOnlySource {
type Target = [u8];
fn deref(&self) -> &[u8] {
self.as_slice()
}
}
impl ReadOnlySource {
pub fn len(&self,) -> usize {
self.as_slice().len()
}
pub fn as_slice(&self,) -> &[u8] {
match *self {
ReadOnlySource::Mmap(ref mmap_read_only) => unsafe { mmap_read_only.as_slice() },
ReadOnlySource::Anonymous(ref shared_vec) => shared_vec.as_slice(),
}
}
pub fn cursor<'a>(&'a self) -> Cursor<&'a [u8]> {
Cursor::new(&self.deref())
}
pub fn slice(&self, from_offset:usize, to_offset:usize) -> ReadOnlySource {
match *self {
ReadOnlySource::Mmap(ref mmap_read_only) => {
let sliced_mmap = mmap_read_only.range(from_offset, to_offset - from_offset);
ReadOnlySource::Mmap(sliced_mmap)
}
ReadOnlySource::Anonymous(ref shared_vec) => {
let sliced_data: Vec<u8> = Vec::from(&shared_vec[from_offset..to_offset]);
ReadOnlySource::Anonymous(sliced_data)
},
}
}
}
impl Clone for ReadOnlySource {
fn clone(&self) -> Self {
self.slice(0, self.len())
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::path::Path;
#[test]
fn test_ram_directory() {
let mut ram_directory = RAMDirectory::create();
test_directory(&mut ram_directory);
}
#[test]
fn test_mmap_directory() {
let mut mmap_directory = MmapDirectory::create_from_tempdir().unwrap();
test_directory(&mut mmap_directory);
}
fn test_directory(directory: &mut Directory) {
{
let mut write_file = directory.open_write(Path::new("toto")).unwrap();
write_file.write_all(&[4]).unwrap();
write_file.write_all(&[3]).unwrap();
write_file.write_all(&[7,3,5]).unwrap();
}
let read_file = directory.open_read(Path::new("toto")).unwrap();
let data: &[u8] = &*read_file;
assert_eq!(data.len(), 5);
assert_eq!(data[0], 4);
assert_eq!(data[1], 3);
assert_eq!(data[2], 7);
assert_eq!(data[3], 3);
assert_eq!(data[4], 5);
}
}

View File

@@ -0,0 +1,87 @@
use directory::{Directory, ReadOnlySource};
use std::io::{Cursor, Write, Seek, SeekFrom};
use std::io;
use atomicwrites;
use std::fmt;
use std::sync::{Arc, RwLock};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use directory::WritePtr;
#[derive(Clone)]
struct SharedVec(Arc<RwLock<Cursor<Vec<u8>>>>);
pub struct RAMDirectory {
fs: HashMap<PathBuf, SharedVec>,
}
impl SharedVec {
fn new() -> SharedVec {
SharedVec(Arc::new( RwLock::new(Cursor::new(Vec::new())) ))
}
}
impl Write for SharedVec {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
try!(self.0.write().unwrap().write(buf));
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl Seek for SharedVec {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
self.0.write().unwrap().seek(pos)
}
}
impl fmt::Debug for RAMDirectory {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "RAMDirectory")
}
}
impl RAMDirectory {
pub fn create() -> RAMDirectory {
RAMDirectory {
fs: HashMap::new()
}
}
}
impl Directory for RAMDirectory {
fn open_read(&self, path: &Path) -> io::Result<ReadOnlySource> {
match self.fs.get(path) {
Some(ref data) => {
let data_copy = (*data).0.read().unwrap().clone();
Ok(ReadOnlySource::Anonymous(data_copy.into_inner()))
},
None =>
Err(io::Error::new(io::ErrorKind::NotFound, format!("File has never been created. {:?}", path)))
}
}
fn open_write(&mut self, path: &Path) -> io::Result<WritePtr> {
let full_path = PathBuf::from(&path);
let data = SharedVec::new();
self.fs.insert(full_path, data.clone());
Ok(Box::new(data))
}
fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> {
let meta_file = atomicwrites::AtomicFile::new(PathBuf::from(path), atomicwrites::AllowOverwrite);
meta_file.write(|f| {
f.write_all(data)
})
}
fn sync(&self, _: &Path) -> io::Result<()> {
Ok(())
}
fn sync_directory(&self,) -> io::Result<()> {
Ok(())
}
}

View File

@@ -28,19 +28,26 @@ extern crate num_cpus;
#[cfg(test)] extern crate rand;
mod core;
mod datastruct;
mod postings;
mod directory;
mod compression;
pub use directory::Directory;
pub use core::analyzer;
pub use core::directory::Directory;
pub use core::searcher::Searcher;
pub use core::index::Index;
pub use core::schema;
pub use core::schema::Term;
pub use core::schema::Document;
pub use core::collector;
pub use core::schema::DocId;
pub use core::reader::SegmentReader;
pub use core::searcher::SegmentLocalId;
/// u32 identifying a document within a segment.
/// Document gets their doc id assigned incrementally,
/// as they are added in the segment.
pub type DocId = u32;
#[cfg(test)]
mod tests {

132
src/postings/mod.rs Normal file
View File

@@ -0,0 +1,132 @@
// pub mod postings;
// pub mod schema;
// pub mod directory;
// pub mod writer;
// pub mod analyzer;
// pub mod reader;
// pub mod codec;
// pub mod searcher;
// pub mod collector;
// pub mod serialize;
// pub mod store;
// pub mod simdcompression;
// pub mod fstmap;
// pub mod index;
// pub mod fastfield;
// pub mod fastdivide;
// pub mod merger;
// pub mod timer;
// use std::error;
// use std::io;
// pub fn convert_to_ioerror<E: 'static + error::Error + Send + Sync>(err: E) -> io::Error {
// io::Error::new(
// io::ErrorKind::InvalidData,
// err
// )
// }
mod serializer;
mod writer;
mod term_info;
use DocId;
pub use self::serializer::PostingsSerializer;
pub use self::writer::PostingsWriter;
pub use self::term_info::TermInfo;
pub trait Postings: Iterator<Item=DocId> {
// after skipping position
// the iterator in such a way that the
// next call to next() will return a
// value greater or equal to target.
fn skip_next(&mut self, target: DocId) -> Option<DocId>;
}
#[cfg(test)]
mod tests {
use super::*;
use DocId;
#[derive(Debug)]
pub struct VecPostings {
doc_ids: Vec<DocId>,
cursor: usize,
}
impl VecPostings {
pub fn new(vals: Vec<DocId>) -> VecPostings {
VecPostings {
doc_ids: vals,
cursor: 0,
}
}
}
impl Postings for VecPostings {
// after skipping position
// the iterator in such a way that the
// next call to next() will return a
// value greater or equal to target.
fn skip_next(&mut self, target: DocId) -> Option<DocId> {
loop {
match Iterator::next(self) {
Some(val) if val >= target => {
return Some(val);
},
None => {
return None;
},
_ => {}
}
}
}
}
impl Iterator for VecPostings {
type Item = DocId;
fn next(&mut self,) -> Option<DocId> {
if self.cursor >= self.doc_ids.len() {
None
}
else {
self.cursor += 1;
Some(self.doc_ids[self.cursor - 1])
}
}
}
// use test::Bencher;
// #[test]
// fn test_intersection() {
// {
// let left = VecPostings::new(vec!(1, 3, 9));
// let right = VecPostings::new(vec!(3, 4, 9, 18));
// let inter = IntersectionPostings::from_postings(vec!(left, right));
// let vals: Vec<DocId> = inter.collect();
// assert_eq!(vals, vec!(3, 9));
// }
// {
// let a = VecPostings::new(vec!(1, 3, 9));
// let b = VecPostings::new(vec!(3, 4, 9, 18));
// let c = VecPostings::new(vec!(1, 5, 9, 111));
// let inter = IntersectionPostings::from_postings(vec!(a, b, c));
// let vals: Vec<DocId> = inter.collect();
// assert_eq!(vals, vec!(9));
// }
// }
//
// #[bench]
// fn bench_single_intersection(b: &mut Bencher) {
// b.iter(|| {
// let docs = VecPostings::new((0..1_000_000).collect());
// let intersection = IntersectionPostings::from_postings(vec!(docs));
// intersection.count()
// });
// }
}

View File

@@ -0,0 +1,74 @@
use datastruct::FstMapBuilder;
use super::TermInfo;
use core::schema::Term;
use directory::WritePtr;
use compression;
use DocId;
use core::index::Segment;
use std::io;
use core::index::SegmentComponent;
use core::serialize::BinarySerializable;
pub struct PostingsSerializer {
terms_fst_builder: FstMapBuilder<WritePtr, TermInfo>, // TODO find an alternative to work around the "move"
postings_write: WritePtr,
positions_write: WritePtr,
written_bytes_postings: usize,
written_bytes_positions: usize,
encoder: compression::Encoder,
doc_ids: Vec<DocId>,
}
impl PostingsSerializer {
pub fn open(segment: &Segment) -> io::Result<PostingsSerializer> {
let terms_write = try!(segment.open_write(SegmentComponent::TERMS));
let terms_fst_builder = try!(FstMapBuilder::new(terms_write));
let postings_write = try!(segment.open_write(SegmentComponent::POSTINGS));
let positions_write = try!(segment.open_write(SegmentComponent::POSITIONS));
Ok(PostingsSerializer {
terms_fst_builder: terms_fst_builder,
postings_write: postings_write,
positions_write: positions_write,
written_bytes_postings: 0,
written_bytes_positions: 0,
encoder: compression::Encoder::new(),
doc_ids: Vec::new(),
})
}
pub fn new_term(&mut self, term: &Term, doc_freq: DocId) -> io::Result<()> {
try!(self.close_term());
self.doc_ids.clear();
let term_info = TermInfo {
doc_freq: doc_freq,
postings_offset: self.written_bytes_postings as u32,
};
self.terms_fst_builder
.insert(term.as_slice(), &term_info)
}
pub fn close_term(&mut self,) -> io::Result<()> {
if !self.doc_ids.is_empty() {
let docs_data = self.encoder.encode_sorted(&self.doc_ids);
self.written_bytes_postings += try!((docs_data.len() as u32).serialize(&mut self.postings_write));
for num in docs_data {
self.written_bytes_postings += try!(num.serialize(&mut self.postings_write));
}
}
Ok(())
}
pub fn write_doc(&mut self, doc_id: DocId, positions: Option<&[u32]>) -> io::Result<()> {
self.doc_ids.push(doc_id);
Ok(())
}
pub fn close(mut self,) -> io::Result<()> {
try!(self.close_term());
try!(self.terms_fst_builder.finish());
try!(self.postings_write.flush());
Ok(())
}
}

26
src/postings/term_info.rs Normal file
View File

@@ -0,0 +1,26 @@
use core::serialize::BinarySerializable;
use std::io;
#[derive(Debug,Ord,PartialOrd,Eq,PartialEq,Clone)]
pub struct TermInfo {
pub doc_freq: u32,
pub postings_offset: u32,
}
impl BinarySerializable for TermInfo {
fn serialize(&self, writer: &mut io::Write) -> io::Result<usize> {
Ok(
try!(self.doc_freq.serialize(writer)) +
try!(self.postings_offset.serialize(writer))
)
}
fn deserialize(reader: &mut io::Read) -> io::Result<Self> {
let doc_freq = try!(u32::deserialize(reader));
let offset = try!(u32::deserialize(reader));
Ok(TermInfo {
doc_freq: doc_freq,
postings_offset: offset,
})
}
}

141
src/postings/writer.rs Normal file
View File

@@ -0,0 +1,141 @@
use DocId;
use std::collections::BTreeMap;
use core::schema::Term;
use postings::PostingsSerializer;
use std::io;
pub trait U32sRecorder {
fn new() -> Self;
fn record(&mut self, val: u32);
}
pub struct VecRecorder(Vec<u32>);
impl U32sRecorder for VecRecorder {
fn new() -> VecRecorder {
VecRecorder(Vec::new())
}
fn record(&mut self, val: u32) {
self.0.push(val);
}
}
pub struct ObliviousRecorder;
impl U32sRecorder for ObliviousRecorder {
fn new() -> ObliviousRecorder {
ObliviousRecorder
}
fn record(&mut self, _: u32) {
}
}
struct TermPostingsWriter<TermFreqsRec: U32sRecorder, PositionsRec: U32sRecorder> {
doc_ids: Vec<DocId>,
term_freqs: TermFreqsRec,
positions: PositionsRec,
current_position: u32,
current_freq: u32,
}
impl<TermFreqsRec: U32sRecorder, PositionsRec: U32sRecorder> TermPostingsWriter<TermFreqsRec, PositionsRec> {
pub fn new() -> TermPostingsWriter<TermFreqsRec, PositionsRec> {
TermPostingsWriter {
doc_ids: Vec::new(),
term_freqs: TermFreqsRec::new(),
positions: PositionsRec::new(),
current_position: 0u32,
current_freq: 0u32,
}
}
fn close_doc(&mut self,) {
self.term_freqs.record(self.current_freq);
self.current_freq = 0;
self.current_position = 0;
}
fn close(&mut self,) {
if self.current_freq > 0 {
self.close_doc();
}
}
fn is_new_doc(&self, doc: &DocId) -> bool {
match self.doc_ids.last() {
Some(&last_doc) => last_doc != *doc,
None => true,
}
}
pub fn doc_freq(&self) -> u32 {
self.doc_ids.len() as u32
}
pub fn suscribe(&mut self, doc: DocId, pos: u32) {
if self.is_new_doc(&doc) {
// this is the first time we meet this term for this document
// first close the previous document, and write its doc_freq.
self.close_doc();
self.doc_ids.push(doc);
}
self.current_freq += 1;
self.positions.record(pos - self.current_position);
self.current_position = pos;
}
}
pub struct PostingsWriter {
postings: Vec<TermPostingsWriter<ObliviousRecorder, ObliviousRecorder>>,
term_index: BTreeMap<Term, usize>,
}
impl PostingsWriter {
pub fn new() -> PostingsWriter {
PostingsWriter {
postings: Vec::new(),
term_index: BTreeMap::new(),
}
}
pub fn suscribe(&mut self, doc: DocId, pos: u32, term: Term) {
let doc_ids: &mut TermPostingsWriter<ObliviousRecorder, ObliviousRecorder> = self.get_term_postings(term);
doc_ids.suscribe(doc, pos);
}
fn get_term_postings(&mut self, term: Term) -> &mut TermPostingsWriter<ObliviousRecorder, ObliviousRecorder> {
match self.term_index.get(&term) {
Some(unord_id) => {
return &mut self.postings[*unord_id];
},
None => {}
}
let unord_id = self.term_index.len();
self.postings.push(TermPostingsWriter::new());
self.term_index.insert(term, unord_id.clone());
&mut self.postings[unord_id]
}
pub fn serialize(&self, serializer: &mut PostingsSerializer) -> io::Result<()> {
for (term, postings_id) in self.term_index.iter() {
let term_postings_writer = &self.postings[postings_id.clone()];
let term_docfreq = term_postings_writer.doc_freq();
try!(serializer.new_term(&term, term_docfreq));
for doc in term_postings_writer.doc_ids.iter() {
try!(serializer.write_doc(doc.clone(), None));
}
}
Ok(())
}
}