Chaining heaps.

We commit close segments when the indexer heap is close to its capacity.
(currently we use a limit of 10_000_000).

Because we do this check before indexing a document, and before
also because serialization starts by closing the postingswriter, and
therefore all of the recorders open for the last document, we may still
overflow the heap.

We don't want to resize the heap because we may have references to objects
in the current heap.

Because of that, heap are actually chained list.
In an ideal settings, the limit should work fine and this overflow behavior should
never be activated.
This commit is contained in:
Paul Masurel
2016-09-13 11:01:02 +09:00
parent b911c4dc98
commit 346fc31ac2
5 changed files with 117 additions and 57 deletions

View File

@@ -9,10 +9,7 @@ pub struct BytesRef {
pub stop: u32,
}
struct InnerHeap {
buffer: Vec<u8>,
used: u32,
}
pub struct Heap {
inner: UnsafeCell<InnerHeap>,
@@ -43,8 +40,8 @@ impl Heap {
self.inner().len()
}
pub fn free(&self,) -> u32 {
self.inner().free()
pub fn num_free_bytes(&self,) -> u32 {
self.inner().num_free_bytes()
}
pub fn allocate(&self, num_bytes: usize) -> u32 {
@@ -63,7 +60,7 @@ impl Heap {
}
pub fn get_slice(&self, bytes_ref: BytesRef) -> &[u8] {
self.inner().get_slice(bytes_ref)
self.inner().get_slice(bytes_ref.start, bytes_ref.stop)
}
pub fn set<Item>(&self, addr: u32, val: &Item) {
@@ -76,17 +73,27 @@ impl Heap {
}
struct InnerHeap {
buffer: Vec<u8>,
buffer_len: u32,
used: u32,
next_heap: Option<Box<InnerHeap>>,
}
impl InnerHeap {
pub fn with_capacity(num_bytes: usize) -> InnerHeap {
InnerHeap {
buffer: iter::repeat(0u8).take(num_bytes).collect(),
buffer_len: num_bytes as u32,
next_heap: None,
used: 0u32,
}
}
pub fn clear(&mut self) {
self.used = 0u32;
self.next_heap = None;
}
pub fn capacity(&self,) -> u32 {
@@ -96,56 +103,98 @@ impl InnerHeap {
pub fn len(&self,) -> u32 {
self.used
}
pub fn free(&self,) -> u32 {
(self.buffer.len() as u32) - self.used
// Returns the number of free bytes. If the buffer
// has reached it's capacity and overflowed to another buffer, return 0.
pub fn num_free_bytes(&self,) -> u32 {
if self.next_heap.is_some() {
0u32
}
else {
self.buffer_len - self.used
}
}
pub fn allocate(&mut self, num_bytes: usize) -> u32 {
let addr = self.used;
self.used += num_bytes as u32;
let len_buffer = self.buffer.len();
if self.used > len_buffer as u32 {
// TODO fix resizable heap
panic!("Resizing heap is not working");
// self.buffer.resize((self.used * 2u32) as usize, 0u8);
if self.used <= self.buffer_len {
addr
}
else {
if self.next_heap.is_none() {
println!("exceed heap size");
self.next_heap = Some(Box::new(InnerHeap::with_capacity(self.buffer_len as usize)));
}
self.next_heap.as_mut().unwrap().allocate(num_bytes) + self.buffer_len
}
}
fn get_slice(&self, start: u32, stop: u32) -> &[u8] {
if start >= self.buffer_len {
self.next_heap.as_ref().unwrap().get_slice(start - self.buffer_len, stop - self.buffer_len)
}
else {
&self.buffer[start as usize..stop as usize]
}
}
fn get_mut_slice(&mut self, start: u32, stop: u32) -> &mut [u8] {
if start >= self.buffer_len {
self.next_heap.as_mut().unwrap().get_mut_slice(start - self.buffer_len, stop - self.buffer_len)
}
else {
&mut self.buffer[start as usize..stop as usize]
}
addr
}
pub fn allocate_and_set(&mut self, data: &[u8]) -> BytesRef {
let start = self.allocate(data.len()) as usize;
let stop = start + data.len();
&mut self.buffer[start..stop].clone_from_slice(data);
fn allocate_and_set(&mut self, data: &[u8]) -> BytesRef {
let start = self.allocate(data.len());
let stop = start + data.len() as u32;
self.get_mut_slice(start, stop).clone_from_slice(data);
BytesRef {
start: start as u32,
stop: stop as u32,
}
}
pub fn get_mut(&mut self, addr: u32) -> *mut u8 {
let addr_usize = addr as isize;
debug_assert!(addr < self.used);
unsafe { self.buffer.as_mut_ptr().offset(addr_usize) }
fn get_mut(&mut self, addr: u32) -> *mut u8 {
if addr >= self.buffer_len {
self.next_heap.as_mut().unwrap().get_mut(addr - self.buffer_len)
}
else {
let addr_isize = addr as isize;
unsafe { self.buffer.as_mut_ptr().offset(addr_isize) }
}
}
pub fn get_slice(&self, bytes_ref: BytesRef) -> &[u8] {
&self.buffer[bytes_ref.start as usize .. bytes_ref.stop as usize]
}
pub fn get_mut_ref<Item>(&mut self, addr: u32) -> &mut Item {
let v_ptr_u8 = self.get_mut(addr) as *mut u8;
let v_ptr = v_ptr_u8 as *mut Item;
unsafe { &mut *v_ptr }
fn get_mut_ref<Item>(&mut self, addr: u32) -> &mut Item {
if addr >= self.buffer_len {
self.next_heap.as_mut().unwrap().get_mut_ref(addr - self.buffer_len)
}
else {
let v_ptr_u8 = self.get_mut(addr) as *mut u8;
let v_ptr = v_ptr_u8 as *mut Item;
unsafe { &mut *v_ptr }
}
}
pub fn set<Item>(&mut self, addr: u32, val: &Item) {
let v_ptr: *const Item = val as *const Item;
let v_ptr_u8: *const u8 = v_ptr as *const u8;
debug_assert!(addr + mem::size_of::<Item>() as u32 <= self.used);
unsafe {
let dest_ptr: *mut u8 = self.get_mut(addr);
ptr::copy(v_ptr_u8, dest_ptr, mem::size_of::<Item>());
if addr >= self.buffer_len {
self.next_heap.as_mut().unwrap().set(addr - self.buffer_len, val);
}
else {
let v_ptr: *const Item = val as *const Item;
let v_ptr_u8: *const u8 = v_ptr as *const u8;
debug_assert!(addr + mem::size_of::<Item>() as u32 <= self.used);
unsafe {
let dest_ptr: *mut u8 = self.get_mut(addr);
ptr::copy(v_ptr_u8, dest_ptr, mem::size_of::<Item>());
}
}
}
}

View File

@@ -19,9 +19,19 @@ use chan;
use Result;
use Error;
// Size of the margin for the heap. A segment is closed when the remaining memory
// in the heap goes below MARGIN_IN_BYTES.
pub const MARGIN_IN_BYTES: u32 = 10_000_000u32;
// We impose the memory per thread to be at least 30 MB.
pub const HEAP_SIZE_LIMIT: u32 = MARGIN_IN_BYTES * 3u32;
// Add document will block if the number of docs waiting in the queue to be indexed reaches PIPELINE_MAX_SIZE_IN_DOCS
const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000;
pub struct IndexWriter {
heap_size_in_bytes: usize,
index: Index,
index: Index,
heap_size_in_bytes_per_thread: usize,
workers_join_handle: Vec<JoinHandle<()>>,
segment_ready_sender: chan::Sender<Result<(SegmentId, usize)>>,
segment_ready_receiver: chan::Receiver<Result<(SegmentId, usize)>>,
@@ -29,10 +39,8 @@ pub struct IndexWriter {
document_sender: chan::Sender<Document>,
num_threads: usize,
docstamp: u64,
}
const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000;
fn index_documents(heap: &mut Heap,
segment: Segment,
@@ -65,9 +73,9 @@ impl IndexWriter {
let schema = self.index.schema();
let segment_ready_sender_clone = self.segment_ready_sender.clone();
let document_receiver_clone = self.document_receiver.clone();
let heap_size_in_bytes = self.heap_size_in_bytes;
let join_handle: JoinHandle<()> = thread::spawn(move || {
let mut heap = Heap::with_capacity(heap_size_in_bytes);
let mut heap = Heap::with_capacity(self.heap_size_in_bytes_per_thread);
let join_handle: JoinHandle<()> = thread::spawn(move || {
loop {
let segment = index.new_segment();
let segment_id = segment.id();
@@ -98,11 +106,14 @@ impl IndexWriter {
/// should work at the same time.
pub fn open(index: &Index,
num_threads: usize,
heap_size_in_bytes: usize) -> Result<IndexWriter> {
heap_size_in_bytes_per_thread: usize) -> Result<IndexWriter> {
if heap_size_in_bytes_per_thread <= HEAP_SIZE_LIMIT as usize {
panic!(format!("The heap size per thread needs to be at least {}.", HEAP_SIZE_LIMIT));
}
let (document_sender, document_receiver): (chan::Sender<Document>, chan::Receiver<Document>) = chan::sync(PIPELINE_MAX_SIZE_IN_DOCS);
let (segment_ready_sender, segment_ready_receiver): (chan::Sender<Result<(SegmentId, usize)>>, chan::Receiver<Result<(SegmentId, usize)>>) = chan::async();
let mut index_writer = IndexWriter {
heap_size_in_bytes: heap_size_in_bytes,
heap_size_in_bytes_per_thread: heap_size_in_bytes_per_thread,
index: index.clone(),
segment_ready_receiver: segment_ready_receiver,
segment_ready_sender: segment_ready_sender,
@@ -285,7 +296,7 @@ mod tests {
{
// writing the segment
let mut index_writer = index.writer_with_num_threads(3, 30_000_000).unwrap();
let mut index_writer = index.writer_with_num_threads(3, 40_000_000).unwrap();
{
let mut doc = Document::new();
doc.add_text(text_field, "a");

View File

@@ -292,7 +292,7 @@ mod tests {
let index = Index::create_in_ram(schema_builder.build());
{
let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap();
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
{
// writing the segment
{
@@ -335,7 +335,7 @@ mod tests {
}
{
let segments = index.segments().unwrap();
let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap();
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
index_writer.merge(&segments).unwrap();
}
{

View File

@@ -18,6 +18,7 @@ use postings::SpecializedPostingsWriter;
use postings::{NothingRecorder, TermFrequencyRecorder, TFAndPositionRecorder};
use indexer::segment_serializer::SegmentSerializer;
use datastruct::stacker::Heap;
use indexer::index_writer::MARGIN_IN_BYTES;
pub struct SegmentWriter<'a> {
heap: &'a Heap,
@@ -101,7 +102,7 @@ impl<'a> SegmentWriter<'a> {
}
pub fn is_buffer_full(&self,) -> bool {
self.heap.free() <= 10_000_000
self.heap.num_free_bytes() <= MARGIN_IN_BYTES
}
pub fn add_document(&mut self, doc: &Document, schema: &Schema) -> io::Result<()> {
@@ -137,7 +138,6 @@ impl<'a> SegmentWriter<'a> {
}
}
}
}
self.fieldnorms_writer.fill_val_up_to(doc_id);

View File

@@ -139,7 +139,7 @@ mod tests {
let index = Index::create_from_tempdir(schema).unwrap();
{
// writing the segment
let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap();
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
{
let mut doc = Document::new();
doc.add_text(text_field, "af b");
@@ -165,7 +165,7 @@ mod tests {
let mut schema_builder = SchemaBuilder::new();
let text_field = schema_builder.add_text_field("text", TEXT);
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap();
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
{
let mut doc = Document::new();
doc.add_text(text_field, "a b c");
@@ -211,7 +211,7 @@ mod tests {
let text_field = schema_builder.add_text_field("text", TEXT);
let index = Index::create_in_ram(schema_builder.build());
{
let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap();
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
{
let mut doc = Document::new();
doc.add_text(text_field, "a b c");
@@ -247,7 +247,7 @@ mod tests {
let index = Index::create_in_ram(schema);
{
// writing the segment
let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap();
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
{
let mut doc = Document::new();
doc.add_text(text_field, "af af af bc bc");
@@ -275,7 +275,7 @@ mod tests {
{
// writing the segment
let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap();
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
{
let mut doc = Document::new();
doc.add_text(text_field, "af af af b");
@@ -344,7 +344,7 @@ mod tests {
{
// writing the segment
let mut index_writer = index.writer_with_num_threads(1, 30_000_000).unwrap();
let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
{
let mut doc = Document::new();
doc.add_text(text_field, "af b");