mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 22:10:39 +00:00
WIP rebuild API
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
// TODO the `im` crate has 20x more downloads and also has
|
||||
// persistent/immutable BTree. See if it's better.
|
||||
@@ -90,10 +91,9 @@ impl<Value: Clone> PersistentLayerMap<Value> {
|
||||
version.range(0..=key).rev().next()?.1.as_ref()
|
||||
}
|
||||
|
||||
// TODO Add API for delta layers with lsn range.
|
||||
// The easy solution is to only store images, and then from every
|
||||
// image point to deltas on top of it. There might be something
|
||||
// nicer but we have this solution as backup.
|
||||
pub fn trim(self: &mut Self, begin: &u64) {
|
||||
self.historic.split_off(begin);
|
||||
}
|
||||
}
|
||||
|
||||
/// Basic test for the immutable bst library, just to show usage.
|
||||
@@ -140,3 +140,126 @@ fn test_persistent_simple() {
|
||||
assert_eq!(map.query(5, 125), Some(&"Layer 3".to_string()));
|
||||
assert_eq!(map.query(7, 125), Some(&"Layer 2".to_string()));
|
||||
}
|
||||
|
||||
/// Layer map that supports:
|
||||
/// - efficient historical queries
|
||||
/// - efficient append only updates
|
||||
/// - tombstones and similar methods for non-latest updates
|
||||
/// - compaction/rebuilding to remove tombstones
|
||||
///
|
||||
/// See this for better retroactive techniques we can try
|
||||
/// https://www.youtube.com/watch?v=WqCWghETNDc&t=581s
|
||||
///
|
||||
/// Layer type is abstracted as Value to make unit testing easier.
|
||||
pub struct RetroactiveLayerMap<Value> {
|
||||
/// Using Arc and Vec allows us to hack around the lack of retroactive
|
||||
/// insert/delete functionality in PersistentLayerMap:
|
||||
/// - For normal append-only updates, we insert Arc::new(vec![value]).
|
||||
/// - For retroactive deletion (during gc) we empty the vector. The use
|
||||
/// of Arc gives us a useful indirection layer so that the delete would
|
||||
/// effectively retroactively update future versions, instead of creating
|
||||
/// a new branch.
|
||||
/// - For retroactive updates (during compaction), we find all layers below
|
||||
/// the layer we're inserting, and append to their Vec-s. This is O(N), but
|
||||
/// also amortized O(log N). Here's why: We don't insert image layers
|
||||
/// retroactively, only deltas. And after an image gets covered by K (currently
|
||||
/// K = 3) deltas, we do compaction.
|
||||
///
|
||||
/// This complexity might be a limitation, or a feature. Here's how it might
|
||||
/// actually help: It gives us the option to store the entire reconstruction
|
||||
/// result in a single colocated Vec, and get the initial image and all necessary
|
||||
/// deltas in one query.
|
||||
map: PersistentLayerMap<Arc<Vec<Value>>>,
|
||||
|
||||
/// We buffer insertion into the PersistentLayerMap to decrease the number of rebuilds.
|
||||
buffer: BTreeMap<u64, Vec<(i128, i128, Value)>>,
|
||||
|
||||
/// All current layers. This is not used for search. Only to make rebuilds easier.
|
||||
layers: BTreeMap<u64, Vec<(i128, i128, Value)>>,
|
||||
}
|
||||
|
||||
impl<Value: std::fmt::Debug> std::fmt::Debug for RetroactiveLayerMap<Value> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "RetroactiveLayerMap: head: {:?}", self.map)
|
||||
}
|
||||
}
|
||||
|
||||
impl<Value: Clone> RetroactiveLayerMap<Value> {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
map: PersistentLayerMap::<Arc<Vec<Value>>>::new(),
|
||||
buffer: BTreeMap::new(),
|
||||
layers: BTreeMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert(self: &mut Self, key_begin: i128, key_end: i128, lsn: u64, value: Value) {
|
||||
self.buffer
|
||||
.entry(lsn)
|
||||
.and_modify(|vec| vec.push((key_begin, key_end, value.clone())))
|
||||
.or_insert(vec![(key_begin, key_end, value.clone())]);
|
||||
}
|
||||
|
||||
pub fn rebuild(self: &mut Self) {
|
||||
// Find the first LSN that needs to be rebuilt
|
||||
let rebuild_since: u64 = match self.buffer.iter().next() {
|
||||
Some((lsn, _)) => lsn.clone(),
|
||||
None => return, // No need to rebuild if buffer is empty
|
||||
};
|
||||
|
||||
// Move buffer elements into self.layers
|
||||
self.buffer.retain(|lsn, layers| {
|
||||
self.layers
|
||||
.entry(*lsn)
|
||||
.and_modify(|vec| vec.append(layers))
|
||||
.or_insert(layers.clone());
|
||||
false
|
||||
});
|
||||
|
||||
// Rebuild
|
||||
self.map.trim(&rebuild_since);
|
||||
for (lsn, layers) in self.layers.range(rebuild_since..) {
|
||||
for (key_begin, key_end, value) in layers {
|
||||
let wrapped = Arc::new(vec![value.clone()]);
|
||||
self.map.insert(*key_begin, *key_end, *lsn, wrapped);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn query(self: &Self, key: i128, lsn: u64) -> Option<Value> {
|
||||
if !self.buffer.is_empty() {
|
||||
panic!("rebuild pls")
|
||||
}
|
||||
|
||||
match self.map.query(key, lsn) {
|
||||
Some(vec) => match vec.len().cmp(&1) {
|
||||
std::cmp::Ordering::Less => todo!(),
|
||||
std::cmp::Ordering::Equal => Some(vec[0].clone()),
|
||||
std::cmp::Ordering::Greater => todo!(),
|
||||
},
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_retroactive_simple() {
|
||||
let mut map = RetroactiveLayerMap::new();
|
||||
|
||||
// Append some images in increasing LSN order
|
||||
map.insert(0, 5, 100, "Image 1".to_string());
|
||||
map.insert(3, 9, 110, "Image 2".to_string());
|
||||
map.insert(5, 6, 120, "Image 3".to_string());
|
||||
|
||||
// Add a delta layer out of order
|
||||
map.insert(2, 5, 105, "Delta 1".to_string());
|
||||
|
||||
// Rebuild so we can start querying
|
||||
map.rebuild();
|
||||
|
||||
// Query
|
||||
assert_eq!(map.query(4, 90), None);
|
||||
assert_eq!(map.query(4, 102), Some("Image 1".to_string()));
|
||||
assert_eq!(map.query(4, 107), Some("Delta 1".to_string()));
|
||||
assert_eq!(map.query(4, 115), Some("Image 2".to_string()));
|
||||
}
|
||||
|
||||
@@ -250,9 +250,12 @@ impl LayerMap {
|
||||
// HACK use the index to query and return early. If this works I'll
|
||||
// rewrite the function.
|
||||
let result = self.index.query(key.to_i128(), end_lsn.0);
|
||||
// TODO check if this is correct. I'm returning the latest layer by
|
||||
// start lsn, but the current solution first looks for latest
|
||||
// by end lsn.
|
||||
return Ok(result.map(|layer| SearchResult {
|
||||
layer: Arc::clone(layer),
|
||||
lsn_floor: Lsn(0), // TODO what's this?
|
||||
lsn_floor: layer.get_lsn_range().start,
|
||||
}));
|
||||
|
||||
// linear search
|
||||
|
||||
Reference in New Issue
Block a user