mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 22:10:39 +00:00
Implement deletion
This commit is contained in:
@@ -126,6 +126,13 @@ impl<Value: Clone> PersistentLayerMap<Value> {
|
||||
|
||||
pub fn trim(self: &mut Self, begin: &u64) {
|
||||
self.historic.split_off(begin);
|
||||
self.head = self
|
||||
.historic
|
||||
.iter()
|
||||
.rev()
|
||||
.next()
|
||||
.map(|(_, v)| v.clone())
|
||||
.unwrap_or_default();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -292,10 +299,11 @@ pub struct RetroactiveLayerMap<Value> {
|
||||
map: PersistentLayerMap<Arc<Vec<Value>>>,
|
||||
|
||||
/// We buffer insertion into the PersistentLayerMap to decrease the number of rebuilds.
|
||||
buffer: BTreeMap<u64, Vec<(Range<i128>, Range<u64>, Value)>>,
|
||||
/// A value of None means we want to delete this item.
|
||||
buffer: BTreeMap<(u64, u64, i128, i128), Option<Value>>,
|
||||
|
||||
/// All current layers. This is not used for search. Only to make rebuilds easier.
|
||||
layers: BTreeMap<u64, Vec<(Range<i128>, Range<u64>, Value)>>,
|
||||
layers: BTreeMap<(u64, u64, i128, i128), Value>,
|
||||
}
|
||||
|
||||
impl<Value: std::fmt::Debug> std::fmt::Debug for RetroactiveLayerMap<Value> {
|
||||
@@ -320,38 +328,58 @@ impl<Value: Clone> RetroactiveLayerMap<Value> {
|
||||
}
|
||||
|
||||
pub fn insert(self: &mut Self, key: Range<i128>, lsn: Range<u64>, value: Value) {
|
||||
self.buffer.insert(
|
||||
(lsn.start, lsn.end, key.start, key.end),
|
||||
Some(value.clone()),
|
||||
);
|
||||
}
|
||||
|
||||
pub fn remove(self: &mut Self, key: Range<i128>, lsn: Range<u64>) {
|
||||
self.buffer
|
||||
.entry(lsn.start)
|
||||
.and_modify(|vec| vec.push((key.clone(), lsn.clone(), value.clone())))
|
||||
.or_insert(vec![(key.clone(), lsn.clone(), value.clone())]);
|
||||
.insert((lsn.start, lsn.end, key.start, key.end), None);
|
||||
}
|
||||
|
||||
pub fn rebuild(self: &mut Self) {
|
||||
// Find the first LSN that needs to be rebuilt
|
||||
let rebuild_since: u64 = match self.buffer.iter().next() {
|
||||
Some((lsn, _)) => lsn.clone(),
|
||||
Some(((lsn_start, _, _, _), _)) => lsn_start.clone(),
|
||||
None => return, // No need to rebuild if buffer is empty
|
||||
};
|
||||
|
||||
// Move buffer elements into self.layers
|
||||
self.buffer.retain(|lsn, layers| {
|
||||
self.layers
|
||||
.entry(*lsn)
|
||||
.and_modify(|vec| vec.append(layers))
|
||||
.or_insert(layers.clone());
|
||||
// Apply buffered updates to self.layers
|
||||
self.buffer.retain(|rect, layer| {
|
||||
match layer {
|
||||
Some(l) => {
|
||||
let existing = self.layers.insert(rect.clone(), l.clone());
|
||||
if existing.is_some() {
|
||||
panic!("can't overwrite layer");
|
||||
}
|
||||
}
|
||||
None => {
|
||||
let existing = self.layers.remove(rect);
|
||||
if existing.is_none() {
|
||||
panic!("invalid layer deletion");
|
||||
}
|
||||
}
|
||||
};
|
||||
false
|
||||
});
|
||||
|
||||
// Rebuild
|
||||
self.map.trim(&rebuild_since);
|
||||
for (_start_lsn, layers) in self.layers.range(rebuild_since..) {
|
||||
for (key, end_lsn, value) in layers {
|
||||
let wrapped = Arc::new(vec![value.clone()]);
|
||||
self.map.insert(key.clone(), end_lsn.clone(), wrapped);
|
||||
}
|
||||
for ((lsn_start, lsn_end, key_start, key_end), layer) in
|
||||
self.layers.range((rebuild_since, 0, 0, 0)..)
|
||||
{
|
||||
let wrapped = Arc::new(vec![layer.clone()]);
|
||||
self.map
|
||||
.insert(*key_start..*key_end, *lsn_start..*lsn_end, wrapped);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn clear(self: &mut Self) {
|
||||
self.map.trim(&0);
|
||||
}
|
||||
|
||||
pub fn query(self: &Self, key: i128, lsn: u64) -> Option<Value> {
|
||||
if !self.buffer.is_empty() {
|
||||
panic!("rebuild pls")
|
||||
@@ -375,7 +403,8 @@ fn test_retroactive_simple() {
|
||||
// Append some images in increasing LSN order
|
||||
map.insert(0..5, 100..101, "Image 1".to_string());
|
||||
map.insert(3..9, 110..111, "Image 2".to_string());
|
||||
map.insert(5..6, 120..121, "Image 3".to_string());
|
||||
map.insert(4..6, 120..121, "Image 3".to_string());
|
||||
map.insert(8..9, 120..121, "Image 4".to_string());
|
||||
|
||||
// Add a delta layer out of order
|
||||
map.insert(2..5, 105..106, "Delta 1".to_string());
|
||||
@@ -383,9 +412,18 @@ fn test_retroactive_simple() {
|
||||
// Rebuild so we can start querying
|
||||
map.rebuild();
|
||||
|
||||
// Query
|
||||
// Query key 4
|
||||
assert_eq!(map.query(4, 90), None);
|
||||
assert_eq!(map.query(4, 102), Some("Image 1".to_string()));
|
||||
assert_eq!(map.query(4, 107), Some("Delta 1".to_string()));
|
||||
assert_eq!(map.query(4, 115), Some("Image 2".to_string()));
|
||||
assert_eq!(map.query(4, 125), Some("Image 3".to_string()));
|
||||
|
||||
// Remove Image 3
|
||||
map.remove(4..6, 120..121);
|
||||
map.rebuild();
|
||||
|
||||
// Check deletion worked
|
||||
assert_eq!(map.query(4, 125), Some("Image 2".to_string()));
|
||||
assert_eq!(map.query(8, 125), Some("Image 4".to_string()));
|
||||
}
|
||||
|
||||
@@ -393,7 +393,6 @@ impl LayerMap {
|
||||
/// Insert an on-disk layer
|
||||
///
|
||||
pub fn insert_historic(&mut self, layer: Arc<dyn Layer>) {
|
||||
// TODO the index needs to support out of order insertion for this to work
|
||||
let kr = layer.get_key_range();
|
||||
let lr = layer.get_lsn_range();
|
||||
self.index.insert(
|
||||
@@ -429,8 +428,14 @@ impl LayerMap {
|
||||
/// This should be called when the corresponding file on disk has been deleted.
|
||||
///
|
||||
pub fn remove_historic(&mut self, layer: Arc<dyn Layer>) {
|
||||
|
||||
// TODO remve from self.index and self.images
|
||||
let kr = layer.get_key_range();
|
||||
let lr = layer.get_lsn_range();
|
||||
self.index
|
||||
.remove(kr.start.to_i128()..kr.end.to_i128(), lr.start.0..lr.end.0);
|
||||
if !layer.is_incremental() {
|
||||
self.images
|
||||
.remove(kr.start.to_i128()..kr.end.to_i128(), lr.start.0..lr.end.0);
|
||||
}
|
||||
|
||||
if layer.get_key_range() == (Key::MIN..Key::MAX) {
|
||||
let len_before = self.l0_delta_layers.len();
|
||||
|
||||
Reference in New Issue
Block a user