diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 165e090fa..a3bedf9c8 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -26,7 +26,6 @@ pub struct IndexWriter { segment_ready_receiver: chan::Receiver>, document_receiver: chan::Receiver, document_sender: chan::Sender, - target_num_docs: usize, num_threads: usize, docstamp: u64, @@ -42,6 +41,11 @@ fn index_documents(block_store: &mut BlockStore, let mut segment_writer = try!(SegmentWriter::for_segment(block_store, segment, &schema)); for doc in document_iterator { try!(segment_writer.add_document(&doc, &schema)); + if segment_writer.is_buffer_full() { + println!("no more space committing."); + println!("seg max doc {}", segment_writer.max_doc()); + break; + } } let num_docs = segment_writer.max_doc() as usize; try!(segment_writer.finalize()); @@ -55,25 +59,19 @@ impl IndexWriter { /// Spawns a new worker thread for indexing. /// The thread consumes documents from the pipeline. /// - /// When target_num_docs is reached, or when the channel - /// is closed, the worker flushes its current segment to disc, - /// and sends its segment_id through the channel. - /// fn add_indexing_worker(&mut self,) -> Result<()> { let index = self.index.clone(); let schema = self.index.schema(); let segment_ready_sender_clone = self.segment_ready_sender.clone(); let document_receiver_clone = self.document_receiver.clone(); - let target_num_docs = self.target_num_docs; let join_handle: JoinHandle<()> = thread::spawn(move || { - let mut block_store = BlockStore::allocate(500_000); + let mut block_store = BlockStore::allocate(1_000_000); loop { let segment = index.new_segment(); let segment_id = segment.id(); let mut document_iterator = document_receiver_clone .clone() .into_iter() - .take(target_num_docs) .peekable(); // the peeking here is to avoid // creating a new segment's files @@ -105,7 +103,6 @@ impl IndexWriter { segment_ready_sender: segment_ready_sender, document_receiver: document_receiver, document_sender: document_sender, - target_num_docs: 100_000, workers_join_handle: Vec::new(), num_threads: num_threads, docstamp: try!(index.docstamp()), diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index 96fdec8fc..5a1499548 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -63,6 +63,7 @@ fn posting_from_field_entry(field_entry: &FieldEntry) -> Box { } + impl<'a> SegmentWriter<'a> { pub fn for_segment(block_store: &'a mut BlockStore, mut segment: Segment, schema: &Schema) -> Result> { @@ -103,7 +104,11 @@ impl<'a> SegmentWriter<'a> { segment_info, self.segment_serializer) } - + + pub fn is_buffer_full(&self,) -> bool { + self.block_store.num_free_blocks() < 1000 + } + pub fn add_document(&mut self, doc: &Document, schema: &Schema) -> io::Result<()> { let doc_id = self.max_doc; for (field, field_values) in doc.get_sorted_fields() { diff --git a/src/postings/block_store.rs b/src/postings/block_store.rs index 76ca90d7a..3ffe86ff3 100644 --- a/src/postings/block_store.rs +++ b/src/postings/block_store.rs @@ -36,6 +36,10 @@ impl BlockStore { } } + pub fn num_free_blocks(&self) -> usize { + self.blocks.len() - self.free_block_id + } + pub fn new_list(&mut self) -> u32 { let res = self.lists.len() as u32; let new_block_id = self.new_block().unwrap(); @@ -128,11 +132,18 @@ impl<'a> Iterator for BlockIterator<'a> { None } else { + if self.cursor % (BLOCK_SIZE as usize) == 0 { + if self.cursor != 0 { + if self.current_block.next != u32::max_value() { + self.current_block = &self.blocks[self.current_block.next as usize]; + } + else { + panic!("Block linked list ended prematurely."); + } + } + } let res = self.current_block.data[self.cursor % (BLOCK_SIZE as usize)]; self.cursor += 1; - if self.cursor % (BLOCK_SIZE as usize) == 0 { - self.current_block = &self.blocks[self.current_block.next as usize]; - } Some(res) } diff --git a/src/postings/mod.rs b/src/postings/mod.rs index bd3e75a80..f84800a09 100644 --- a/src/postings/mod.rs +++ b/src/postings/mod.rs @@ -167,23 +167,3 @@ mod tests { } } - - - -// #[cfg(test)] -// mod tests { - -// use super::*; -// use test::Bencher; - -// -// #[bench] -// fn bench_single_intersection(b: &mut Bencher) { -// b.iter(|| { -// let docs = VecPostings::new((0..1_000_000).collect()); -// let intersection = IntersectionDocSet::from_postings(vec!(docs)); -// intersection.count() -// }); -// } -// } -// \ No newline at end of file diff --git a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs index 2f1a176b7..3e70cb1a7 100644 --- a/src/postings/postings_writer.rs +++ b/src/postings/postings_writer.rs @@ -23,11 +23,9 @@ pub struct SpecializedPostingsWriter { fn get_or_create_recorder<'a, Rec: Recorder>(term: Term, term_index: &'a mut HashMap, block_store: &mut BlockStore) -> &'a mut Rec { if term_index.contains_key(&term) { - println!("recorder here"); term_index.get_mut(&term).unwrap() } else { - println!("adding recorder"); let recorder = Rec::new(block_store); term_index .entry(term)