diff --git a/src/datastruct/stacker/heap.rs b/src/datastruct/stacker/heap.rs index 05df522f2..ac5ee5bd8 100644 --- a/src/datastruct/stacker/heap.rs +++ b/src/datastruct/stacker/heap.rs @@ -9,10 +9,7 @@ pub struct BytesRef { pub stop: u32, } -struct InnerHeap { - buffer: Vec, - used: u32, -} + pub struct Heap { inner: UnsafeCell, @@ -43,8 +40,8 @@ impl Heap { self.inner().len() } - pub fn free(&self,) -> u32 { - self.inner().free() + pub fn num_free_bytes(&self,) -> u32 { + self.inner().num_free_bytes() } pub fn allocate(&self, num_bytes: usize) -> u32 { @@ -63,7 +60,7 @@ impl Heap { } pub fn get_slice(&self, bytes_ref: BytesRef) -> &[u8] { - self.inner().get_slice(bytes_ref) + self.inner().get_slice(bytes_ref.start, bytes_ref.stop) } pub fn set(&self, addr: u32, val: &Item) { @@ -76,17 +73,27 @@ impl Heap { } +struct InnerHeap { + buffer: Vec, + buffer_len: u32, + used: u32, + next_heap: Option>, +} + impl InnerHeap { pub fn with_capacity(num_bytes: usize) -> InnerHeap { InnerHeap { buffer: iter::repeat(0u8).take(num_bytes).collect(), + buffer_len: num_bytes as u32, + next_heap: None, used: 0u32, } } pub fn clear(&mut self) { self.used = 0u32; + self.next_heap = None; } pub fn capacity(&self,) -> u32 { @@ -96,56 +103,98 @@ impl InnerHeap { pub fn len(&self,) -> u32 { self.used } - - pub fn free(&self,) -> u32 { - (self.buffer.len() as u32) - self.used + + // Returns the number of free bytes. If the buffer + // has reached it's capacity and overflowed to another buffer, return 0. + pub fn num_free_bytes(&self,) -> u32 { + if self.next_heap.is_some() { + 0u32 + } + else { + self.buffer_len - self.used + } } pub fn allocate(&mut self, num_bytes: usize) -> u32 { let addr = self.used; self.used += num_bytes as u32; - let len_buffer = self.buffer.len(); - if self.used > len_buffer as u32 { - // TODO fix resizable heap - panic!("Resizing heap is not working"); - // self.buffer.resize((self.used * 2u32) as usize, 0u8); + if self.used <= self.buffer_len { + addr + } + else { + if self.next_heap.is_none() { + println!("exceed heap size"); + self.next_heap = Some(Box::new(InnerHeap::with_capacity(self.buffer_len as usize))); + } + self.next_heap.as_mut().unwrap().allocate(num_bytes) + self.buffer_len + } + + + } + + fn get_slice(&self, start: u32, stop: u32) -> &[u8] { + if start >= self.buffer_len { + self.next_heap.as_ref().unwrap().get_slice(start - self.buffer_len, stop - self.buffer_len) + } + else { + &self.buffer[start as usize..stop as usize] + } + } + + fn get_mut_slice(&mut self, start: u32, stop: u32) -> &mut [u8] { + if start >= self.buffer_len { + self.next_heap.as_mut().unwrap().get_mut_slice(start - self.buffer_len, stop - self.buffer_len) + } + else { + &mut self.buffer[start as usize..stop as usize] } - addr } - pub fn allocate_and_set(&mut self, data: &[u8]) -> BytesRef { - let start = self.allocate(data.len()) as usize; - let stop = start + data.len(); - &mut self.buffer[start..stop].clone_from_slice(data); + fn allocate_and_set(&mut self, data: &[u8]) -> BytesRef { + let start = self.allocate(data.len()); + let stop = start + data.len() as u32; + self.get_mut_slice(start, stop).clone_from_slice(data); BytesRef { start: start as u32, stop: stop as u32, } } - pub fn get_mut(&mut self, addr: u32) -> *mut u8 { - let addr_usize = addr as isize; - debug_assert!(addr < self.used); - unsafe { self.buffer.as_mut_ptr().offset(addr_usize) } + fn get_mut(&mut self, addr: u32) -> *mut u8 { + if addr >= self.buffer_len { + self.next_heap.as_mut().unwrap().get_mut(addr - self.buffer_len) + } + else { + let addr_isize = addr as isize; + unsafe { self.buffer.as_mut_ptr().offset(addr_isize) } + } } - pub fn get_slice(&self, bytes_ref: BytesRef) -> &[u8] { - &self.buffer[bytes_ref.start as usize .. bytes_ref.stop as usize] - } - pub fn get_mut_ref(&mut self, addr: u32) -> &mut Item { - let v_ptr_u8 = self.get_mut(addr) as *mut u8; - let v_ptr = v_ptr_u8 as *mut Item; - unsafe { &mut *v_ptr } + + fn get_mut_ref(&mut self, addr: u32) -> &mut Item { + if addr >= self.buffer_len { + self.next_heap.as_mut().unwrap().get_mut_ref(addr - self.buffer_len) + } + else { + let v_ptr_u8 = self.get_mut(addr) as *mut u8; + let v_ptr = v_ptr_u8 as *mut Item; + unsafe { &mut *v_ptr } + } } pub fn set(&mut self, addr: u32, val: &Item) { - let v_ptr: *const Item = val as *const Item; - let v_ptr_u8: *const u8 = v_ptr as *const u8; - debug_assert!(addr + mem::size_of::() as u32 <= self.used); - unsafe { - let dest_ptr: *mut u8 = self.get_mut(addr); - ptr::copy(v_ptr_u8, dest_ptr, mem::size_of::()); + if addr >= self.buffer_len { + self.next_heap.as_mut().unwrap().set(addr - self.buffer_len, val); + } + else { + let v_ptr: *const Item = val as *const Item; + let v_ptr_u8: *const u8 = v_ptr as *const u8; + debug_assert!(addr + mem::size_of::() as u32 <= self.used); + unsafe { + let dest_ptr: *mut u8 = self.get_mut(addr); + ptr::copy(v_ptr_u8, dest_ptr, mem::size_of::()); + } } } } \ No newline at end of file diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 41ee7b83e..1cce1c5b1 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -19,9 +19,19 @@ use chan; use Result; use Error; +// Size of the margin for the heap. A segment is closed when the remaining memory +// in the heap goes below MARGIN_IN_BYTES. +pub const MARGIN_IN_BYTES: u32 = 10_000_000u32; + +// We impose the memory per thread to be at least 30 MB. +pub const HEAP_SIZE_LIMIT: u32 = MARGIN_IN_BYTES * 3u32; + +// Add document will block if the number of docs waiting in the queue to be indexed reaches PIPELINE_MAX_SIZE_IN_DOCS +const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000; + pub struct IndexWriter { - heap_size_in_bytes: usize, - index: Index, + index: Index, + heap_size_in_bytes_per_thread: usize, workers_join_handle: Vec>, segment_ready_sender: chan::Sender>, segment_ready_receiver: chan::Receiver>, @@ -29,10 +39,8 @@ pub struct IndexWriter { document_sender: chan::Sender, num_threads: usize, docstamp: u64, - } -const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000; fn index_documents(heap: &mut Heap, segment: Segment, @@ -65,9 +73,9 @@ impl IndexWriter { let schema = self.index.schema(); let segment_ready_sender_clone = self.segment_ready_sender.clone(); let document_receiver_clone = self.document_receiver.clone(); - let heap_size_in_bytes = self.heap_size_in_bytes; - let join_handle: JoinHandle<()> = thread::spawn(move || { - let mut heap = Heap::with_capacity(heap_size_in_bytes); + + let mut heap = Heap::with_capacity(self.heap_size_in_bytes_per_thread); + let join_handle: JoinHandle<()> = thread::spawn(move || { loop { let segment = index.new_segment(); let segment_id = segment.id(); @@ -98,11 +106,14 @@ impl IndexWriter { /// should work at the same time. pub fn open(index: &Index, num_threads: usize, - heap_size_in_bytes: usize) -> Result { + heap_size_in_bytes_per_thread: usize) -> Result { + if heap_size_in_bytes_per_thread <= HEAP_SIZE_LIMIT as usize { + panic!(format!("The heap size per thread needs to be at least {}.", HEAP_SIZE_LIMIT)); + } let (document_sender, document_receiver): (chan::Sender, chan::Receiver) = chan::sync(PIPELINE_MAX_SIZE_IN_DOCS); let (segment_ready_sender, segment_ready_receiver): (chan::Sender>, chan::Receiver>) = chan::async(); let mut index_writer = IndexWriter { - heap_size_in_bytes: heap_size_in_bytes, + heap_size_in_bytes_per_thread: heap_size_in_bytes_per_thread, index: index.clone(), segment_ready_receiver: segment_ready_receiver, segment_ready_sender: segment_ready_sender, @@ -285,7 +296,7 @@ mod tests { { // writing the segment - let mut index_writer = index.writer_with_num_threads(3, 30_000_000).unwrap(); + let mut index_writer = index.writer_with_num_threads(3, 40_000_000).unwrap(); { let mut doc = Document::new(); doc.add_text(text_field, "a"); diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 605a011f0..d81bee5c2 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -292,7 +292,7 @@ mod tests { let index = Index::create_in_ram(schema_builder.build()); { - let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap(); + let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); { // writing the segment { @@ -335,7 +335,7 @@ mod tests { } { let segments = index.segments().unwrap(); - let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap(); + let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); index_writer.merge(&segments).unwrap(); } { diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index 7b53698df..13b9aa179 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -18,6 +18,7 @@ use postings::SpecializedPostingsWriter; use postings::{NothingRecorder, TermFrequencyRecorder, TFAndPositionRecorder}; use indexer::segment_serializer::SegmentSerializer; use datastruct::stacker::Heap; +use indexer::index_writer::MARGIN_IN_BYTES; pub struct SegmentWriter<'a> { heap: &'a Heap, @@ -101,7 +102,7 @@ impl<'a> SegmentWriter<'a> { } pub fn is_buffer_full(&self,) -> bool { - self.heap.free() <= 10_000_000 + self.heap.num_free_bytes() <= MARGIN_IN_BYTES } pub fn add_document(&mut self, doc: &Document, schema: &Schema) -> io::Result<()> { @@ -137,7 +138,6 @@ impl<'a> SegmentWriter<'a> { } } } - } self.fieldnorms_writer.fill_val_up_to(doc_id); diff --git a/src/lib.rs b/src/lib.rs index 32586cd43..2c0105a93 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -139,7 +139,7 @@ mod tests { let index = Index::create_from_tempdir(schema).unwrap(); { // writing the segment - let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap(); + let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); { let mut doc = Document::new(); doc.add_text(text_field, "af b"); @@ -165,7 +165,7 @@ mod tests { let mut schema_builder = SchemaBuilder::new(); let text_field = schema_builder.add_text_field("text", TEXT); let index = Index::create_in_ram(schema_builder.build()); - let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap(); + let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); { let mut doc = Document::new(); doc.add_text(text_field, "a b c"); @@ -211,7 +211,7 @@ mod tests { let text_field = schema_builder.add_text_field("text", TEXT); let index = Index::create_in_ram(schema_builder.build()); { - let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap(); + let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); { let mut doc = Document::new(); doc.add_text(text_field, "a b c"); @@ -247,7 +247,7 @@ mod tests { let index = Index::create_in_ram(schema); { // writing the segment - let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap(); + let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); { let mut doc = Document::new(); doc.add_text(text_field, "af af af bc bc"); @@ -275,7 +275,7 @@ mod tests { { // writing the segment - let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap(); + let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); { let mut doc = Document::new(); doc.add_text(text_field, "af af af b"); @@ -344,7 +344,7 @@ mod tests { { // writing the segment - let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap(); + let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); { let mut doc = Document::new(); doc.add_text(text_field, "af b");