diff --git a/src/directory/directory.rs b/src/directory/directory.rs index 495fb9b38..99396ee90 100644 --- a/src/directory/directory.rs +++ b/src/directory/directory.rs @@ -5,6 +5,7 @@ use directory::error::{FileError, OpenWriteError}; use directory::{ReadOnlySource, WritePtr}; use std::result; use std::io; +use std::marker::Sync; /// Write-once read many (WORM) abstraction for where tantivy's index should be stored. /// @@ -15,7 +16,7 @@ use std::io; /// - The [RAMDirectory](struct.RAMDirectory.html), which /// should be used mostly for tests. /// -pub trait Directory: fmt::Debug + Send + 'static { +pub trait Directory: fmt::Debug + Send + Sync + 'static { /// Opens a virtual file for read. /// diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index d2d485897..814b5d071 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -28,6 +28,7 @@ pub struct IndexWriter { target_num_docs: usize, num_threads: usize, docstamp: u64, + } const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000; diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs index c942e5efe..c1d918eb7 100644 --- a/src/indexer/mod.rs +++ b/src/indexer/mod.rs @@ -1,5 +1,5 @@ -pub mod index_writer; +mod index_writer; pub mod segment_serializer; pub mod merger; diff --git a/src/lib.rs b/src/lib.rs index 2a1a5a2b9..e1f1d3ea0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,11 +1,7 @@ /*! -# Creating a new index, adding documents and searching. +Tantivy is a search engine library. -``` - - -``` */ #![feature(binary_heap_extras)] @@ -71,6 +67,7 @@ pub mod schema; pub use directory::Directory; pub use core::searcher::Searcher; pub use core::Index; +pub use indexer::IndexWriter; pub use schema::Term; pub use schema::Document; pub use core::SegmentReader; @@ -158,10 +155,6 @@ mod tests { index_writer.add_document(doc).unwrap(); } assert!(index_writer.commit().is_ok()); - // TODO reenable this test - // let segment = commit_result.unwrap(); - // let segment_reader = SegmentReader::open(segment).unwrap(); - // assert_eq!(segment_reader.max_doc(), 3); } } diff --git a/src/postings/block_appender.rs b/src/postings/block_appender.rs new file mode 100644 index 000000000..6da7c9ee5 --- /dev/null +++ b/src/postings/block_appender.rs @@ -0,0 +1,265 @@ +use compression::NUM_DOCS_PER_BLOCK; +use DocId; + +const BLOCK_SIZE: u32 = NUM_DOCS_PER_BLOCK as u32; + +struct Block { + data: [u32; BLOCK_SIZE as usize], + next: u32, +} + +impl Block { + fn new() -> Block { + Block { + data: [0u32; BLOCK_SIZE as usize], + next: u32::max_value(), + } + } +} + +#[derive(Copy, Clone)] +struct ListInfo { + first: u32, + last: u32, + len: u32, +} + +pub struct BlockStore { + lists: Vec, + blocks: Vec, + free_blocks: Vec, +} + +impl BlockStore { + pub fn allocate(num_blocks: usize) -> BlockStore { + BlockStore { + lists: Vec::with_capacity(1_000_000), + blocks: (0 .. num_blocks).map(|_| Block::new()).collect(), + free_blocks: (0u32 .. num_blocks as u32).collect() + } + } + + fn new_list(&mut self, first_el: u32) -> u32 { + let res = self.lists.len() as u32; + let new_block_id = self.new_block().unwrap(); + self.blocks[new_block_id as usize].data[0] = first_el; + self.lists.push(ListInfo { + first: new_block_id, + last: new_block_id, + len: 1, + }); + res + } + + fn new_block(&mut self,) -> Option { + self.free_blocks.pop() + .map(|block_id| { + self.blocks[block_id as usize].next = u32::max_value(); + block_id + }) + } + + fn get_list_info(&mut self, list_id: u32) -> &mut ListInfo { + &mut self.lists[list_id as usize] + } + + + fn block_id_to_append(&mut self, list_id: u32) -> u32 { + let list_info: ListInfo = self.lists[list_id as usize]; + // get_list_info(list_id).len % BLOCK_SIZE == 0; + // let new_block_required: bool = self.get_list_info(list_id).len % BLOCK_SIZE == 0; + if list_info.len % BLOCK_SIZE == 0 { + // we need to add a fresh new block. + let new_block_id: u32 = { self.new_block().unwrap() }; + let last_block_id: usize; + { + // update the list info. + let list_info: &mut ListInfo = self.get_list_info(list_id); + last_block_id = list_info.last as usize; + list_info.last = new_block_id; + } + self.blocks[last_block_id].next = new_block_id; + new_block_id + } + else { + list_info.last + } + } + + pub fn push(&mut self, list_id: u32, val: u32) { + let new_block_required: bool = self.get_list_info(list_id).len % BLOCK_SIZE == 0; + let block_id: u32 = self.block_id_to_append(list_id); + let list_len: u32; + { + let list_info: &mut ListInfo = self.get_list_info(list_id); + list_len = list_info.len; + list_info.len += 1u32; + } + self.blocks[block_id as usize].data[(list_len % BLOCK_SIZE) as usize] = val; + } + + pub fn iter_list(&self, list_id: u32) -> BlockIterator { + let list_info = &self.lists[list_id as usize]; + BlockIterator { + current_block: &self.blocks[list_info.first as usize], + blocks: &self.blocks, + cursor: 0, + len: list_info.len as usize, + } + } +} + + +pub struct BlockIterator<'a> { + current_block: &'a Block, + blocks: &'a [Block], + cursor: usize, + len: usize, +} + + +impl<'a> Iterator for BlockIterator<'a> { + + type Item = u32; + + fn next(&mut self) -> Option { + if self.cursor == self.len { + None + } + else { + let res = self.current_block.data[self.cursor % (BLOCK_SIZE as usize)]; + self.cursor += 1; + if self.cursor % (BLOCK_SIZE as usize) == 0 { + self.current_block = &self.blocks[self.current_block.next as usize]; + } + Some(res) + } + + } +} + + + + + +pub struct BlockAppender { + blocks: Vec>, + doc_freq: usize, +} + +impl BlockAppender { + + pub fn new() -> BlockAppender { + BlockAppender { + blocks: Vec::new(), + doc_freq: 0, + } + } + + pub fn push(&mut self, doc_id: DocId) { + if self.doc_freq % NUM_DOCS_PER_BLOCK == 0 { + self.blocks.push(Box::new([0u32; NUM_DOCS_PER_BLOCK ])); + } + self.blocks[self.doc_freq / NUM_DOCS_PER_BLOCK][self.doc_freq % NUM_DOCS_PER_BLOCK] = doc_id; + self.doc_freq += 1; + } + + pub fn last(&self) -> Option { + if self.doc_freq == 0 { + return None + } + else { + Some(self.get(self.doc_freq - 1)) + } + } + + pub fn len(&self,) -> usize { + self.doc_freq + } + + + pub fn get(&self, cursor: usize) -> DocId { + self.blocks[cursor / NUM_DOCS_PER_BLOCK][cursor % NUM_DOCS_PER_BLOCK] + } + + + pub fn iter(&self,) -> IterBlockAppender { + IterBlockAppender { + cursor: 0, + block_appender: &self, + } + } +} + + + +pub struct IterBlockAppender<'a> { + cursor: usize, + block_appender: &'a BlockAppender, +} + + +impl<'a> Iterator for IterBlockAppender<'a> { + + type Item = DocId; + + fn next(&mut self) -> Option { + if self.cursor == self.block_appender.doc_freq { + return None + } + else { + let res = self.block_appender.get(self.cursor); + self.cursor += 1; + Some(res) + } + + } +} + + + + + + +#[cfg(test)] +mod tests { + + + use super::*; + + + #[test] + pub fn test_block_store() { + let mut block_store = BlockStore::allocate(1_000); + let list_2 = block_store.new_list(0); + let list_3 = block_store.new_list(0); + let list_4 = block_store.new_list(0); + let list_5 = block_store.new_list(0); + for i in 1 .. 2_000 { + block_store.push(list_2, i * 2); + block_store.push(list_3, i * 3); + } + for i in 1 .. 10 { + block_store.push(list_4, i * 4); + block_store.push(list_5, i * 5); + } + + let mut list2_iter = block_store.iter_list(list_2); + let mut list3_iter = block_store.iter_list(list_3); + let mut list4_iter = block_store.iter_list(list_4); + let mut list5_iter = block_store.iter_list(list_5); + for i in 0 .. 2_000 { + assert_eq!(list2_iter.next().unwrap(), i * 2); + assert_eq!(list3_iter.next().unwrap(), i * 3); + + } + assert!(list2_iter.next().is_none()); + assert!(list3_iter.next().is_none()); + for i in 0 .. 10 { + assert_eq!(list4_iter.next().unwrap(), i * 4); + assert_eq!(list5_iter.next().unwrap(), i * 5); + } + assert!(list4_iter.next().is_none()); + assert!(list5_iter.next().is_none()); + } +} \ No newline at end of file diff --git a/src/postings/mod.rs b/src/postings/mod.rs index be97d5719..d7d0084df 100644 --- a/src/postings/mod.rs +++ b/src/postings/mod.rs @@ -1,7 +1,7 @@ mod postings; mod recorder; mod serializer; -mod writer; +mod postings_writer; mod term_info; mod chained_postings; mod vec_postings; @@ -12,13 +12,14 @@ mod freq_handler; mod docset; mod scored_docset; mod segment_postings_option; +mod block_appender; pub use self::docset::{SkipResult, DocSet}; pub use self::offset_postings::OffsetPostings; pub use self::recorder::{Recorder, NothingRecorder, TermFrequencyRecorder, TFAndPositionRecorder}; pub use self::serializer::PostingsSerializer; -pub use self::writer::PostingsWriter; -pub use self::writer::SpecializedPostingsWriter; +pub use self::postings_writer::PostingsWriter; +pub use self::postings_writer::SpecializedPostingsWriter; pub use self::term_info::TermInfo; pub use self::postings::Postings; pub use self::vec_postings::VecPostings; diff --git a/src/postings/writer.rs b/src/postings/postings_writer.rs similarity index 96% rename from src/postings/writer.rs rename to src/postings/postings_writer.rs index 1726b9c10..933b6d3a8 100644 --- a/src/postings/writer.rs +++ b/src/postings/postings_writer.rs @@ -4,17 +4,18 @@ use schema::Term; use postings::PostingsSerializer; use std::io; use postings::Recorder; +use postings::block_appender::BlockAppender; struct TermPostingsWriter { - doc_ids: Vec, + doc_ids: BlockAppender, recorder: Rec, } impl TermPostingsWriter { pub fn new() -> TermPostingsWriter { TermPostingsWriter { - doc_ids: Vec::new(), + doc_ids: BlockAppender::new(), recorder: Recorder::new(), } } @@ -29,7 +30,7 @@ impl TermPostingsWriter { pub fn suscribe(&mut self, doc: DocId, pos: u32) { match self.doc_ids.last() { - Some(&last_doc) => { + Some(last_doc) => { if last_doc != doc { self.close_doc(); self.doc_ids.push(doc); diff --git a/src/schema/document.rs b/src/schema/document.rs index 02237374a..c86ebea25 100644 --- a/src/schema/document.rs +++ b/src/schema/document.rs @@ -12,7 +12,7 @@ use itertools::Itertools; /// Documents are really just a list of couple `(field, value)`. /// In this list, one field may appear more than once. -#[derive(Debug)] +#[derive(Debug, RustcEncodable, RustcDecodable)] pub struct Document { field_values: Vec, } diff --git a/src/schema/field.rs b/src/schema/field.rs index c4abfbd3b..b43be36ce 100644 --- a/src/schema/field.rs +++ b/src/schema/field.rs @@ -3,7 +3,7 @@ use std::io::Write; use std::io::Read; use common::BinarySerializable; -#[derive(Copy,Clone,Debug,PartialEq,PartialOrd,Eq,Ord,Hash)] +#[derive(Copy,Clone,Debug,PartialEq,PartialOrd,Eq,Ord,Hash, RustcEncodable, RustcDecodable)] pub struct Field(pub u8); impl BinarySerializable for Field { diff --git a/src/schema/field_value.rs b/src/schema/field_value.rs index 197e999df..71d5693d6 100644 --- a/src/schema/field_value.rs +++ b/src/schema/field_value.rs @@ -6,7 +6,7 @@ use schema::Field; use schema::Value; -#[derive(Debug, Clone, Ord, PartialEq, Eq, PartialOrd)] +#[derive(Debug, Clone, Ord, PartialEq, Eq, PartialOrd, RustcEncodable, RustcDecodable)] pub struct FieldValue { pub field: Field, pub value: Value, diff --git a/src/schema/value.rs b/src/schema/value.rs index e5aa154bf..354934799 100644 --- a/src/schema/value.rs +++ b/src/schema/value.rs @@ -4,7 +4,7 @@ use std::io; use std::io::Write; use std::io::Read; -#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)] +#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, RustcEncodable, RustcDecodable)] pub enum Value { Str(String), U32(u32),