mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 14:02:55 +00:00
Make DiskBtreeReader::{visit, get} async (#4863)
## Problem `DiskBtreeReader::get` and `DiskBtreeReader::visit` both call `read_blk` internally, which we would like to make async in the future. This PR focuses on making the interface of these two functions `async`. There is further work to be done in forms of making `visit` to not be recursive any more, similar to #4838. For that, see https://github.com/neondatabase/neon/pull/4884. Builds on top of https://github.com/neondatabase/neon/pull/4839, part of https://github.com/neondatabase/neon/issues/4743 ## Summary of changes Make `DiskBtreeReader::get` and `DiskBtreeReader::visit` async functions and `await` in the places that call these functions.
This commit is contained in:
@@ -107,23 +107,25 @@ async fn get_holes(path: &Path, max_holes: usize) -> Result<Vec<Hole>> {
|
||||
// min-heap (reserve space for one more element added before eviction)
|
||||
let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
|
||||
let mut prev_key: Option<Key> = None;
|
||||
tree_reader.visit(
|
||||
&[0u8; DELTA_KEY_SIZE],
|
||||
VisitDirection::Forwards,
|
||||
|key, _value| {
|
||||
let curr = Key::from_slice(&key[..KEY_SIZE]);
|
||||
if let Some(prev) = prev_key {
|
||||
if curr.to_i128() - prev.to_i128() >= MIN_HOLE_LENGTH {
|
||||
heap.push(Hole(prev..curr));
|
||||
if heap.len() > max_holes {
|
||||
heap.pop(); // remove smallest hole
|
||||
tree_reader
|
||||
.visit(
|
||||
&[0u8; DELTA_KEY_SIZE],
|
||||
VisitDirection::Forwards,
|
||||
|key, _value| {
|
||||
let curr = Key::from_slice(&key[..KEY_SIZE]);
|
||||
if let Some(prev) = prev_key {
|
||||
if curr.to_i128() - prev.to_i128() >= MIN_HOLE_LENGTH {
|
||||
heap.push(Hole(prev..curr));
|
||||
if heap.len() > max_holes {
|
||||
heap.pop(); // remove smallest hole
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
prev_key = Some(curr.next());
|
||||
true
|
||||
},
|
||||
)?;
|
||||
prev_key = Some(curr.next());
|
||||
true
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
let mut holes = heap.into_vec();
|
||||
holes.sort_by_key(|hole| hole.0.start);
|
||||
Ok(holes)
|
||||
|
||||
@@ -59,15 +59,17 @@ async fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
|
||||
);
|
||||
// TODO(chi): dedup w/ `delta_layer.rs` by exposing the API.
|
||||
let mut all = vec![];
|
||||
tree_reader.visit(
|
||||
&[0u8; DELTA_KEY_SIZE],
|
||||
VisitDirection::Forwards,
|
||||
|key, value_offset| {
|
||||
let curr = Key::from_slice(&key[..KEY_SIZE]);
|
||||
all.push((curr, BlobRef(value_offset)));
|
||||
true
|
||||
},
|
||||
)?;
|
||||
tree_reader
|
||||
.visit(
|
||||
&[0u8; DELTA_KEY_SIZE],
|
||||
VisitDirection::Forwards,
|
||||
|key, value_offset| {
|
||||
let curr = Key::from_slice(&key[..KEY_SIZE]);
|
||||
all.push((curr, BlobRef(value_offset)));
|
||||
true
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
let cursor = BlockCursor::new(&file);
|
||||
for (k, v) in all {
|
||||
let value = cursor.read_blob(v.pos())?;
|
||||
|
||||
@@ -230,14 +230,15 @@ where
|
||||
///
|
||||
/// Read the value for given key. Returns the value, or None if it doesn't exist.
|
||||
///
|
||||
pub fn get(&self, search_key: &[u8; L]) -> Result<Option<u64>> {
|
||||
pub async fn get(&self, search_key: &[u8; L]) -> Result<Option<u64>> {
|
||||
let mut result: Option<u64> = None;
|
||||
self.visit(search_key, VisitDirection::Forwards, |key, value| {
|
||||
if key == search_key {
|
||||
result = Some(value);
|
||||
}
|
||||
false
|
||||
})?;
|
||||
})
|
||||
.await?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
@@ -246,7 +247,7 @@ where
|
||||
/// will be called for every key >= 'search_key' (or <= 'search_key', if scanning
|
||||
/// backwards)
|
||||
///
|
||||
pub fn visit<V>(
|
||||
pub async fn visit<V>(
|
||||
&self,
|
||||
search_key: &[u8; L],
|
||||
dir: VisitDirection,
|
||||
@@ -269,23 +270,9 @@ where
|
||||
V: FnMut(&[u8], u64) -> bool,
|
||||
{
|
||||
// Locate the node.
|
||||
let blk = self.reader.read_blk(self.start_blk + node_blknum)?;
|
||||
let node_buf = self.reader.read_blk(self.start_blk + node_blknum)?;
|
||||
|
||||
// Search all entries on this node
|
||||
self.search_node(blk.as_ref(), search_key, dir, visitor)
|
||||
}
|
||||
|
||||
fn search_node<V>(
|
||||
&self,
|
||||
node_buf: &[u8],
|
||||
search_key: &[u8; L],
|
||||
dir: VisitDirection,
|
||||
visitor: &mut V,
|
||||
) -> Result<bool>
|
||||
where
|
||||
V: FnMut(&[u8], u64) -> bool,
|
||||
{
|
||||
let node = OnDiskNode::deparse(node_buf)?;
|
||||
let node = OnDiskNode::deparse(node_buf.as_ref())?;
|
||||
let prefix_len = node.prefix_len as usize;
|
||||
let suffix_len = node.suffix_len as usize;
|
||||
|
||||
@@ -782,12 +769,12 @@ mod tests {
|
||||
|
||||
// Test the `get` function on all the keys.
|
||||
for (key, val) in all_data.iter() {
|
||||
assert_eq!(reader.get(key)?, Some(*val));
|
||||
assert_eq!(reader.get(key).await?, Some(*val));
|
||||
}
|
||||
// And on some keys that don't exist
|
||||
assert_eq!(reader.get(b"aaaaaa")?, None);
|
||||
assert_eq!(reader.get(b"zzzzzz")?, None);
|
||||
assert_eq!(reader.get(b"xaaabx")?, None);
|
||||
assert_eq!(reader.get(b"aaaaaa").await?, None);
|
||||
assert_eq!(reader.get(b"zzzzzz").await?, None);
|
||||
assert_eq!(reader.get(b"xaaabx").await?, None);
|
||||
|
||||
// Test search with `visit` function
|
||||
let search_key = b"xabaaa";
|
||||
@@ -798,10 +785,12 @@ mod tests {
|
||||
.collect();
|
||||
|
||||
let mut data = Vec::new();
|
||||
reader.visit(search_key, VisitDirection::Forwards, |key, value| {
|
||||
data.push((key.to_vec(), value));
|
||||
true
|
||||
})?;
|
||||
reader
|
||||
.visit(search_key, VisitDirection::Forwards, |key, value| {
|
||||
data.push((key.to_vec(), value));
|
||||
true
|
||||
})
|
||||
.await?;
|
||||
assert_eq!(data, expected);
|
||||
|
||||
// Test a backwards scan
|
||||
@@ -812,16 +801,20 @@ mod tests {
|
||||
.collect();
|
||||
expected.reverse();
|
||||
let mut data = Vec::new();
|
||||
reader.visit(search_key, VisitDirection::Backwards, |key, value| {
|
||||
data.push((key.to_vec(), value));
|
||||
true
|
||||
})?;
|
||||
reader
|
||||
.visit(search_key, VisitDirection::Backwards, |key, value| {
|
||||
data.push((key.to_vec(), value));
|
||||
true
|
||||
})
|
||||
.await?;
|
||||
assert_eq!(data, expected);
|
||||
|
||||
// Backward scan where nothing matches
|
||||
reader.visit(b"aaaaaa", VisitDirection::Backwards, |key, value| {
|
||||
panic!("found unexpected key {}: {}", hex::encode(key), value);
|
||||
})?;
|
||||
reader
|
||||
.visit(b"aaaaaa", VisitDirection::Backwards, |key, value| {
|
||||
panic!("found unexpected key {}: {}", hex::encode(key), value);
|
||||
})
|
||||
.await?;
|
||||
|
||||
// Full scan
|
||||
let expected: Vec<(Vec<u8>, u64)> = all_data
|
||||
@@ -829,10 +822,12 @@ mod tests {
|
||||
.map(|(key, value)| (key.to_vec(), *value))
|
||||
.collect();
|
||||
let mut data = Vec::new();
|
||||
reader.visit(&[0u8; 6], VisitDirection::Forwards, |key, value| {
|
||||
data.push((key.to_vec(), value));
|
||||
true
|
||||
})?;
|
||||
reader
|
||||
.visit(&[0u8; 6], VisitDirection::Forwards, |key, value| {
|
||||
data.push((key.to_vec(), value));
|
||||
true
|
||||
})
|
||||
.await?;
|
||||
assert_eq!(data, expected);
|
||||
|
||||
Ok(())
|
||||
@@ -880,13 +875,15 @@ mod tests {
|
||||
for search_key_int in 0..(NUM_KEYS * 2 + 10) {
|
||||
let search_key = u64::to_be_bytes(search_key_int);
|
||||
assert_eq!(
|
||||
reader.get(&search_key)?,
|
||||
reader.get(&search_key).await?,
|
||||
all_data.get(&search_key_int).cloned()
|
||||
);
|
||||
|
||||
// Test a forward scan starting with this key
|
||||
result.lock().unwrap().clear();
|
||||
reader.visit(&search_key, VisitDirection::Forwards, take_ten)?;
|
||||
reader
|
||||
.visit(&search_key, VisitDirection::Forwards, take_ten)
|
||||
.await?;
|
||||
let expected = all_data
|
||||
.range(search_key_int..)
|
||||
.take(10)
|
||||
@@ -896,7 +893,9 @@ mod tests {
|
||||
|
||||
// And a backwards scan
|
||||
result.lock().unwrap().clear();
|
||||
reader.visit(&search_key, VisitDirection::Backwards, take_ten)?;
|
||||
reader
|
||||
.visit(&search_key, VisitDirection::Backwards, take_ten)
|
||||
.await?;
|
||||
let expected = all_data
|
||||
.range(..=search_key_int)
|
||||
.rev()
|
||||
@@ -910,7 +909,9 @@ mod tests {
|
||||
let search_key = u64::to_be_bytes(0);
|
||||
limit.store(usize::MAX, Ordering::Relaxed);
|
||||
result.lock().unwrap().clear();
|
||||
reader.visit(&search_key, VisitDirection::Forwards, take_ten)?;
|
||||
reader
|
||||
.visit(&search_key, VisitDirection::Forwards, take_ten)
|
||||
.await?;
|
||||
let expected = all_data
|
||||
.iter()
|
||||
.map(|(&key, &val)| (key, val))
|
||||
@@ -921,7 +922,9 @@ mod tests {
|
||||
let search_key = u64::to_be_bytes(u64::MAX);
|
||||
limit.store(usize::MAX, Ordering::Relaxed);
|
||||
result.lock().unwrap().clear();
|
||||
reader.visit(&search_key, VisitDirection::Backwards, take_ten)?;
|
||||
reader
|
||||
.visit(&search_key, VisitDirection::Backwards, take_ten)
|
||||
.await?;
|
||||
let expected = all_data
|
||||
.iter()
|
||||
.rev()
|
||||
@@ -932,8 +935,8 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn random_data() -> Result<()> {
|
||||
#[tokio::test]
|
||||
async fn random_data() -> Result<()> {
|
||||
// Generate random keys with exponential distribution, to
|
||||
// exercise the prefix compression
|
||||
const NUM_KEYS: usize = 100000;
|
||||
@@ -960,19 +963,23 @@ mod tests {
|
||||
// Test get() operation on all the keys
|
||||
for (&key, &val) in all_data.iter() {
|
||||
let search_key = u128::to_be_bytes(key);
|
||||
assert_eq!(reader.get(&search_key)?, Some(val));
|
||||
assert_eq!(reader.get(&search_key).await?, Some(val));
|
||||
}
|
||||
|
||||
// Test get() operations on random keys, most of which will not exist
|
||||
for _ in 0..100000 {
|
||||
let key_int = rand::thread_rng().gen::<u128>();
|
||||
let search_key = u128::to_be_bytes(key_int);
|
||||
assert!(reader.get(&search_key)? == all_data.get(&key_int).cloned());
|
||||
assert!(reader.get(&search_key).await? == all_data.get(&key_int).cloned());
|
||||
}
|
||||
|
||||
// Test boundary cases
|
||||
assert!(reader.get(&u128::to_be_bytes(u128::MIN))? == all_data.get(&u128::MIN).cloned());
|
||||
assert!(reader.get(&u128::to_be_bytes(u128::MAX))? == all_data.get(&u128::MAX).cloned());
|
||||
assert!(
|
||||
reader.get(&u128::to_be_bytes(u128::MIN)).await? == all_data.get(&u128::MIN).cloned()
|
||||
);
|
||||
assert!(
|
||||
reader.get(&u128::to_be_bytes(u128::MAX)).await? == all_data.get(&u128::MAX).cloned()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1014,15 +1021,17 @@ mod tests {
|
||||
|
||||
// Test get() operation on all the keys
|
||||
for (key, val) in disk_btree_test_data::TEST_DATA {
|
||||
assert_eq!(reader.get(&key)?, Some(val));
|
||||
assert_eq!(reader.get(&key).await?, Some(val));
|
||||
}
|
||||
|
||||
// Test full scan
|
||||
let mut count = 0;
|
||||
reader.visit(&[0u8; 26], VisitDirection::Forwards, |_key, _value| {
|
||||
count += 1;
|
||||
true
|
||||
})?;
|
||||
reader
|
||||
.visit(&[0u8; 26], VisitDirection::Forwards, |_key, _value| {
|
||||
count += 1;
|
||||
true
|
||||
})
|
||||
.await?;
|
||||
assert_eq!(count, disk_btree_test_data::TEST_DATA.len());
|
||||
|
||||
reader.dump().await?;
|
||||
|
||||
@@ -281,22 +281,24 @@ impl Layer for DeltaLayer {
|
||||
Ok(desc)
|
||||
};
|
||||
|
||||
tree_reader.visit(
|
||||
&[0u8; DELTA_KEY_SIZE],
|
||||
VisitDirection::Forwards,
|
||||
|delta_key, val| {
|
||||
let blob_ref = BlobRef(val);
|
||||
let key = DeltaKey::extract_key_from_buf(delta_key);
|
||||
let lsn = DeltaKey::extract_lsn_from_buf(delta_key);
|
||||
tree_reader
|
||||
.visit(
|
||||
&[0u8; DELTA_KEY_SIZE],
|
||||
VisitDirection::Forwards,
|
||||
|delta_key, val| {
|
||||
let blob_ref = BlobRef(val);
|
||||
let key = DeltaKey::extract_key_from_buf(delta_key);
|
||||
let lsn = DeltaKey::extract_lsn_from_buf(delta_key);
|
||||
|
||||
let desc = match dump_blob(blob_ref) {
|
||||
Ok(desc) => desc,
|
||||
Err(err) => format!("ERROR: {}", err),
|
||||
};
|
||||
println!(" key {} at {}: {}", key, lsn, desc);
|
||||
true
|
||||
},
|
||||
)?;
|
||||
let desc = match dump_blob(blob_ref) {
|
||||
Ok(desc) => desc,
|
||||
Err(err) => format!("ERROR: {}", err),
|
||||
};
|
||||
println!(" key {} at {}: {}", key, lsn, desc);
|
||||
true
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -328,19 +330,21 @@ impl Layer for DeltaLayer {
|
||||
|
||||
let mut offsets: Vec<(Lsn, u64)> = Vec::new();
|
||||
|
||||
tree_reader.visit(&search_key.0, VisitDirection::Backwards, |key, value| {
|
||||
let blob_ref = BlobRef(value);
|
||||
if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
|
||||
return false;
|
||||
}
|
||||
let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
|
||||
if entry_lsn < lsn_range.start {
|
||||
return false;
|
||||
}
|
||||
offsets.push((entry_lsn, blob_ref.pos()));
|
||||
tree_reader
|
||||
.visit(&search_key.0, VisitDirection::Backwards, |key, value| {
|
||||
let blob_ref = BlobRef(value);
|
||||
if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
|
||||
return false;
|
||||
}
|
||||
let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
|
||||
if entry_lsn < lsn_range.start {
|
||||
return false;
|
||||
}
|
||||
offsets.push((entry_lsn, blob_ref.pos()));
|
||||
|
||||
!blob_ref.will_init()
|
||||
})?;
|
||||
!blob_ref.will_init()
|
||||
})
|
||||
.await?;
|
||||
|
||||
// Ok, 'offsets' now contains the offsets of all the entries we need to read
|
||||
let cursor = file.block_cursor();
|
||||
@@ -618,7 +622,9 @@ impl DeltaLayer {
|
||||
let inner = self
|
||||
.load(LayerAccessKind::KeyIter, ctx)
|
||||
.context("load delta layer")?;
|
||||
DeltaLayerInner::load_val_refs(inner).context("Layer index is corrupted")
|
||||
DeltaLayerInner::load_val_refs(inner)
|
||||
.await
|
||||
.context("Layer index is corrupted")
|
||||
}
|
||||
|
||||
/// Loads all keys stored in the layer. Returns key, lsn and value size.
|
||||
@@ -626,7 +632,9 @@ impl DeltaLayer {
|
||||
let inner = self
|
||||
.load(LayerAccessKind::KeyIter, ctx)
|
||||
.context("load delta layer keys")?;
|
||||
inner.load_keys().context("Layer index is corrupted")
|
||||
DeltaLayerInner::load_keys(inner)
|
||||
.await
|
||||
.context("Layer index is corrupted")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -899,7 +907,7 @@ impl Drop for DeltaLayerWriter {
|
||||
}
|
||||
|
||||
impl DeltaLayerInner {
|
||||
fn load_val_refs(this: &Arc<DeltaLayerInner>) -> Result<Vec<(Key, Lsn, ValueRef)>> {
|
||||
async fn load_val_refs(this: &Arc<DeltaLayerInner>) -> Result<Vec<(Key, Lsn, ValueRef)>> {
|
||||
let file = &this.file;
|
||||
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
this.index_start_blk,
|
||||
@@ -908,23 +916,25 @@ impl DeltaLayerInner {
|
||||
);
|
||||
|
||||
let mut all_offsets = Vec::<(Key, Lsn, ValueRef)>::new();
|
||||
tree_reader.visit(
|
||||
&[0u8; DELTA_KEY_SIZE],
|
||||
VisitDirection::Forwards,
|
||||
|key, value| {
|
||||
let delta_key = DeltaKey::from_slice(key);
|
||||
let val_ref = ValueRef {
|
||||
blob_ref: BlobRef(value),
|
||||
reader: BlockCursor::new(Adapter(this.clone())),
|
||||
};
|
||||
all_offsets.push((delta_key.key(), delta_key.lsn(), val_ref));
|
||||
true
|
||||
},
|
||||
)?;
|
||||
tree_reader
|
||||
.visit(
|
||||
&[0u8; DELTA_KEY_SIZE],
|
||||
VisitDirection::Forwards,
|
||||
|key, value| {
|
||||
let delta_key = DeltaKey::from_slice(key);
|
||||
let val_ref = ValueRef {
|
||||
blob_ref: BlobRef(value),
|
||||
reader: BlockCursor::new(Adapter(this.clone())),
|
||||
};
|
||||
all_offsets.push((delta_key.key(), delta_key.lsn(), val_ref));
|
||||
true
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(all_offsets)
|
||||
}
|
||||
fn load_keys(&self) -> Result<Vec<(Key, Lsn, u64)>> {
|
||||
async fn load_keys(&self) -> Result<Vec<(Key, Lsn, u64)>> {
|
||||
let file = &self.file;
|
||||
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
self.index_start_blk,
|
||||
@@ -933,26 +943,28 @@ impl DeltaLayerInner {
|
||||
);
|
||||
|
||||
let mut all_keys: Vec<(Key, Lsn, u64)> = Vec::new();
|
||||
tree_reader.visit(
|
||||
&[0u8; DELTA_KEY_SIZE],
|
||||
VisitDirection::Forwards,
|
||||
|key, value| {
|
||||
let delta_key = DeltaKey::from_slice(key);
|
||||
let pos = BlobRef(value).pos();
|
||||
if let Some(last) = all_keys.last_mut() {
|
||||
if last.0 == delta_key.key() {
|
||||
return true;
|
||||
} else {
|
||||
// subtract offset of new key BLOB and first blob of this key
|
||||
// to get total size if values associated with this key
|
||||
let first_pos = last.2;
|
||||
last.2 = pos - first_pos;
|
||||
tree_reader
|
||||
.visit(
|
||||
&[0u8; DELTA_KEY_SIZE],
|
||||
VisitDirection::Forwards,
|
||||
|key, value| {
|
||||
let delta_key = DeltaKey::from_slice(key);
|
||||
let pos = BlobRef(value).pos();
|
||||
if let Some(last) = all_keys.last_mut() {
|
||||
if last.0 == delta_key.key() {
|
||||
return true;
|
||||
} else {
|
||||
// subtract offset of new key BLOB and first blob of this key
|
||||
// to get total size if values associated with this key
|
||||
let first_pos = last.2;
|
||||
last.2 = pos - first_pos;
|
||||
}
|
||||
}
|
||||
}
|
||||
all_keys.push((delta_key.key(), delta_key.lsn(), pos));
|
||||
true
|
||||
},
|
||||
)?;
|
||||
all_keys.push((delta_key.key(), delta_key.lsn(), pos));
|
||||
true
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
if let Some(last) = all_keys.last_mut() {
|
||||
// Last key occupies all space till end of layer
|
||||
last.2 = std::fs::metadata(&file.file.path)?.len() - last.2;
|
||||
|
||||
@@ -175,10 +175,12 @@ impl Layer for ImageLayer {
|
||||
|
||||
tree_reader.dump().await?;
|
||||
|
||||
tree_reader.visit(&[0u8; KEY_SIZE], VisitDirection::Forwards, |key, value| {
|
||||
println!("key: {} offset {}", hex::encode(key), value);
|
||||
true
|
||||
})?;
|
||||
tree_reader
|
||||
.visit(&[0u8; KEY_SIZE], VisitDirection::Forwards, |key, value| {
|
||||
println!("key: {} offset {}", hex::encode(key), value);
|
||||
true
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -202,7 +204,7 @@ impl Layer for ImageLayer {
|
||||
|
||||
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
key.write_to_byte_slice(&mut keybuf);
|
||||
if let Some(offset) = tree_reader.get(&keybuf)? {
|
||||
if let Some(offset) = tree_reader.get(&keybuf).await? {
|
||||
let blob = file.block_cursor().read_blob(offset).with_context(|| {
|
||||
format!(
|
||||
"failed to read value from data file {} at offset {}",
|
||||
|
||||
Reference in New Issue
Block a user