From 346fc31ac23dfe103b3278b325fd1ca35f9cdefe Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Tue, 13 Sep 2016 11:01:02 +0900 Subject: [PATCH] Chaining heaps. We commit close segments when the indexer heap is close to its capacity. (currently we use a limit of 10_000_000). Because we do this check before indexing a document, and before also because serialization starts by closing the postingswriter, and therefore all of the recorders open for the last document, we may still overflow the heap. We don't want to resize the heap because we may have references to objects in the current heap. Because of that, heap are actually chained list. In an ideal settings, the limit should work fine and this overflow behavior should never be activated. --- src/datastruct/stacker/heap.rs | 123 +++++++++++++++++++++++---------- src/indexer/index_writer.rs | 31 ++++++--- src/indexer/merger.rs | 4 +- src/indexer/segment_writer.rs | 4 +- src/lib.rs | 12 ++-- 5 files changed, 117 insertions(+), 57 deletions(-) diff --git a/src/datastruct/stacker/heap.rs b/src/datastruct/stacker/heap.rs index 05df522f2..ac5ee5bd8 100644 --- a/src/datastruct/stacker/heap.rs +++ b/src/datastruct/stacker/heap.rs @@ -9,10 +9,7 @@ pub struct BytesRef { pub stop: u32, } -struct InnerHeap { - buffer: Vec, - used: u32, -} + pub struct Heap { inner: UnsafeCell, @@ -43,8 +40,8 @@ impl Heap { self.inner().len() } - pub fn free(&self,) -> u32 { - self.inner().free() + pub fn num_free_bytes(&self,) -> u32 { + self.inner().num_free_bytes() } pub fn allocate(&self, num_bytes: usize) -> u32 { @@ -63,7 +60,7 @@ impl Heap { } pub fn get_slice(&self, bytes_ref: BytesRef) -> &[u8] { - self.inner().get_slice(bytes_ref) + self.inner().get_slice(bytes_ref.start, bytes_ref.stop) } pub fn set(&self, addr: u32, val: &Item) { @@ -76,17 +73,27 @@ impl Heap { } +struct InnerHeap { + buffer: Vec, + buffer_len: u32, + used: u32, + next_heap: Option>, +} + impl InnerHeap { pub fn with_capacity(num_bytes: usize) -> InnerHeap { InnerHeap { buffer: iter::repeat(0u8).take(num_bytes).collect(), + buffer_len: num_bytes as u32, + next_heap: None, used: 0u32, } } pub fn clear(&mut self) { self.used = 0u32; + self.next_heap = None; } pub fn capacity(&self,) -> u32 { @@ -96,56 +103,98 @@ impl InnerHeap { pub fn len(&self,) -> u32 { self.used } - - pub fn free(&self,) -> u32 { - (self.buffer.len() as u32) - self.used + + // Returns the number of free bytes. If the buffer + // has reached it's capacity and overflowed to another buffer, return 0. + pub fn num_free_bytes(&self,) -> u32 { + if self.next_heap.is_some() { + 0u32 + } + else { + self.buffer_len - self.used + } } pub fn allocate(&mut self, num_bytes: usize) -> u32 { let addr = self.used; self.used += num_bytes as u32; - let len_buffer = self.buffer.len(); - if self.used > len_buffer as u32 { - // TODO fix resizable heap - panic!("Resizing heap is not working"); - // self.buffer.resize((self.used * 2u32) as usize, 0u8); + if self.used <= self.buffer_len { + addr + } + else { + if self.next_heap.is_none() { + println!("exceed heap size"); + self.next_heap = Some(Box::new(InnerHeap::with_capacity(self.buffer_len as usize))); + } + self.next_heap.as_mut().unwrap().allocate(num_bytes) + self.buffer_len + } + + + } + + fn get_slice(&self, start: u32, stop: u32) -> &[u8] { + if start >= self.buffer_len { + self.next_heap.as_ref().unwrap().get_slice(start - self.buffer_len, stop - self.buffer_len) + } + else { + &self.buffer[start as usize..stop as usize] + } + } + + fn get_mut_slice(&mut self, start: u32, stop: u32) -> &mut [u8] { + if start >= self.buffer_len { + self.next_heap.as_mut().unwrap().get_mut_slice(start - self.buffer_len, stop - self.buffer_len) + } + else { + &mut self.buffer[start as usize..stop as usize] } - addr } - pub fn allocate_and_set(&mut self, data: &[u8]) -> BytesRef { - let start = self.allocate(data.len()) as usize; - let stop = start + data.len(); - &mut self.buffer[start..stop].clone_from_slice(data); + fn allocate_and_set(&mut self, data: &[u8]) -> BytesRef { + let start = self.allocate(data.len()); + let stop = start + data.len() as u32; + self.get_mut_slice(start, stop).clone_from_slice(data); BytesRef { start: start as u32, stop: stop as u32, } } - pub fn get_mut(&mut self, addr: u32) -> *mut u8 { - let addr_usize = addr as isize; - debug_assert!(addr < self.used); - unsafe { self.buffer.as_mut_ptr().offset(addr_usize) } + fn get_mut(&mut self, addr: u32) -> *mut u8 { + if addr >= self.buffer_len { + self.next_heap.as_mut().unwrap().get_mut(addr - self.buffer_len) + } + else { + let addr_isize = addr as isize; + unsafe { self.buffer.as_mut_ptr().offset(addr_isize) } + } } - pub fn get_slice(&self, bytes_ref: BytesRef) -> &[u8] { - &self.buffer[bytes_ref.start as usize .. bytes_ref.stop as usize] - } - pub fn get_mut_ref(&mut self, addr: u32) -> &mut Item { - let v_ptr_u8 = self.get_mut(addr) as *mut u8; - let v_ptr = v_ptr_u8 as *mut Item; - unsafe { &mut *v_ptr } + + fn get_mut_ref(&mut self, addr: u32) -> &mut Item { + if addr >= self.buffer_len { + self.next_heap.as_mut().unwrap().get_mut_ref(addr - self.buffer_len) + } + else { + let v_ptr_u8 = self.get_mut(addr) as *mut u8; + let v_ptr = v_ptr_u8 as *mut Item; + unsafe { &mut *v_ptr } + } } pub fn set(&mut self, addr: u32, val: &Item) { - let v_ptr: *const Item = val as *const Item; - let v_ptr_u8: *const u8 = v_ptr as *const u8; - debug_assert!(addr + mem::size_of::() as u32 <= self.used); - unsafe { - let dest_ptr: *mut u8 = self.get_mut(addr); - ptr::copy(v_ptr_u8, dest_ptr, mem::size_of::()); + if addr >= self.buffer_len { + self.next_heap.as_mut().unwrap().set(addr - self.buffer_len, val); + } + else { + let v_ptr: *const Item = val as *const Item; + let v_ptr_u8: *const u8 = v_ptr as *const u8; + debug_assert!(addr + mem::size_of::() as u32 <= self.used); + unsafe { + let dest_ptr: *mut u8 = self.get_mut(addr); + ptr::copy(v_ptr_u8, dest_ptr, mem::size_of::()); + } } } } \ No newline at end of file diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 41ee7b83e..1cce1c5b1 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -19,9 +19,19 @@ use chan; use Result; use Error; +// Size of the margin for the heap. A segment is closed when the remaining memory +// in the heap goes below MARGIN_IN_BYTES. +pub const MARGIN_IN_BYTES: u32 = 10_000_000u32; + +// We impose the memory per thread to be at least 30 MB. +pub const HEAP_SIZE_LIMIT: u32 = MARGIN_IN_BYTES * 3u32; + +// Add document will block if the number of docs waiting in the queue to be indexed reaches PIPELINE_MAX_SIZE_IN_DOCS +const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000; + pub struct IndexWriter { - heap_size_in_bytes: usize, - index: Index, + index: Index, + heap_size_in_bytes_per_thread: usize, workers_join_handle: Vec>, segment_ready_sender: chan::Sender>, segment_ready_receiver: chan::Receiver>, @@ -29,10 +39,8 @@ pub struct IndexWriter { document_sender: chan::Sender, num_threads: usize, docstamp: u64, - } -const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000; fn index_documents(heap: &mut Heap, segment: Segment, @@ -65,9 +73,9 @@ impl IndexWriter { let schema = self.index.schema(); let segment_ready_sender_clone = self.segment_ready_sender.clone(); let document_receiver_clone = self.document_receiver.clone(); - let heap_size_in_bytes = self.heap_size_in_bytes; - let join_handle: JoinHandle<()> = thread::spawn(move || { - let mut heap = Heap::with_capacity(heap_size_in_bytes); + + let mut heap = Heap::with_capacity(self.heap_size_in_bytes_per_thread); + let join_handle: JoinHandle<()> = thread::spawn(move || { loop { let segment = index.new_segment(); let segment_id = segment.id(); @@ -98,11 +106,14 @@ impl IndexWriter { /// should work at the same time. pub fn open(index: &Index, num_threads: usize, - heap_size_in_bytes: usize) -> Result { + heap_size_in_bytes_per_thread: usize) -> Result { + if heap_size_in_bytes_per_thread <= HEAP_SIZE_LIMIT as usize { + panic!(format!("The heap size per thread needs to be at least {}.", HEAP_SIZE_LIMIT)); + } let (document_sender, document_receiver): (chan::Sender, chan::Receiver) = chan::sync(PIPELINE_MAX_SIZE_IN_DOCS); let (segment_ready_sender, segment_ready_receiver): (chan::Sender>, chan::Receiver>) = chan::async(); let mut index_writer = IndexWriter { - heap_size_in_bytes: heap_size_in_bytes, + heap_size_in_bytes_per_thread: heap_size_in_bytes_per_thread, index: index.clone(), segment_ready_receiver: segment_ready_receiver, segment_ready_sender: segment_ready_sender, @@ -285,7 +296,7 @@ mod tests { { // writing the segment - let mut index_writer = index.writer_with_num_threads(3, 30_000_000).unwrap(); + let mut index_writer = index.writer_with_num_threads(3, 40_000_000).unwrap(); { let mut doc = Document::new(); doc.add_text(text_field, "a"); diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 605a011f0..d81bee5c2 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -292,7 +292,7 @@ mod tests { let index = Index::create_in_ram(schema_builder.build()); { - let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap(); + let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); { // writing the segment { @@ -335,7 +335,7 @@ mod tests { } { let segments = index.segments().unwrap(); - let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap(); + let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); index_writer.merge(&segments).unwrap(); } { diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index 7b53698df..13b9aa179 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -18,6 +18,7 @@ use postings::SpecializedPostingsWriter; use postings::{NothingRecorder, TermFrequencyRecorder, TFAndPositionRecorder}; use indexer::segment_serializer::SegmentSerializer; use datastruct::stacker::Heap; +use indexer::index_writer::MARGIN_IN_BYTES; pub struct SegmentWriter<'a> { heap: &'a Heap, @@ -101,7 +102,7 @@ impl<'a> SegmentWriter<'a> { } pub fn is_buffer_full(&self,) -> bool { - self.heap.free() <= 10_000_000 + self.heap.num_free_bytes() <= MARGIN_IN_BYTES } pub fn add_document(&mut self, doc: &Document, schema: &Schema) -> io::Result<()> { @@ -137,7 +138,6 @@ impl<'a> SegmentWriter<'a> { } } } - } self.fieldnorms_writer.fill_val_up_to(doc_id); diff --git a/src/lib.rs b/src/lib.rs index 32586cd43..2c0105a93 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -139,7 +139,7 @@ mod tests { let index = Index::create_from_tempdir(schema).unwrap(); { // writing the segment - let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap(); + let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); { let mut doc = Document::new(); doc.add_text(text_field, "af b"); @@ -165,7 +165,7 @@ mod tests { let mut schema_builder = SchemaBuilder::new(); let text_field = schema_builder.add_text_field("text", TEXT); let index = Index::create_in_ram(schema_builder.build()); - let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap(); + let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); { let mut doc = Document::new(); doc.add_text(text_field, "a b c"); @@ -211,7 +211,7 @@ mod tests { let text_field = schema_builder.add_text_field("text", TEXT); let index = Index::create_in_ram(schema_builder.build()); { - let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap(); + let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); { let mut doc = Document::new(); doc.add_text(text_field, "a b c"); @@ -247,7 +247,7 @@ mod tests { let index = Index::create_in_ram(schema); { // writing the segment - let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap(); + let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); { let mut doc = Document::new(); doc.add_text(text_field, "af af af bc bc"); @@ -275,7 +275,7 @@ mod tests { { // writing the segment - let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap(); + let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); { let mut doc = Document::new(); doc.add_text(text_field, "af af af b"); @@ -344,7 +344,7 @@ mod tests { { // writing the segment - let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap(); + let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap(); { let mut doc = Document::new(); doc.add_text(text_field, "af b");