From 26826ac4ea8f4c0ed5b8043f5214cd9db0619ca3 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Sat, 30 Apr 2016 18:45:44 +0900 Subject: [PATCH] Moved postings and directory to a different module --- src/cli/merge.rs | 6 +- .../simdcompression.rs => compression/mod.rs} | 0 src/core/codec.rs | 3 +- src/core/collector.rs | 2 +- src/core/directory.rs | 305 ---------------- src/core/fastfield.rs | 13 +- src/core/index.rs | 4 +- src/core/merger.rs | 10 +- src/core/mod.rs | 4 - src/core/postings.rs | 327 ------------------ src/core/reader.rs | 24 +- src/core/schema.rs | 5 +- src/core/searcher.rs | 6 +- src/core/store.rs | 8 +- src/core/timer.rs | 4 +- src/core/writer.rs | 3 +- src/{core => datastruct}/fstmap.rs | 6 +- src/datastruct/mod.rs | 4 + src/directory/directory.rs | 23 ++ src/directory/mmap_directory.rs | 99 ++++++ src/directory/mod.rs | 113 ++++++ src/directory/ram_directory.rs | 87 +++++ src/lib.rs | 11 +- src/postings/mod.rs | 132 +++++++ src/postings/serializer.rs | 74 ++++ src/postings/term_info.rs | 26 ++ src/postings/writer.rs | 141 ++++++++ 27 files changed, 751 insertions(+), 689 deletions(-) rename src/{core/simdcompression.rs => compression/mod.rs} (100%) delete mode 100644 src/core/directory.rs delete mode 100644 src/core/postings.rs rename src/{core => datastruct}/fstmap.rs (98%) create mode 100644 src/datastruct/mod.rs create mode 100644 src/directory/directory.rs create mode 100644 src/directory/mmap_directory.rs create mode 100644 src/directory/mod.rs create mode 100644 src/directory/ram_directory.rs create mode 100644 src/postings/mod.rs create mode 100644 src/postings/serializer.rs create mode 100644 src/postings/term_info.rs create mode 100644 src/postings/writer.rs diff --git a/src/cli/merge.rs b/src/cli/merge.rs index 5ef43b825..d419967c0 100644 --- a/src/cli/merge.rs +++ b/src/cli/merge.rs @@ -1,13 +1,11 @@ extern crate argparse; extern crate tantivy; -use argparse::{ArgumentParser, StoreTrue, Store}; +use argparse::{ArgumentParser, Store}; use tantivy::Index; use std::path::Path; fn main() { - - let mut verbose = false; let mut directory = String::from("."); { let mut ap = ArgumentParser::new(); @@ -22,5 +20,5 @@ fn main() { let mut index_writer = index.writer().unwrap(); let segments = index.segments(); println!("Merging {} segments", segments.len()); - index_writer.merge(&segments); + index_writer.merge(&segments).unwrap(); } diff --git a/src/core/simdcompression.rs b/src/compression/mod.rs similarity index 100% rename from src/core/simdcompression.rs rename to src/compression/mod.rs diff --git a/src/core/codec.rs b/src/core/codec.rs index 12a2cf4ca..a65cf2e0b 100644 --- a/src/core/codec.rs +++ b/src/core/codec.rs @@ -6,9 +6,10 @@ use core::index::SegmentInfo; use core::index::SegmentComponent; use core::fastfield::FastFieldSerializer; use core::store::StoreWriter; -use core::postings::PostingsSerializer; use core::convert_to_ioerror; +use postings::PostingsSerializer; + pub struct SegmentSerializer { segment: Segment, store_writer: StoreWriter, diff --git a/src/core/collector.rs b/src/core/collector.rs index 63f376919..def40c60b 100644 --- a/src/core/collector.rs +++ b/src/core/collector.rs @@ -1,4 +1,4 @@ -use core::schema::DocId; +use DocId; use core::reader::SegmentReader; use core::searcher::SegmentLocalId; use core::searcher::DocAddress; diff --git a/src/core/directory.rs b/src/core/directory.rs deleted file mode 100644 index d6711f9d6..000000000 --- a/src/core/directory.rs +++ /dev/null @@ -1,305 +0,0 @@ -use std::io::BufWriter; -use std::marker::Send; -use std::marker::Sync; -use std::io; -use std::io::Cursor; -use std::io::Write; -use std::io::Seek; -use std::io::SeekFrom; -use std::fs::File; -use std::fmt; -use std::collections::HashMap; -use std::collections::hash_map::Entry as HashMapEntry; -use fst::raw::MmapReadOnly; -use atomicwrites; -use std::sync::Arc; -use std::sync::RwLock; -use tempdir::TempDir; -use std::ops::Deref; -use std::path::{Path, PathBuf}; - -/////////////////////////////////////////////////////////////// - -pub enum ReadOnlySource { - Mmap(MmapReadOnly), - Anonymous(Vec), -} - -impl Deref for ReadOnlySource { - type Target = [u8]; - - fn deref(&self) -> &[u8] { - self.as_slice() - } -} - - -impl ReadOnlySource { - - pub fn len(&self,) -> usize { - self.as_slice().len() - } - - pub fn as_slice(&self,) -> &[u8] { - match *self { - ReadOnlySource::Mmap(ref mmap_read_only) => unsafe { mmap_read_only.as_slice() }, - ReadOnlySource::Anonymous(ref shared_vec) => shared_vec.as_slice(), - } - } - - pub fn cursor<'a>(&'a self) -> Cursor<&'a [u8]> { - Cursor::new(&self.deref()) - } - - pub fn slice(&self, from_offset:usize, to_offset:usize) -> ReadOnlySource { - match *self { - ReadOnlySource::Mmap(ref mmap_read_only) => { - let sliced_mmap = mmap_read_only.range(from_offset, to_offset - from_offset); - ReadOnlySource::Mmap(sliced_mmap) - } - ReadOnlySource::Anonymous(ref shared_vec) => { - let sliced_data: Vec = Vec::from(&shared_vec[from_offset..to_offset]); - ReadOnlySource::Anonymous(sliced_data) - }, - } - } -} - - -impl Clone for ReadOnlySource { - fn clone(&self) -> Self { - self.slice(0, self.len()) - } -} - - -pub trait SeekableWrite: Seek + Write {} -impl SeekableWrite for T {} -pub type WritePtr = Box; - -// -// #[derive(Debug)] -// pub enum CreateError { -// RootDirectoryDoesNotExist, -// DirectoryAlreadyExists, -// CannotCreateTempDirectory(io::Error), -// } - -pub trait Directory: fmt::Debug + Send + Sync { - fn open_read(&self, path: &Path) -> io::Result; - fn open_write(&mut self, path: &Path) -> io::Result; - fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()>; - fn sync(&self, path: &Path) -> io::Result<()>; - fn sync_directory(&self,) -> io::Result<()>; -} - - -//////////////////////////////////////////////////////////////// -// MmapDirectory - -pub struct MmapDirectory { - root_path: PathBuf, - mmap_cache: RwLock>, - _temp_directory: Option, -} - -impl fmt::Debug for MmapDirectory { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "MmapDirectory({:?})", self.root_path) - } -} - - -impl MmapDirectory { - - pub fn create_from_tempdir() -> io::Result { - // TODO error management - let tempdir = try!(TempDir::new("index")); - let tempdir_path = PathBuf::from(tempdir.path()); - let directory = MmapDirectory { - root_path: PathBuf::from(tempdir_path), - mmap_cache: RwLock::new(HashMap::new()), - _temp_directory: Some(tempdir) - }; - Ok(directory) - } - - pub fn create(filepath: &Path) -> io::Result { - Ok(MmapDirectory { - root_path: PathBuf::from(filepath), - mmap_cache: RwLock::new(HashMap::new()), - _temp_directory: None - }) - } - - fn resolve_path(&self, relative_path: &Path) -> PathBuf { - self.root_path.join(relative_path) - } - - -} - -impl Directory for MmapDirectory { - fn open_read(&self, path: &Path) -> io::Result { - let full_path = self.resolve_path(path); - let mut mmap_cache = self.mmap_cache.write().unwrap(); - let mmap = match mmap_cache.entry(full_path.clone()) { - HashMapEntry::Occupied(e) => e.get().clone(), - HashMapEntry::Vacant(vacant_entry) => { - let new_mmap = try!(MmapReadOnly::open_path(full_path.clone())); - vacant_entry.insert(new_mmap.clone()); - new_mmap - } - }; - Ok(ReadOnlySource::Mmap(mmap)) - } - fn open_write(&mut self, path: &Path) -> io::Result { - let full_path = self.resolve_path(path); - let file = try!(File::create(full_path)); - let buf_writer = BufWriter::new(file); - Ok(Box::new(buf_writer)) - } - - fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> { - let full_path = self.resolve_path(path); - let meta_file = atomicwrites::AtomicFile::new(full_path, atomicwrites::AllowOverwrite); - meta_file.write(|f| { - f.write_all(data) - }) - } - - fn sync(&self, path: &Path) -> io::Result<()> { - let full_path = self.resolve_path(path); - File::open(&full_path).and_then(|fd| fd.sync_all()) - } - - fn sync_directory(&self,) -> io::Result<()> { - File::open(&self.root_path).and_then(|fd| fd.sync_all()) - } -} - - - - -//////////////////////////////////////////////////////////////// -// RAMDirectory - - -#[derive(Clone)] -struct SharedVec(Arc>>>); - - -pub struct RAMDirectory { - fs: HashMap, -} - -impl SharedVec { - fn new() -> SharedVec { - SharedVec(Arc::new( RwLock::new(Cursor::new(Vec::new())) )) - } -} - -impl Write for SharedVec { - fn write(&mut self, buf: &[u8]) -> io::Result { - try!(self.0.write().unwrap().write(buf)); - Ok(buf.len()) - } - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } -} - -impl Seek for SharedVec { - fn seek(&mut self, pos: SeekFrom) -> io::Result { - self.0.write().unwrap().seek(pos) - } -} - -impl fmt::Debug for RAMDirectory { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "RAMDirectory") - } -} - -impl RAMDirectory { - pub fn create() -> RAMDirectory { - RAMDirectory { - fs: HashMap::new() - } - } -} - -impl Directory for RAMDirectory { - fn open_read(&self, path: &Path) -> io::Result { - match self.fs.get(path) { - Some(ref data) => { - let data_copy = (*data).0.read().unwrap().clone(); - Ok(ReadOnlySource::Anonymous(data_copy.into_inner())) - }, - None => - Err(io::Error::new(io::ErrorKind::NotFound, format!("File has never been created. {:?}", path))) - } - } - fn open_write(&mut self, path: &Path) -> io::Result { - let full_path = PathBuf::from(&path); - let data = SharedVec::new(); - self.fs.insert(full_path, data.clone()); - Ok(Box::new(data)) - } - - fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> { - let meta_file = atomicwrites::AtomicFile::new(PathBuf::from(path), atomicwrites::AllowOverwrite); - meta_file.write(|f| { - f.write_all(data) - }) - } - - fn sync(&self, _: &Path) -> io::Result<()> { - Ok(()) - } - - fn sync_directory(&self,) -> io::Result<()> { - Ok(()) - } -} - - -#[cfg(test)] -mod tests { - - use super::*; - use std::path::Path; - - #[test] - fn test_ram_directory() { - let mut ram_directory = RAMDirectory::create(); - test_directory(&mut ram_directory); - } - - #[test] - fn test_mmap_directory() { - let mut mmap_directory = MmapDirectory::create_from_tempdir().unwrap(); - test_directory(&mut mmap_directory); - } - - fn test_directory(directory: &mut Directory) { - { - let mut write_file = directory.open_write(Path::new("toto")).unwrap(); - write_file.write_all(&[4]).unwrap(); - write_file.write_all(&[3]).unwrap(); - write_file.write_all(&[7,3,5]).unwrap(); - } - let read_file = directory.open_read(Path::new("toto")).unwrap(); - let data: &[u8] = &*read_file; - assert_eq!(data.len(), 5); - assert_eq!(data[0], 4); - assert_eq!(data[1], 3); - assert_eq!(data[2], 7); - assert_eq!(data[3], 3); - assert_eq!(data[4], 5); - } - - - - -} diff --git a/src/core/fastfield.rs b/src/core/fastfield.rs index 8678a53fb..8daed2604 100644 --- a/src/core/fastfield.rs +++ b/src/core/fastfield.rs @@ -1,12 +1,9 @@ -use std::io::Write; use std::io; -use std::io::SeekFrom; -use std::io::Seek; -use core::directory::WritePtr; +use std::io::{SeekFrom, Seek, Write}; +use directory::{WritePtr, ReadOnlySource}; use core::serialize::BinarySerializable; -use core::directory::ReadOnlySource; use std::collections::HashMap; -use core::schema::DocId; +use DocId; use core::schema::Schema; use core::schema::Document; use std::ops::Deref; @@ -281,10 +278,8 @@ mod tests { use super::U32FastFieldsWriter; use core::schema::U32Field; use std::path::Path; - use core::directory::WritePtr; - use core::directory::Directory; + use directory::{Directory, WritePtr, RAMDirectory}; use core::schema::Document; - use core::directory::RAMDirectory; use core::schema::Schema; use core::schema::FAST_U32; use core::fastfield::FastFieldSerializer; diff --git a/src/core/index.rs b/src/core/index.rs index 19c268fef..6be29a140 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -1,14 +1,14 @@ use std::path::{PathBuf, Path}; use std::io; use core::schema::Schema; -use core::schema::DocId; +use DocId; use std::io::Write; use std::sync::{Arc, RwLock, RwLockWriteGuard, RwLockReadGuard}; use std::fmt; use rustc_serialize::json; use std::io::Read; use std::io::ErrorKind as IOErrorKind; -use core::directory::{Directory, MmapDirectory, RAMDirectory, ReadOnlySource, WritePtr}; +use directory::{Directory, MmapDirectory, RAMDirectory, ReadOnlySource, WritePtr}; use core::writer::IndexWriter; use core::searcher::Searcher; use uuid::Uuid; diff --git a/src/core/merger.rs b/src/core/merger.rs index 882493755..8f7437640 100644 --- a/src/core/merger.rs +++ b/src/core/merger.rs @@ -1,13 +1,15 @@ use std::io; use core::reader::SegmentReader; use core::index::Segment; -use core::schema::DocId; +use DocId; use core::index::SerializableSegment; use core::codec::SegmentSerializer; -use core::postings::PostingsSerializer; -use core::postings::TermInfo; + +use postings::PostingsSerializer; +use postings::TermInfo; + use std::collections::BinaryHeap; -use core::fstmap::FstMapIter; +use datastruct::FstMapIter; use core::schema::Term; use core::schema::Schema; use core::fastfield::FastFieldSerializer; diff --git a/src/core/mod.rs b/src/core/mod.rs index cc1be2737..a374e2b4b 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -1,6 +1,4 @@ -pub mod postings; pub mod schema; -pub mod directory; pub mod writer; pub mod analyzer; pub mod reader; @@ -9,8 +7,6 @@ pub mod searcher; pub mod collector; pub mod serialize; pub mod store; -pub mod simdcompression; -pub mod fstmap; pub mod index; pub mod fastfield; pub mod fastdivide; diff --git a/src/core/postings.rs b/src/core/postings.rs deleted file mode 100644 index 357a738a1..000000000 --- a/src/core/postings.rs +++ /dev/null @@ -1,327 +0,0 @@ -use core::schema::DocId; -use std::ptr; -use std::collections::BTreeMap; -use core::schema::Term; -use core::fstmap::FstMapBuilder; -use core::index::Segment; -use core::directory::WritePtr; -use core::index::SegmentComponent; -use core::simdcompression; -use core::serialize::BinarySerializable; -use std::io::{Read, Write}; -use std::io; - -#[derive(Debug,Ord,PartialOrd,Eq,PartialEq,Clone)] -pub struct TermInfo { - pub doc_freq: u32, - pub postings_offset: u32, -} - -impl BinarySerializable for TermInfo { - fn serialize(&self, writer: &mut Write) -> io::Result { - Ok( - try!(self.doc_freq.serialize(writer)) + - try!(self.postings_offset.serialize(writer)) - ) - } - fn deserialize(reader: &mut Read) -> io::Result { - let doc_freq = try!(u32::deserialize(reader)); - let offset = try!(u32::deserialize(reader)); - Ok(TermInfo { - doc_freq: doc_freq, - postings_offset: offset, - }) - } -} - - -pub trait U32sRecorder { - fn new() -> Self; - fn record(&mut self, val: u32); -} - -pub struct VecRecorder(Vec); - -impl U32sRecorder for VecRecorder { - fn new() -> VecRecorder { - VecRecorder(Vec::new()) - } - fn record(&mut self, val: u32) { - self.0.push(val); - } -} - -pub struct ObliviousRecorder; - -impl U32sRecorder for ObliviousRecorder { - fn new() -> ObliviousRecorder { - ObliviousRecorder - } - fn record(&mut self, val: u32) { - } -} - -struct TermPostingsWriter { - doc_ids: Vec, - term_freqs: TermFreqsRec, - positions: PositionsRec, - current_position: u32, - current_freq: u32, -} - -impl TermPostingsWriter { - pub fn new() -> TermPostingsWriter { - TermPostingsWriter { - doc_ids: Vec::new(), - term_freqs: TermFreqsRec::new(), - positions: PositionsRec::new(), - current_position: 0u32, - current_freq: 0u32, - } - } - - fn close_doc(&mut self,) { - self.term_freqs.record(self.current_freq); - self.current_freq = 0; - self.current_position = 0; - } - - fn close(&mut self,) { - if self.current_freq > 0 { - self.close_doc(); - } - } - - fn is_new_doc(&self, doc: &DocId) -> bool { - match self.doc_ids.last() { - Some(&last_doc) => last_doc != *doc, - None => true, - } - } - - pub fn doc_freq(&self) -> u32 { - self.doc_ids.len() as u32 - } - - pub fn suscribe(&mut self, doc: DocId, pos: u32) { - if self.is_new_doc(&doc) { - // this is the first time we meet this term for this document - // first close the previous document, and write its doc_freq. - self.close_doc(); - self.doc_ids.push(doc); - } - self.current_freq += 1; - self.positions.record(pos - self.current_position); - self.current_position = pos; - } -} - -pub struct PostingsWriter { - postings: Vec>, - term_index: BTreeMap, -} - -impl PostingsWriter { - - pub fn new() -> PostingsWriter { - PostingsWriter { - postings: Vec::new(), - term_index: BTreeMap::new(), - } - } - - pub fn suscribe(&mut self, doc: DocId, pos: u32, term: Term) { - let doc_ids: &mut TermPostingsWriter = self.get_term_postings(term); - doc_ids.suscribe(doc, pos); - } - - fn get_term_postings(&mut self, term: Term) -> &mut TermPostingsWriter { - match self.term_index.get(&term) { - Some(unord_id) => { - return &mut self.postings[*unord_id]; - }, - None => {} - } - let unord_id = self.term_index.len(); - self.postings.push(TermPostingsWriter::new()); - self.term_index.insert(term, unord_id.clone()); - &mut self.postings[unord_id] - } - - pub fn serialize(&self, serializer: &mut PostingsSerializer) -> io::Result<()> { - for (term, postings_id) in self.term_index.iter() { - let term_postings_writer = &self.postings[postings_id.clone()]; - let term_docfreq = term_postings_writer.doc_freq(); - try!(serializer.new_term(&term, term_docfreq)); - for doc in term_postings_writer.doc_ids.iter() { - try!(serializer.write_doc(doc.clone(), None)); - } - } - Ok(()) - } - - -} - - -////////////////////////////////// - -pub trait Postings: Iterator { - // after skipping position - // the iterator in such a way that the - // next call to next() will return a - // value greater or equal to target. - fn skip_next(&mut self, target: DocId) -> Option; -} - - -pub struct PostingsSerializer { - terms_fst_builder: FstMapBuilder, // TODO find an alternative to work around the "move" - postings_write: WritePtr, - positions_write: WritePtr, - written_bytes_postings: usize, - written_bytes_positions: usize, - encoder: simdcompression::Encoder, - doc_ids: Vec, -} - -impl PostingsSerializer { - - pub fn open(segment: &Segment) -> io::Result { - let terms_write = try!(segment.open_write(SegmentComponent::TERMS)); - let terms_fst_builder = try!(FstMapBuilder::new(terms_write)); - let postings_write = try!(segment.open_write(SegmentComponent::POSTINGS)); - let positions_write = try!(segment.open_write(SegmentComponent::POSITIONS)); - Ok(PostingsSerializer { - terms_fst_builder: terms_fst_builder, - postings_write: postings_write, - positions_write: positions_write, - written_bytes_postings: 0, - written_bytes_positions: 0, - encoder: simdcompression::Encoder::new(), - doc_ids: Vec::new(), - }) - } - - pub fn new_term(&mut self, term: &Term, doc_freq: DocId) -> io::Result<()> { - try!(self.close_term()); - self.doc_ids.clear(); - let term_info = TermInfo { - doc_freq: doc_freq, - postings_offset: self.written_bytes_postings as u32, - }; - self.terms_fst_builder - .insert(term.as_slice(), &term_info) - } - - pub fn close_term(&mut self,) -> io::Result<()> { - if !self.doc_ids.is_empty() { - let docs_data = self.encoder.encode_sorted(&self.doc_ids); - self.written_bytes_postings += try!((docs_data.len() as u32).serialize(&mut self.postings_write)); - for num in docs_data { - self.written_bytes_postings += try!(num.serialize(&mut self.postings_write)); - } - } - Ok(()) - } - - pub fn write_doc(&mut self, doc_id: DocId, positions: Option<&[u32]>) -> io::Result<()> { - self.doc_ids.push(doc_id); - Ok(()) - } - - - pub fn close(mut self,) -> io::Result<()> { - try!(self.close_term()); - try!(self.terms_fst_builder.finish()); - try!(self.postings_write.flush()); - Ok(()) - } -} - - - -#[cfg(test)] -mod tests { - - use super::*; - use test::Bencher; - use core::schema::DocId; - - - #[derive(Debug)] - pub struct VecPostings { - doc_ids: Vec, - cursor: usize, - } - - impl VecPostings { - pub fn new(vals: Vec) -> VecPostings { - VecPostings { - doc_ids: vals, - cursor: 0, - } - } - } - - impl Postings for VecPostings { - // after skipping position - // the iterator in such a way that the - // next call to next() will return a - // value greater or equal to target. - fn skip_next(&mut self, target: DocId) -> Option { - loop { - match Iterator::next(self) { - Some(val) if val >= target => { - return Some(val); - }, - None => { - return None; - }, - _ => {} - } - } - } - } - - impl Iterator for VecPostings { - type Item = DocId; - fn next(&mut self,) -> Option { - if self.cursor >= self.doc_ids.len() { - None - } - else { - self.cursor += 1; - Some(self.doc_ids[self.cursor - 1]) - } - } - } - // - // #[test] - // fn test_intersection() { - // { - // let left = VecPostings::new(vec!(1, 3, 9)); - // let right = VecPostings::new(vec!(3, 4, 9, 18)); - // let inter = IntersectionPostings::from_postings(vec!(left, right)); - // let vals: Vec = inter.collect(); - // assert_eq!(vals, vec!(3, 9)); - // } - // { - // let a = VecPostings::new(vec!(1, 3, 9)); - // let b = VecPostings::new(vec!(3, 4, 9, 18)); - // let c = VecPostings::new(vec!(1, 5, 9, 111)); - // let inter = IntersectionPostings::from_postings(vec!(a, b, c)); - // let vals: Vec = inter.collect(); - // assert_eq!(vals, vec!(9)); - // } - // } - // - // #[bench] - // fn bench_single_intersection(b: &mut Bencher) { - // b.iter(|| { - // let docs = VecPostings::new((0..1_000_000).collect()); - // let intersection = IntersectionPostings::from_postings(vec!(docs)); - // intersection.count() - // }); - // } -} diff --git a/src/core/reader.rs b/src/core/reader.rs index 57f1c405f..ef1aedbe1 100644 --- a/src/core/reader.rs +++ b/src/core/reader.rs @@ -2,16 +2,14 @@ use core::index::{Segment, SegmentId}; use core::schema::Term; use core::store::StoreReader; use core::schema::Document; -use core::directory::ReadOnlySource; +use directory::ReadOnlySource; use std::io::Cursor; -use core::schema::DocId; +use DocId; use core::index::SegmentComponent; -use core::simdcompression::Decoder; use std::io; -use std::iter; use std::str; -use core::postings::TermInfo; -use core::fstmap::FstMap; +use postings::TermInfo; +use datastruct::FstMap; use std::fmt; use rustc_serialize::json; use core::index::SegmentInfo; @@ -21,7 +19,7 @@ use core::convert_to_ioerror; use core::serialize::BinarySerializable; use core::fastfield::U32FastFieldsReader; use core::fastfield::U32FastFieldReader; -use core::simdcompression; +use compression; use std::mem; impl fmt::Debug for SegmentReader { @@ -37,13 +35,13 @@ pub fn intersection(mut postings: Vec) -> SegmentPostings { .map(|v| v.len()) .min() .unwrap(); - let mut buffer: Vec = postings.pop().unwrap().0; + let buffer: Vec = postings.pop().unwrap().0; let mut output: Vec = Vec::with_capacity(min_len); unsafe { output.set_len(min_len); } let mut pair = (output, buffer); for posting in postings.iter() { pair = (pair.1, pair.0); - let output_len = simdcompression::intersection(posting.0.as_slice(), pair.0.as_slice(), pair.1.as_mut_slice()); + let output_len = compression::intersection(posting.0.as_slice(), pair.0.as_slice(), pair.1.as_mut_slice()); unsafe { pair.1.set_len(output_len); } } SegmentPostings(pair.1) @@ -77,8 +75,8 @@ impl SegmentPostings { let mut doc_ids: Vec = Vec::with_capacity(doc_freq as usize); unsafe { doc_ids.set_len(doc_freq as usize); } { - let decoder = Decoder::new(); - let num_doc_ids = decoder.decode_sorted(&data_u32[1..(num_u32s+1) as usize], &mut doc_ids); + let decoder = compression::Decoder::new(); + decoder.decode_sorted(&data_u32[1..(num_u32s+1) as usize], &mut doc_ids); SegmentPostings(doc_ids) } } @@ -212,7 +210,7 @@ impl SegmentReader { for term in terms.iter() { match self.get_term(term) { Some(term_info) => { - let decode_one_timer = decode_timer.open("decode_one"); + let _decode_one_timer = decode_timer.open("decode_one"); let segment_posting = self.read_postings(&term_info); segment_postings.push(segment_posting); } @@ -224,7 +222,7 @@ impl SegmentReader { } } { - let mut intersection_time = timer.open("intersection"); + let _intersection_time = timer.open("intersection"); intersection(segment_postings) } } diff --git a/src/core/schema.rs b/src/core/schema.rs index 1a1536a9c..e1457dee8 100644 --- a/src/core/schema.rs +++ b/src/core/schema.rs @@ -3,6 +3,7 @@ use std::collections::HashMap; use std::slice; use std::fmt; use std::io; + use std::io::Read; use core::serialize::BinarySerializable; use rustc_serialize::Decodable; @@ -12,10 +13,6 @@ use rustc_serialize::Encoder; use std::ops::BitOr; use std::borrow::Borrow; -/// u32 identifying a document within a segment. -/// Document gets their doc id assigned incrementally, -/// as they are added in the segment. -pub type DocId = u32; #[derive(Clone,Debug,PartialEq,Eq, RustcDecodable, RustcEncodable)] pub struct TextOptions { diff --git a/src/core/searcher.rs b/src/core/searcher.rs index 9be6e10f8..a475016a4 100644 --- a/src/core/searcher.rs +++ b/src/core/searcher.rs @@ -1,7 +1,7 @@ use core::reader::SegmentReader; use core::index::Index; use core::index::Segment; -use core::schema::DocId; +use DocId; use core::schema::Document; use core::collector::Collector; use std::io; @@ -56,12 +56,12 @@ impl Searcher { for (segment_ord, segment) in self.segments.iter().enumerate() { let mut segment_search_timer = search_timer.open("segment_search"); { - let set_segment_timer = segment_search_timer.open("set_segment"); + let _ = segment_search_timer.open("set_segment"); try!(collector.set_segment(segment_ord as SegmentLocalId, &segment)); } let postings = segment.search(terms, segment_search_timer.open("get_postings")); { - let collection_timer = segment_search_timer.open("collection"); + let _collection_timer = segment_search_timer.open("collection"); for doc_id in postings { collector.collect(doc_id); } diff --git a/src/core/store.rs b/src/core/store.rs index f2ddc1255..71872f240 100644 --- a/src/core/store.rs +++ b/src/core/store.rs @@ -1,10 +1,10 @@ -use core::directory::WritePtr; +use directory::{WritePtr, ReadOnlySource}; use std::cell::RefCell; -use core::schema::DocId; +use DocId; use core::schema::Document; use core::schema::TextFieldValue; use core::serialize::BinarySerializable; -use core::directory::ReadOnlySource; + use std::io::Write; use std::io::Read; use std::io::Cursor; @@ -212,7 +212,7 @@ mod tests { use core::schema::Schema; use core::schema::TextOptions; use core::schema::TextFieldValue; - use core::directory::{RAMDirectory, Directory, MmapDirectory, WritePtr}; + use directory::{RAMDirectory, Directory, MmapDirectory, WritePtr}; fn write_lorem_ipsum_store(writer: WritePtr) -> Schema { let mut schema = Schema::new(); diff --git a/src/core/timer.rs b/src/core/timer.rs index f873ad43c..8e5277dcc 100644 --- a/src/core/timer.rs +++ b/src/core/timer.rs @@ -72,10 +72,10 @@ mod tests { { let mut ab = a.open("b"); { - let abc = ab.open("c"); + let _abc = ab.open("c"); } { - let abd = ab.open("d"); + let _abd = ab.open("d"); } } } diff --git a/src/core/writer.rs b/src/core/writer.rs index 6fd2333e6..3c21979b6 100644 --- a/src/core/writer.rs +++ b/src/core/writer.rs @@ -1,3 +1,4 @@ +use DocId; use core::schema::*; use core::codec::*; use core::index::Index; @@ -6,7 +7,7 @@ use core::index::SerializableSegment; use core::analyzer::StreamingIterator; use core::index::Segment; use core::index::SegmentInfo; -use core::postings::PostingsWriter; +use postings::PostingsWriter; use core::fastfield::U32FastFieldsWriter; use std::clone::Clone; use std::sync::mpsc; diff --git a/src/core/fstmap.rs b/src/datastruct/fstmap.rs similarity index 98% rename from src/core/fstmap.rs rename to src/datastruct/fstmap.rs index f7b9c1ed3..424bda684 100644 --- a/src/core/fstmap.rs +++ b/src/datastruct/fstmap.rs @@ -5,7 +5,8 @@ use std::io::Cursor; use fst; use fst::raw::Fst; use fst::Streamer; -use core::directory::ReadOnlySource; + +use directory::ReadOnlySource; use core::serialize::BinarySerializable; use std::marker::PhantomData; @@ -125,9 +126,8 @@ impl FstMap { #[cfg(test)] mod tests { use super::*; - use core::directory::{RAMDirectory, Directory}; + use directory::{RAMDirectory, Directory}; use std::path::PathBuf; - use fst::Streamer; #[test] diff --git a/src/datastruct/mod.rs b/src/datastruct/mod.rs new file mode 100644 index 000000000..091533637 --- /dev/null +++ b/src/datastruct/mod.rs @@ -0,0 +1,4 @@ +mod fstmap; +pub use self::fstmap::FstMapBuilder; +pub use self::fstmap::FstMap; +pub use self::fstmap::FstMapIter; diff --git a/src/directory/directory.rs b/src/directory/directory.rs new file mode 100644 index 000000000..eca3692ec --- /dev/null +++ b/src/directory/directory.rs @@ -0,0 +1,23 @@ +use std::marker::Send; +use std::marker::Sync; +use std::io; +use std::fmt; +use std::path::Path; +use directory::{ReadOnlySource, WritePtr}; + +/////////////////////////////////////////////////////////////// +// +// #[derive(Debug)] +// pub enum CreateError { +// RootDirectoryDoesNotExist, +// DirectoryAlreadyExists, +// CannotCreateTempDirectory(io::Error), +// } + +pub trait Directory: fmt::Debug + Send + Sync { +fn open_read(&self, path: &Path) -> io::Result; + fn open_write(&mut self, path: &Path) -> io::Result; + fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()>; + fn sync(&self, path: &Path) -> io::Result<()>; + fn sync_directory(&self,) -> io::Result<()>; +} diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs new file mode 100644 index 000000000..1187ce31a --- /dev/null +++ b/src/directory/mmap_directory.rs @@ -0,0 +1,99 @@ +use std::path::{Path, PathBuf}; +use tempdir::TempDir; +use std::collections::HashMap; +use std::collections::hash_map::Entry as HashMapEntry; +use fst::raw::MmapReadOnly; +use std::fs::File; +use atomicwrites; +use std::sync::RwLock; +use std::fmt; +use std::io::Write; +use std::io; +use directory::Directory; +use directory::ReadOnlySource; +use directory::WritePtr; +use std::io::BufWriter; + +//////////////////////////////////////////////////////////////// +// MmapDirectory + +pub struct MmapDirectory { + root_path: PathBuf, + mmap_cache: RwLock>, + _temp_directory: Option, +} + +impl fmt::Debug for MmapDirectory { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "MmapDirectory({:?})", self.root_path) + } +} + + +impl MmapDirectory { + + pub fn create_from_tempdir() -> io::Result { + // TODO error management + let tempdir = try!(TempDir::new("index")); + let tempdir_path = PathBuf::from(tempdir.path()); + let directory = MmapDirectory { + root_path: PathBuf::from(tempdir_path), + mmap_cache: RwLock::new(HashMap::new()), + _temp_directory: Some(tempdir) + }; + Ok(directory) + } + + pub fn create(filepath: &Path) -> io::Result { + Ok(MmapDirectory { + root_path: PathBuf::from(filepath), + mmap_cache: RwLock::new(HashMap::new()), + _temp_directory: None + }) + } + + fn resolve_path(&self, relative_path: &Path) -> PathBuf { + self.root_path.join(relative_path) + } + + +} + +impl Directory for MmapDirectory { + fn open_read(&self, path: &Path) -> io::Result { + let full_path = self.resolve_path(path); + let mut mmap_cache = self.mmap_cache.write().unwrap(); + let mmap = match mmap_cache.entry(full_path.clone()) { + HashMapEntry::Occupied(e) => e.get().clone(), + HashMapEntry::Vacant(vacant_entry) => { + let new_mmap = try!(MmapReadOnly::open_path(full_path.clone())); + vacant_entry.insert(new_mmap.clone()); + new_mmap + } + }; + Ok(ReadOnlySource::Mmap(mmap)) + } + fn open_write(&mut self, path: &Path) -> io::Result { + let full_path = self.resolve_path(path); + let file = try!(File::create(full_path)); + let buf_writer = BufWriter::new(file); + Ok(Box::new(buf_writer)) + } + + fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> { + let full_path = self.resolve_path(path); + let meta_file = atomicwrites::AtomicFile::new(full_path, atomicwrites::AllowOverwrite); + meta_file.write(|f| { + f.write_all(data) + }) + } + + fn sync(&self, path: &Path) -> io::Result<()> { + let full_path = self.resolve_path(path); + File::open(&full_path).and_then(|fd| fd.sync_all()) + } + + fn sync_directory(&self,) -> io::Result<()> { + File::open(&self.root_path).and_then(|fd| fd.sync_all()) + } +} diff --git a/src/directory/mod.rs b/src/directory/mod.rs new file mode 100644 index 000000000..80be116fe --- /dev/null +++ b/src/directory/mod.rs @@ -0,0 +1,113 @@ +mod mmap_directory; +mod ram_directory; +mod directory; + +use std::ops::Deref; +use std::io::{Seek, Write, Cursor}; +use fst::raw::MmapReadOnly; + +pub use self::directory::Directory; +pub use self::ram_directory::RAMDirectory; +pub use self::mmap_directory::MmapDirectory; + + +//////////////////////////////////////// +// WritePtr + + +pub trait SeekableWrite: Seek + Write {} +impl SeekableWrite for T {} +pub type WritePtr = Box; + + +//////////////////////////////////////// +// Read only source. + + +pub enum ReadOnlySource { + Mmap(MmapReadOnly), + Anonymous(Vec), +} + +impl Deref for ReadOnlySource { + type Target = [u8]; + + fn deref(&self) -> &[u8] { + self.as_slice() + } +} + +impl ReadOnlySource { + + pub fn len(&self,) -> usize { + self.as_slice().len() + } + + pub fn as_slice(&self,) -> &[u8] { + match *self { + ReadOnlySource::Mmap(ref mmap_read_only) => unsafe { mmap_read_only.as_slice() }, + ReadOnlySource::Anonymous(ref shared_vec) => shared_vec.as_slice(), + } + } + + pub fn cursor<'a>(&'a self) -> Cursor<&'a [u8]> { + Cursor::new(&self.deref()) + } + + pub fn slice(&self, from_offset:usize, to_offset:usize) -> ReadOnlySource { + match *self { + ReadOnlySource::Mmap(ref mmap_read_only) => { + let sliced_mmap = mmap_read_only.range(from_offset, to_offset - from_offset); + ReadOnlySource::Mmap(sliced_mmap) + } + ReadOnlySource::Anonymous(ref shared_vec) => { + let sliced_data: Vec = Vec::from(&shared_vec[from_offset..to_offset]); + ReadOnlySource::Anonymous(sliced_data) + }, + } + } +} + +impl Clone for ReadOnlySource { + fn clone(&self) -> Self { + self.slice(0, self.len()) + } +} + + +#[cfg(test)] +mod tests { + + use super::*; + use std::path::Path; + + #[test] + fn test_ram_directory() { + let mut ram_directory = RAMDirectory::create(); + test_directory(&mut ram_directory); + } + + #[test] + fn test_mmap_directory() { + let mut mmap_directory = MmapDirectory::create_from_tempdir().unwrap(); + test_directory(&mut mmap_directory); + } + + fn test_directory(directory: &mut Directory) { + { + let mut write_file = directory.open_write(Path::new("toto")).unwrap(); + write_file.write_all(&[4]).unwrap(); + write_file.write_all(&[3]).unwrap(); + write_file.write_all(&[7,3,5]).unwrap(); + } + let read_file = directory.open_read(Path::new("toto")).unwrap(); + let data: &[u8] = &*read_file; + assert_eq!(data.len(), 5); + assert_eq!(data[0], 4); + assert_eq!(data[1], 3); + assert_eq!(data[2], 7); + assert_eq!(data[3], 3); + assert_eq!(data[4], 5); + } + +} diff --git a/src/directory/ram_directory.rs b/src/directory/ram_directory.rs new file mode 100644 index 000000000..a467f27ce --- /dev/null +++ b/src/directory/ram_directory.rs @@ -0,0 +1,87 @@ +use directory::{Directory, ReadOnlySource}; +use std::io::{Cursor, Write, Seek, SeekFrom}; +use std::io; +use atomicwrites; +use std::fmt; +use std::sync::{Arc, RwLock}; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use directory::WritePtr; + +#[derive(Clone)] +struct SharedVec(Arc>>>); + + +pub struct RAMDirectory { + fs: HashMap, +} + +impl SharedVec { + fn new() -> SharedVec { + SharedVec(Arc::new( RwLock::new(Cursor::new(Vec::new())) )) + } +} + +impl Write for SharedVec { + fn write(&mut self, buf: &[u8]) -> io::Result { + try!(self.0.write().unwrap().write(buf)); + Ok(buf.len()) + } + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl Seek for SharedVec { + fn seek(&mut self, pos: SeekFrom) -> io::Result { + self.0.write().unwrap().seek(pos) + } +} + +impl fmt::Debug for RAMDirectory { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "RAMDirectory") + } +} + +impl RAMDirectory { + pub fn create() -> RAMDirectory { + RAMDirectory { + fs: HashMap::new() + } + } +} + +impl Directory for RAMDirectory { + fn open_read(&self, path: &Path) -> io::Result { + match self.fs.get(path) { + Some(ref data) => { + let data_copy = (*data).0.read().unwrap().clone(); + Ok(ReadOnlySource::Anonymous(data_copy.into_inner())) + }, + None => + Err(io::Error::new(io::ErrorKind::NotFound, format!("File has never been created. {:?}", path))) + } + } + fn open_write(&mut self, path: &Path) -> io::Result { + let full_path = PathBuf::from(&path); + let data = SharedVec::new(); + self.fs.insert(full_path, data.clone()); + Ok(Box::new(data)) + } + + fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> { + let meta_file = atomicwrites::AtomicFile::new(PathBuf::from(path), atomicwrites::AllowOverwrite); + meta_file.write(|f| { + f.write_all(data) + }) + } + + fn sync(&self, _: &Path) -> io::Result<()> { + Ok(()) + } + + fn sync_directory(&self,) -> io::Result<()> { + Ok(()) + } +} diff --git a/src/lib.rs b/src/lib.rs index 93735e573..8b7eb3e58 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,19 +28,26 @@ extern crate num_cpus; #[cfg(test)] extern crate rand; mod core; +mod datastruct; +mod postings; +mod directory; +mod compression; +pub use directory::Directory; pub use core::analyzer; -pub use core::directory::Directory; pub use core::searcher::Searcher; pub use core::index::Index; pub use core::schema; pub use core::schema::Term; pub use core::schema::Document; pub use core::collector; -pub use core::schema::DocId; pub use core::reader::SegmentReader; pub use core::searcher::SegmentLocalId; +/// u32 identifying a document within a segment. +/// Document gets their doc id assigned incrementally, +/// as they are added in the segment. +pub type DocId = u32; #[cfg(test)] mod tests { diff --git a/src/postings/mod.rs b/src/postings/mod.rs new file mode 100644 index 000000000..2cfafb5c6 --- /dev/null +++ b/src/postings/mod.rs @@ -0,0 +1,132 @@ +// pub mod postings; +// pub mod schema; +// pub mod directory; +// pub mod writer; +// pub mod analyzer; +// pub mod reader; +// pub mod codec; +// pub mod searcher; +// pub mod collector; +// pub mod serialize; +// pub mod store; +// pub mod simdcompression; +// pub mod fstmap; +// pub mod index; +// pub mod fastfield; +// pub mod fastdivide; +// pub mod merger; +// pub mod timer; + +// use std::error; +// use std::io; + +// pub fn convert_to_ioerror(err: E) -> io::Error { +// io::Error::new( +// io::ErrorKind::InvalidData, +// err +// ) +// } + +mod serializer; +mod writer; +mod term_info; + +use DocId; +pub use self::serializer::PostingsSerializer; +pub use self::writer::PostingsWriter; +pub use self::term_info::TermInfo; + +pub trait Postings: Iterator { + // after skipping position + // the iterator in such a way that the + // next call to next() will return a + // value greater or equal to target. + fn skip_next(&mut self, target: DocId) -> Option; +} + + +#[cfg(test)] +mod tests { + + use super::*; + use DocId; + + + #[derive(Debug)] + pub struct VecPostings { + doc_ids: Vec, + cursor: usize, + } + + impl VecPostings { + pub fn new(vals: Vec) -> VecPostings { + VecPostings { + doc_ids: vals, + cursor: 0, + } + } + } + + impl Postings for VecPostings { + // after skipping position + // the iterator in such a way that the + // next call to next() will return a + // value greater or equal to target. + fn skip_next(&mut self, target: DocId) -> Option { + loop { + match Iterator::next(self) { + Some(val) if val >= target => { + return Some(val); + }, + None => { + return None; + }, + _ => {} + } + } + } + } + + impl Iterator for VecPostings { + type Item = DocId; + fn next(&mut self,) -> Option { + if self.cursor >= self.doc_ids.len() { + None + } + else { + self.cursor += 1; + Some(self.doc_ids[self.cursor - 1]) + } + } + } + + + // use test::Bencher; + // #[test] + // fn test_intersection() { + // { + // let left = VecPostings::new(vec!(1, 3, 9)); + // let right = VecPostings::new(vec!(3, 4, 9, 18)); + // let inter = IntersectionPostings::from_postings(vec!(left, right)); + // let vals: Vec = inter.collect(); + // assert_eq!(vals, vec!(3, 9)); + // } + // { + // let a = VecPostings::new(vec!(1, 3, 9)); + // let b = VecPostings::new(vec!(3, 4, 9, 18)); + // let c = VecPostings::new(vec!(1, 5, 9, 111)); + // let inter = IntersectionPostings::from_postings(vec!(a, b, c)); + // let vals: Vec = inter.collect(); + // assert_eq!(vals, vec!(9)); + // } + // } + // + // #[bench] + // fn bench_single_intersection(b: &mut Bencher) { + // b.iter(|| { + // let docs = VecPostings::new((0..1_000_000).collect()); + // let intersection = IntersectionPostings::from_postings(vec!(docs)); + // intersection.count() + // }); + // } +} diff --git a/src/postings/serializer.rs b/src/postings/serializer.rs new file mode 100644 index 000000000..ba5e0a0d4 --- /dev/null +++ b/src/postings/serializer.rs @@ -0,0 +1,74 @@ +use datastruct::FstMapBuilder; +use super::TermInfo; +use core::schema::Term; +use directory::WritePtr; +use compression; +use DocId; +use core::index::Segment; +use std::io; +use core::index::SegmentComponent; +use core::serialize::BinarySerializable; + +pub struct PostingsSerializer { + terms_fst_builder: FstMapBuilder, // TODO find an alternative to work around the "move" + postings_write: WritePtr, + positions_write: WritePtr, + written_bytes_postings: usize, + written_bytes_positions: usize, + encoder: compression::Encoder, + doc_ids: Vec, +} + +impl PostingsSerializer { + + pub fn open(segment: &Segment) -> io::Result { + let terms_write = try!(segment.open_write(SegmentComponent::TERMS)); + let terms_fst_builder = try!(FstMapBuilder::new(terms_write)); + let postings_write = try!(segment.open_write(SegmentComponent::POSTINGS)); + let positions_write = try!(segment.open_write(SegmentComponent::POSITIONS)); + Ok(PostingsSerializer { + terms_fst_builder: terms_fst_builder, + postings_write: postings_write, + positions_write: positions_write, + written_bytes_postings: 0, + written_bytes_positions: 0, + encoder: compression::Encoder::new(), + doc_ids: Vec::new(), + }) + } + + pub fn new_term(&mut self, term: &Term, doc_freq: DocId) -> io::Result<()> { + try!(self.close_term()); + self.doc_ids.clear(); + let term_info = TermInfo { + doc_freq: doc_freq, + postings_offset: self.written_bytes_postings as u32, + }; + self.terms_fst_builder + .insert(term.as_slice(), &term_info) + } + + pub fn close_term(&mut self,) -> io::Result<()> { + if !self.doc_ids.is_empty() { + let docs_data = self.encoder.encode_sorted(&self.doc_ids); + self.written_bytes_postings += try!((docs_data.len() as u32).serialize(&mut self.postings_write)); + for num in docs_data { + self.written_bytes_postings += try!(num.serialize(&mut self.postings_write)); + } + } + Ok(()) + } + + pub fn write_doc(&mut self, doc_id: DocId, positions: Option<&[u32]>) -> io::Result<()> { + self.doc_ids.push(doc_id); + Ok(()) + } + + + pub fn close(mut self,) -> io::Result<()> { + try!(self.close_term()); + try!(self.terms_fst_builder.finish()); + try!(self.postings_write.flush()); + Ok(()) + } +} diff --git a/src/postings/term_info.rs b/src/postings/term_info.rs new file mode 100644 index 000000000..8dc0728f6 --- /dev/null +++ b/src/postings/term_info.rs @@ -0,0 +1,26 @@ +use core::serialize::BinarySerializable; +use std::io; + +#[derive(Debug,Ord,PartialOrd,Eq,PartialEq,Clone)] +pub struct TermInfo { + pub doc_freq: u32, + pub postings_offset: u32, +} + + +impl BinarySerializable for TermInfo { + fn serialize(&self, writer: &mut io::Write) -> io::Result { + Ok( + try!(self.doc_freq.serialize(writer)) + + try!(self.postings_offset.serialize(writer)) + ) + } + fn deserialize(reader: &mut io::Read) -> io::Result { + let doc_freq = try!(u32::deserialize(reader)); + let offset = try!(u32::deserialize(reader)); + Ok(TermInfo { + doc_freq: doc_freq, + postings_offset: offset, + }) + } +} diff --git a/src/postings/writer.rs b/src/postings/writer.rs new file mode 100644 index 000000000..6a16c3282 --- /dev/null +++ b/src/postings/writer.rs @@ -0,0 +1,141 @@ +use DocId; +use std::collections::BTreeMap; +use core::schema::Term; +use postings::PostingsSerializer; +use std::io; + +pub trait U32sRecorder { + fn new() -> Self; + fn record(&mut self, val: u32); +} + +pub struct VecRecorder(Vec); + +impl U32sRecorder for VecRecorder { + fn new() -> VecRecorder { + VecRecorder(Vec::new()) + } + fn record(&mut self, val: u32) { + self.0.push(val); + } +} + +pub struct ObliviousRecorder; + +impl U32sRecorder for ObliviousRecorder { + fn new() -> ObliviousRecorder { + ObliviousRecorder + } + fn record(&mut self, _: u32) { + } +} + + + + +struct TermPostingsWriter { + doc_ids: Vec, + term_freqs: TermFreqsRec, + positions: PositionsRec, + current_position: u32, + current_freq: u32, +} + +impl TermPostingsWriter { + pub fn new() -> TermPostingsWriter { + TermPostingsWriter { + doc_ids: Vec::new(), + term_freqs: TermFreqsRec::new(), + positions: PositionsRec::new(), + current_position: 0u32, + current_freq: 0u32, + } + } + + fn close_doc(&mut self,) { + self.term_freqs.record(self.current_freq); + self.current_freq = 0; + self.current_position = 0; + } + + fn close(&mut self,) { + if self.current_freq > 0 { + self.close_doc(); + } + } + + fn is_new_doc(&self, doc: &DocId) -> bool { + match self.doc_ids.last() { + Some(&last_doc) => last_doc != *doc, + None => true, + } + } + + pub fn doc_freq(&self) -> u32 { + self.doc_ids.len() as u32 + } + + pub fn suscribe(&mut self, doc: DocId, pos: u32) { + if self.is_new_doc(&doc) { + // this is the first time we meet this term for this document + // first close the previous document, and write its doc_freq. + self.close_doc(); + self.doc_ids.push(doc); + } + self.current_freq += 1; + self.positions.record(pos - self.current_position); + self.current_position = pos; + } +} + + + + + + +pub struct PostingsWriter { + postings: Vec>, + term_index: BTreeMap, +} + +impl PostingsWriter { + + pub fn new() -> PostingsWriter { + PostingsWriter { + postings: Vec::new(), + term_index: BTreeMap::new(), + } + } + + pub fn suscribe(&mut self, doc: DocId, pos: u32, term: Term) { + let doc_ids: &mut TermPostingsWriter = self.get_term_postings(term); + doc_ids.suscribe(doc, pos); + } + + fn get_term_postings(&mut self, term: Term) -> &mut TermPostingsWriter { + match self.term_index.get(&term) { + Some(unord_id) => { + return &mut self.postings[*unord_id]; + }, + None => {} + } + let unord_id = self.term_index.len(); + self.postings.push(TermPostingsWriter::new()); + self.term_index.insert(term, unord_id.clone()); + &mut self.postings[unord_id] + } + + pub fn serialize(&self, serializer: &mut PostingsSerializer) -> io::Result<()> { + for (term, postings_id) in self.term_index.iter() { + let term_postings_writer = &self.postings[postings_id.clone()]; + let term_docfreq = term_postings_writer.doc_freq(); + try!(serializer.new_term(&term, term_docfreq)); + for doc in term_postings_writer.doc_ids.iter() { + try!(serializer.write_doc(doc.clone(), None)); + } + } + Ok(()) + } + + +}