diff --git a/libs/neonart/src/algorithm.rs b/libs/neonart/src/algorithm.rs index ddff1c860d..4056dc5031 100644 --- a/libs/neonart/src/algorithm.rs +++ b/libs/neonart/src/algorithm.rs @@ -17,6 +17,7 @@ use crate::{Key, Value}; pub(crate) type RootPtr = node_ptr::NodePtr; +#[derive(Debug)] pub enum ArtError { ConcurrentUpdate, // need to retry OutOfMemory, @@ -34,7 +35,9 @@ impl From for ArtError { } } -pub fn new_root(allocator: &impl ArtAllocator) -> RootPtr { +pub fn new_root( + allocator: &impl ArtAllocator, +) -> Result, OutOfMemoryError> { node_ptr::new_root(allocator) } @@ -80,7 +83,8 @@ pub(crate) fn update_fn<'e, 'g, K: Key, V: Value, A: ArtAllocator, F>( value_fn: F, root: RootPtr, guard: &'g mut TreeWriteGuard<'e, K, V, A>, -) where +) -> Result<(), OutOfMemoryError> +where F: FnOnce(Option<&V>) -> UpdateAction, { let value_fn_cell = std::cell::Cell::new(Some(value_fn)); @@ -99,13 +103,11 @@ pub(crate) fn update_fn<'e, 'g, K: Key, V: Value, A: ArtAllocator, F>( 0, key_bytes, ) { - Ok(()) => break, + Ok(()) => break Ok(()), Err(ArtError::ConcurrentUpdate) => { continue; // retry } - Err(ArtError::OutOfMemory) => { - panic!("todo: OOM: try to GC, propagate to caller"); - } + Err(ArtError::OutOfMemory) => break Err(OutOfMemoryError()), } } } diff --git a/libs/neonart/src/algorithm/node_ptr.rs b/libs/neonart/src/algorithm/node_ptr.rs index 117f5cd7b3..e97e5a7c63 100644 --- a/libs/neonart/src/algorithm/node_ptr.rs +++ b/libs/neonart/src/algorithm/node_ptr.rs @@ -8,6 +8,7 @@ use super::lock_and_version::AtomicLockAndVersion; use crate::Value; use crate::allocator::ArtAllocator; +use crate::allocator::OutOfMemoryError; pub(crate) const MAX_PREFIX_LEN: usize = 8; @@ -300,7 +301,10 @@ impl NodePtr { } } - pub(crate) fn grow(&self, allocator: &impl ArtAllocator) -> NodePtr { + pub(crate) fn grow( + &self, + allocator: &impl ArtAllocator, + ) -> Result, OutOfMemoryError> { let bigger = match self.variant() { NodeVariant::Internal4(n) => n.grow(allocator), NodeVariant::Internal16(n) => n.grow(allocator), @@ -308,22 +312,6 @@ impl NodePtr { NodeVariant::Internal256(_) => panic!("cannot grow Internal256 node"), NodeVariant::Leaf(_) => panic!("cannot grow Leaf node"), }; - /* - let mut key = 0; - loop { - let a = self.find_next_child(key); - let b = bigger.find_next_child(key); - assert_eq!(a, b); - if let Some((akey, _)) = a { - if akey == u8::MAX { - break; - } - key = akey + 1; - } else { - break; - } - } - */ bigger } @@ -357,7 +345,10 @@ impl NodePtr { } } - pub(crate) fn shrink(&self, allocator: &impl ArtAllocator) -> NodePtr { + pub(crate) fn shrink( + &self, + allocator: &impl ArtAllocator, + ) -> Result, OutOfMemoryError> { match self.variant() { NodeVariant::Internal4(_) => panic!("shrink called on internal4 node"), NodeVariant::Internal16(n) => n.shrink(allocator), @@ -398,23 +389,28 @@ impl NodePtr { } } -pub fn new_root(allocator: &impl ArtAllocator) -> NodePtr { +pub fn new_root( + allocator: &impl ArtAllocator, +) -> Result, OutOfMemoryError> { let ptr: *mut NodeInternal256 = allocator.alloc_node_internal256().cast(); if ptr.is_null() { - panic!("out of memory"); + return Err(OutOfMemoryError()); } unsafe { *ptr = NodeInternal256::::new(); } - ptr.into() + Ok(ptr.into()) } -pub fn new_internal(prefix: &[u8], allocator: &impl ArtAllocator) -> NodePtr { +pub fn new_internal( + prefix: &[u8], + allocator: &impl ArtAllocator, +) -> Result, OutOfMemoryError> { let ptr: *mut NodeInternal4 = allocator.alloc_node_internal4().cast(); if ptr.is_null() { - panic!("out of memory"); + return Err(OutOfMemoryError()); } let mut init = NodeInternal4 { tag: NodeTag::Internal4, @@ -430,13 +426,17 @@ pub fn new_internal(prefix: &[u8], allocator: &impl ArtAllocator) - init.prefix[0..prefix.len()].copy_from_slice(prefix); unsafe { ptr.write(init) }; - ptr.into() + Ok(ptr.into()) } -pub fn new_leaf(prefix: &[u8], value: V, allocator: &impl ArtAllocator) -> NodePtr { +pub fn new_leaf( + prefix: &[u8], + value: V, + allocator: &impl ArtAllocator, +) -> Result, OutOfMemoryError> { let ptr: *mut NodeLeaf = allocator.alloc_node_leaf().cast(); if ptr.is_null() { - panic!("out of memory"); + return Err(OutOfMemoryError()); } let mut init = NodeLeaf { tag: NodeTag::Leaf, @@ -450,7 +450,7 @@ pub fn new_leaf(prefix: &[u8], value: V, allocator: &impl ArtAllocator init.prefix[0..prefix.len()].copy_from_slice(prefix); unsafe { ptr.write(init) }; - ptr.into() + Ok(ptr.into()) } impl NodeInternal4 { @@ -549,10 +549,10 @@ impl NodeInternal4 { self.num_children += 1; } - fn grow(&self, allocator: &impl ArtAllocator) -> NodePtr { + fn grow(&self, allocator: &impl ArtAllocator) -> Result, OutOfMemoryError> { let ptr: *mut NodeInternal16 = allocator.alloc_node_internal16().cast(); if ptr.is_null() { - panic!("out of memory"); + return Err(OutOfMemoryError()); } let mut init = NodeInternal16 { tag: NodeTag::Internal16, @@ -570,7 +570,7 @@ impl NodeInternal4 { init.child_ptrs[i] = self.child_ptrs[i]; } unsafe { ptr.write(init) }; - ptr.into() + Ok(ptr.into()) } } @@ -670,10 +670,10 @@ impl NodeInternal16 { self.num_children += 1; } - fn grow(&self, allocator: &impl ArtAllocator) -> NodePtr { + fn grow(&self, allocator: &impl ArtAllocator) -> Result, OutOfMemoryError> { let ptr: *mut NodeInternal48 = allocator.alloc_node_internal48().cast(); if ptr.is_null() { - panic!("out of memory"); + return Err(OutOfMemoryError()); } let mut init = NodeInternal48 { tag: NodeTag::Internal48, @@ -693,14 +693,14 @@ impl NodeInternal16 { } init.validate(); unsafe { ptr.write(init) }; - ptr.into() + Ok(ptr.into()) } - fn shrink(&self, allocator: &impl ArtAllocator) -> NodePtr { + fn shrink(&self, allocator: &impl ArtAllocator) -> Result, OutOfMemoryError> { assert!(self.num_children <= 4); let ptr: *mut NodeInternal4 = allocator.alloc_node_internal4().cast(); if ptr.is_null() { - panic!("out of memory"); + return Err(OutOfMemoryError()); } let mut init = NodeInternal4 { tag: NodeTag::Internal4, @@ -718,7 +718,7 @@ impl NodeInternal16 { init.child_ptrs[i] = self.child_ptrs[i]; } unsafe { ptr.write(init) }; - ptr.into() + Ok(ptr.into()) } } @@ -844,10 +844,10 @@ impl NodeInternal48 { self.validate(); } - fn grow(&self, allocator: &impl ArtAllocator) -> NodePtr { + fn grow(&self, allocator: &impl ArtAllocator) -> Result, OutOfMemoryError> { let ptr: *mut NodeInternal256 = allocator.alloc_node_internal256().cast(); if ptr.is_null() { - panic!("out of memory"); + return Err(OutOfMemoryError()); } let mut init = NodeInternal256 { tag: NodeTag::Internal256, @@ -866,14 +866,14 @@ impl NodeInternal48 { } } unsafe { ptr.write(init) }; - ptr.into() + Ok(ptr.into()) } - fn shrink(&self, allocator: &impl ArtAllocator) -> NodePtr { + fn shrink(&self, allocator: &impl ArtAllocator) -> Result, OutOfMemoryError> { assert!(self.num_children <= 16); let ptr: *mut NodeInternal16 = allocator.alloc_node_internal16().cast(); if ptr.is_null() { - panic!("out of memory"); + return Err(OutOfMemoryError()); } let mut init = NodeInternal16 { tag: NodeTag::Internal16, @@ -897,7 +897,7 @@ impl NodeInternal48 { } assert_eq!(j, self.num_children as usize); unsafe { ptr.write(init) }; - ptr.into() + Ok(ptr.into()) } } @@ -977,11 +977,11 @@ impl NodeInternal256 { self.num_children += 1; } - fn shrink(&self, allocator: &impl ArtAllocator) -> NodePtr { + fn shrink(&self, allocator: &impl ArtAllocator) -> Result, OutOfMemoryError> { assert!(self.num_children <= 48); let ptr: *mut NodeInternal48 = allocator.alloc_node_internal48().cast(); if ptr.is_null() { - panic!("out of memory"); + return Err(OutOfMemoryError()); } let mut init = NodeInternal48 { tag: NodeTag::Internal48, @@ -1004,7 +1004,7 @@ impl NodeInternal256 { } assert_eq!(j as u16, self.num_children); unsafe { ptr.write(init) }; - ptr.into() + Ok(ptr.into()) } } diff --git a/libs/neonart/src/algorithm/node_ref.rs b/libs/neonart/src/algorithm/node_ref.rs index f8fc11c09c..5403aaabdf 100644 --- a/libs/neonart/src/algorithm/node_ref.rs +++ b/libs/neonart/src/algorithm/node_ref.rs @@ -208,8 +208,7 @@ impl<'e, V: Value> WriteLockedNodeRef<'e, V> { where A: ArtAllocator, { - // FIXME: check OOM - let new_node = self.ptr.grow(allocator); + let new_node = self.ptr.grow(allocator)?; Ok(NewNodeRef { ptr: new_node, allocator, @@ -224,8 +223,7 @@ impl<'e, V: Value> WriteLockedNodeRef<'e, V> { where A: ArtAllocator, { - // FIXME: check OOM - let new_node = self.ptr.shrink(allocator); + let new_node = self.ptr.shrink(allocator)?; Ok(NewNodeRef { ptr: new_node, allocator, @@ -328,7 +326,7 @@ where A: ArtAllocator, { Ok(NewNodeRef { - ptr: node_ptr::new_internal(prefix, allocator), + ptr: node_ptr::new_internal(prefix, allocator)?, allocator, extra_nodes: Vec::new(), }) @@ -344,7 +342,7 @@ where A: ArtAllocator, { Ok(NewNodeRef { - ptr: node_ptr::new_leaf(prefix, value, allocator), + ptr: node_ptr::new_leaf(prefix, value, allocator)?, allocator, extra_nodes: Vec::new(), }) diff --git a/libs/neonart/src/allocator.rs b/libs/neonart/src/allocator.rs index cb962fa33f..8568357a2f 100644 --- a/libs/neonart/src/allocator.rs +++ b/libs/neonart/src/allocator.rs @@ -18,6 +18,7 @@ pub use crate::algorithm::node_ptr::{ NodeInternal4, NodeInternal16, NodeInternal48, NodeInternal256, NodeLeaf, }; +#[derive(Debug)] pub struct OutOfMemoryError(); pub trait ArtAllocator { diff --git a/libs/neonart/src/lib.rs b/libs/neonart/src/lib.rs index 154fe0dd1a..ea3527071c 100644 --- a/libs/neonart/src/lib.rs +++ b/libs/neonart/src/lib.rs @@ -140,6 +140,7 @@ mod tests; use allocator::ArtAllocator; pub use allocator::ArtMultiSlabAllocator; +pub use allocator::OutOfMemoryError; /// Fixed-length key type. /// @@ -158,7 +159,11 @@ pub trait Value {} const MAX_GARBAGE: usize = 1024; +/// The root of the tree, plus other tree-wide data. This is stored in the shared memory. pub struct Tree { + /// For simplicity, so that we never need to grow or shrink the root, the root node is always an + /// Internal256 node. Also, it never has a prefix (that's actually a bit wasteful, incurring one + /// indirection to every lookup) root: RootPtr, writer_attached: AtomicBool, @@ -239,7 +244,7 @@ impl<'a, 't: 'a, K: Key, V: Value, A: ArtAllocator> TreeInitStruct<'t, K, V, let tree_ptr = allocator.alloc_tree(); let tree_ptr = NonNull::new(tree_ptr).expect("out of memory"); let init = Tree { - root: algorithm::new_root(allocator), + root: algorithm::new_root(allocator).expect("out of memory"), writer_attached: AtomicBool::new(false), epoch: epoch::EpochShared::new(), }; @@ -351,7 +356,7 @@ impl<'e, K: Key, V: Value, A: ArtAllocator> TreeWriteGuard<'e, K, V, A> { } /// Insert a value - pub fn insert(self, key: &K, value: V) -> Result<(), ()> { + pub fn insert(self, key: &K, value: V) -> Result { let mut success = None; self.update_with_fn(key, |existing| { @@ -362,24 +367,24 @@ impl<'e, K: Key, V: Value, A: ArtAllocator> TreeWriteGuard<'e, K, V, A> { success = Some(true); UpdateAction::Insert(value) } - }); - if success.expect("value_fn not called") { - Ok(()) - } else { - Err(()) - } + })?; + Ok(success.expect("value_fn not called")) } /// Remove value. Returns true if it existed pub fn remove(self, key: &K) -> bool { let mut result = false; + // FIXME: It's not clear if OOM is expected while removing. It seems + // not nice, but shrinking a node can OOM. Then again, we could opt + // to not shrink a node if we cannot allocate, to live a little longer. self.update_with_fn(key, |existing| match existing { Some(_) => { result = true; UpdateAction::Remove } None => UpdateAction::Nothing, - }); + }) + .expect("out of memory while removing"); result } @@ -392,7 +397,8 @@ impl<'e, K: Key, V: Value, A: ArtAllocator> TreeWriteGuard<'e, K, V, A> { self.update_with_fn(key, |existing| { old = existing.cloned(); UpdateAction::Remove - }); + }) + .expect("out of memory while removing"); old } @@ -402,15 +408,16 @@ impl<'e, K: Key, V: Value, A: ArtAllocator> TreeWriteGuard<'e, K, V, A> { /// returns None, the value is removed from the tree (or if there was no existing value, /// does nothing). If the function returns Some, the existing value is replaced, of if there /// was no existing value, it is inserted. FIXME: update comment - pub fn update_with_fn(mut self, key: &K, value_fn: F) + pub fn update_with_fn(mut self, key: &K, value_fn: F) -> Result<(), OutOfMemoryError> where F: FnOnce(Option<&V>) -> UpdateAction, { - algorithm::update_fn(key, value_fn, self.tree_writer.tree.root, &mut self); + algorithm::update_fn(key, value_fn, self.tree_writer.tree.root, &mut self)?; if self.created_garbage { let _ = self.collect_garbage(); } + Ok(()) } fn remember_obsolete_node(&mut self, ptr: NodePtr) { diff --git a/libs/neonart/src/tests.rs b/libs/neonart/src/tests.rs index b9724d2760..db674597f7 100644 --- a/libs/neonart/src/tests.rs +++ b/libs/neonart/src/tests.rs @@ -170,7 +170,8 @@ fn apply_op>( UpdateAction::Nothing } } - }); + }) + .expect("out of memory"); } fn test_iter>( diff --git a/pgxn/neon/communicator/src/integrated_cache.rs b/pgxn/neon/communicator/src/integrated_cache.rs index 3e8029d6b6..c544541d17 100644 --- a/pgxn/neon/communicator/src/integrated_cache.rs +++ b/pgxn/neon/communicator/src/integrated_cache.rs @@ -448,7 +448,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { pub fn remember_rel_size(&'t self, rel: &RelTag, nblocks: u32) { let w = self.cache_tree.start_write(); - w.update_with_fn(&TreeKey::from(rel), |existing| match existing { + let result = w.update_with_fn(&TreeKey::from(rel), |existing| match existing { None => { tracing::info!("inserting rel entry for {rel:?}, {nblocks} blocks"); UpdateAction::Insert(TreeEntry::Rel(RelEntry { @@ -462,6 +462,10 @@ impl<'t> IntegratedCacheWriteAccess<'t> { UpdateAction::Nothing } }); + + // FIXME: what to do if we run out of memory? Evict other relation entries? Remove + // block entries first? + result.expect("out of memory"); } /// Remember the given page contents in the cache. @@ -489,7 +493,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { let mut old_cache_block = None; let mut found_existing = false; - w.update_with_fn(&key, |existing| { + let res = w.update_with_fn(&key, |existing| { if let Some(existing) = existing { let block_entry = if let TreeEntry::Block(e) = existing { e @@ -518,6 +522,10 @@ impl<'t> IntegratedCacheWriteAccess<'t> { UpdateAction::Nothing }); + // FIXME: what to do if we run out of memory? Evict other relation entries? Remove + // block entries first? + res.expect("out of memory"); + // Allocate a new block if required let cache_block = old_cache_block.unwrap_or_else(|| { loop { @@ -540,7 +548,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { // Update the block entry let w = self.cache_tree.start_write(); - w.update_with_fn(&key, |existing| { + let res = w.update_with_fn(&key, |existing| { assert_eq!(found_existing, existing.is_some()); if let Some(existing) = existing { let block_entry = if let TreeEntry::Block(e) = existing { @@ -574,6 +582,10 @@ impl<'t> IntegratedCacheWriteAccess<'t> { })) } }); + + // FIXME: what to do if we run out of memory? Evict other relation entries? Remove + // block entries first? + res.expect("out of memory"); } else { // !is_write // @@ -602,7 +614,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { let w = self.cache_tree.start_write(); - w.update_with_fn(&key, |existing| { + let res = w.update_with_fn(&key, |existing| { if let Some(existing) = existing { let block_entry = if let TreeEntry::Block(e) = existing { e @@ -626,6 +638,10 @@ impl<'t> IntegratedCacheWriteAccess<'t> { })) } }); + + // FIXME: what to do if we run out of memory? Evict other relation entries? Remove + // block entries first? + res.expect("out of memory"); } } @@ -643,7 +659,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { let mut evicted_cache_block = None; - w.update_with_fn(&k, |e| { + let res = w.update_with_fn(&k, |e| { if let Some(e) = e { let block_entry = if let TreeEntry::Block(e) = e { e @@ -662,6 +678,10 @@ impl<'t> IntegratedCacheWriteAccess<'t> { } }); + // FIXME: It's pretty surprising to run out of memory while removing. But + // maybe it can happen because of trying to shrink a node? + res.expect("out of memory"); + if let Some(evicted_cache_block) = evicted_cache_block { self.file_cache .as_ref() @@ -699,7 +719,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { let w = self.cache_tree.start_write(); let mut evicted_cache_block = None; - w.update_with_fn(&k, |old| { + let res = w.update_with_fn(&k, |old| { match old { None => UpdateAction::Nothing, Some(TreeEntry::Rel(_)) => panic!("unexpected Rel entry"), @@ -727,6 +747,12 @@ impl<'t> IntegratedCacheWriteAccess<'t> { } } }); + + // FIXME: what to do if we run out of memory? Evict other relation entries? Remove + // block entries first? It probably shouldn't happen here, as we're not + // actually updating the tree. + res.expect("out of memory"); + if evicted_cache_block.is_some() { self.page_evictions_counter.inc(); return evicted_cache_block;