diff --git a/pageserver/ctl/src/layer_map_analyzer.rs b/pageserver/ctl/src/layer_map_analyzer.rs index da83c061c0..9d9a1fa151 100644 --- a/pageserver/ctl/src/layer_map_analyzer.rs +++ b/pageserver/ctl/src/layer_map_analyzer.rs @@ -107,23 +107,25 @@ async fn get_holes(path: &Path, max_holes: usize) -> Result> { // min-heap (reserve space for one more element added before eviction) let mut heap: BinaryHeap = BinaryHeap::with_capacity(max_holes + 1); let mut prev_key: Option = None; - tree_reader.visit( - &[0u8; DELTA_KEY_SIZE], - VisitDirection::Forwards, - |key, _value| { - let curr = Key::from_slice(&key[..KEY_SIZE]); - if let Some(prev) = prev_key { - if curr.to_i128() - prev.to_i128() >= MIN_HOLE_LENGTH { - heap.push(Hole(prev..curr)); - if heap.len() > max_holes { - heap.pop(); // remove smallest hole + tree_reader + .visit( + &[0u8; DELTA_KEY_SIZE], + VisitDirection::Forwards, + |key, _value| { + let curr = Key::from_slice(&key[..KEY_SIZE]); + if let Some(prev) = prev_key { + if curr.to_i128() - prev.to_i128() >= MIN_HOLE_LENGTH { + heap.push(Hole(prev..curr)); + if heap.len() > max_holes { + heap.pop(); // remove smallest hole + } } } - } - prev_key = Some(curr.next()); - true - }, - )?; + prev_key = Some(curr.next()); + true + }, + ) + .await?; let mut holes = heap.into_vec(); holes.sort_by_key(|hole| hole.0.start); Ok(holes) diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index 74613eb5f4..e3ddefc661 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -59,15 +59,17 @@ async fn read_delta_file(path: impl AsRef) -> Result<()> { ); // TODO(chi): dedup w/ `delta_layer.rs` by exposing the API. let mut all = vec![]; - tree_reader.visit( - &[0u8; DELTA_KEY_SIZE], - VisitDirection::Forwards, - |key, value_offset| { - let curr = Key::from_slice(&key[..KEY_SIZE]); - all.push((curr, BlobRef(value_offset))); - true - }, - )?; + tree_reader + .visit( + &[0u8; DELTA_KEY_SIZE], + VisitDirection::Forwards, + |key, value_offset| { + let curr = Key::from_slice(&key[..KEY_SIZE]); + all.push((curr, BlobRef(value_offset))); + true + }, + ) + .await?; let cursor = BlockCursor::new(&file); for (k, v) in all { let value = cursor.read_blob(v.pos())?; diff --git a/pageserver/src/tenant/disk_btree.rs b/pageserver/src/tenant/disk_btree.rs index 518286ddb0..b163d27b42 100644 --- a/pageserver/src/tenant/disk_btree.rs +++ b/pageserver/src/tenant/disk_btree.rs @@ -230,14 +230,15 @@ where /// /// Read the value for given key. Returns the value, or None if it doesn't exist. /// - pub fn get(&self, search_key: &[u8; L]) -> Result> { + pub async fn get(&self, search_key: &[u8; L]) -> Result> { let mut result: Option = None; self.visit(search_key, VisitDirection::Forwards, |key, value| { if key == search_key { result = Some(value); } false - })?; + }) + .await?; Ok(result) } @@ -246,7 +247,7 @@ where /// will be called for every key >= 'search_key' (or <= 'search_key', if scanning /// backwards) /// - pub fn visit( + pub async fn visit( &self, search_key: &[u8; L], dir: VisitDirection, @@ -269,23 +270,9 @@ where V: FnMut(&[u8], u64) -> bool, { // Locate the node. - let blk = self.reader.read_blk(self.start_blk + node_blknum)?; + let node_buf = self.reader.read_blk(self.start_blk + node_blknum)?; - // Search all entries on this node - self.search_node(blk.as_ref(), search_key, dir, visitor) - } - - fn search_node( - &self, - node_buf: &[u8], - search_key: &[u8; L], - dir: VisitDirection, - visitor: &mut V, - ) -> Result - where - V: FnMut(&[u8], u64) -> bool, - { - let node = OnDiskNode::deparse(node_buf)?; + let node = OnDiskNode::deparse(node_buf.as_ref())?; let prefix_len = node.prefix_len as usize; let suffix_len = node.suffix_len as usize; @@ -782,12 +769,12 @@ mod tests { // Test the `get` function on all the keys. for (key, val) in all_data.iter() { - assert_eq!(reader.get(key)?, Some(*val)); + assert_eq!(reader.get(key).await?, Some(*val)); } // And on some keys that don't exist - assert_eq!(reader.get(b"aaaaaa")?, None); - assert_eq!(reader.get(b"zzzzzz")?, None); - assert_eq!(reader.get(b"xaaabx")?, None); + assert_eq!(reader.get(b"aaaaaa").await?, None); + assert_eq!(reader.get(b"zzzzzz").await?, None); + assert_eq!(reader.get(b"xaaabx").await?, None); // Test search with `visit` function let search_key = b"xabaaa"; @@ -798,10 +785,12 @@ mod tests { .collect(); let mut data = Vec::new(); - reader.visit(search_key, VisitDirection::Forwards, |key, value| { - data.push((key.to_vec(), value)); - true - })?; + reader + .visit(search_key, VisitDirection::Forwards, |key, value| { + data.push((key.to_vec(), value)); + true + }) + .await?; assert_eq!(data, expected); // Test a backwards scan @@ -812,16 +801,20 @@ mod tests { .collect(); expected.reverse(); let mut data = Vec::new(); - reader.visit(search_key, VisitDirection::Backwards, |key, value| { - data.push((key.to_vec(), value)); - true - })?; + reader + .visit(search_key, VisitDirection::Backwards, |key, value| { + data.push((key.to_vec(), value)); + true + }) + .await?; assert_eq!(data, expected); // Backward scan where nothing matches - reader.visit(b"aaaaaa", VisitDirection::Backwards, |key, value| { - panic!("found unexpected key {}: {}", hex::encode(key), value); - })?; + reader + .visit(b"aaaaaa", VisitDirection::Backwards, |key, value| { + panic!("found unexpected key {}: {}", hex::encode(key), value); + }) + .await?; // Full scan let expected: Vec<(Vec, u64)> = all_data @@ -829,10 +822,12 @@ mod tests { .map(|(key, value)| (key.to_vec(), *value)) .collect(); let mut data = Vec::new(); - reader.visit(&[0u8; 6], VisitDirection::Forwards, |key, value| { - data.push((key.to_vec(), value)); - true - })?; + reader + .visit(&[0u8; 6], VisitDirection::Forwards, |key, value| { + data.push((key.to_vec(), value)); + true + }) + .await?; assert_eq!(data, expected); Ok(()) @@ -880,13 +875,15 @@ mod tests { for search_key_int in 0..(NUM_KEYS * 2 + 10) { let search_key = u64::to_be_bytes(search_key_int); assert_eq!( - reader.get(&search_key)?, + reader.get(&search_key).await?, all_data.get(&search_key_int).cloned() ); // Test a forward scan starting with this key result.lock().unwrap().clear(); - reader.visit(&search_key, VisitDirection::Forwards, take_ten)?; + reader + .visit(&search_key, VisitDirection::Forwards, take_ten) + .await?; let expected = all_data .range(search_key_int..) .take(10) @@ -896,7 +893,9 @@ mod tests { // And a backwards scan result.lock().unwrap().clear(); - reader.visit(&search_key, VisitDirection::Backwards, take_ten)?; + reader + .visit(&search_key, VisitDirection::Backwards, take_ten) + .await?; let expected = all_data .range(..=search_key_int) .rev() @@ -910,7 +909,9 @@ mod tests { let search_key = u64::to_be_bytes(0); limit.store(usize::MAX, Ordering::Relaxed); result.lock().unwrap().clear(); - reader.visit(&search_key, VisitDirection::Forwards, take_ten)?; + reader + .visit(&search_key, VisitDirection::Forwards, take_ten) + .await?; let expected = all_data .iter() .map(|(&key, &val)| (key, val)) @@ -921,7 +922,9 @@ mod tests { let search_key = u64::to_be_bytes(u64::MAX); limit.store(usize::MAX, Ordering::Relaxed); result.lock().unwrap().clear(); - reader.visit(&search_key, VisitDirection::Backwards, take_ten)?; + reader + .visit(&search_key, VisitDirection::Backwards, take_ten) + .await?; let expected = all_data .iter() .rev() @@ -932,8 +935,8 @@ mod tests { Ok(()) } - #[test] - fn random_data() -> Result<()> { + #[tokio::test] + async fn random_data() -> Result<()> { // Generate random keys with exponential distribution, to // exercise the prefix compression const NUM_KEYS: usize = 100000; @@ -960,19 +963,23 @@ mod tests { // Test get() operation on all the keys for (&key, &val) in all_data.iter() { let search_key = u128::to_be_bytes(key); - assert_eq!(reader.get(&search_key)?, Some(val)); + assert_eq!(reader.get(&search_key).await?, Some(val)); } // Test get() operations on random keys, most of which will not exist for _ in 0..100000 { let key_int = rand::thread_rng().gen::(); let search_key = u128::to_be_bytes(key_int); - assert!(reader.get(&search_key)? == all_data.get(&key_int).cloned()); + assert!(reader.get(&search_key).await? == all_data.get(&key_int).cloned()); } // Test boundary cases - assert!(reader.get(&u128::to_be_bytes(u128::MIN))? == all_data.get(&u128::MIN).cloned()); - assert!(reader.get(&u128::to_be_bytes(u128::MAX))? == all_data.get(&u128::MAX).cloned()); + assert!( + reader.get(&u128::to_be_bytes(u128::MIN)).await? == all_data.get(&u128::MIN).cloned() + ); + assert!( + reader.get(&u128::to_be_bytes(u128::MAX)).await? == all_data.get(&u128::MAX).cloned() + ); Ok(()) } @@ -1014,15 +1021,17 @@ mod tests { // Test get() operation on all the keys for (key, val) in disk_btree_test_data::TEST_DATA { - assert_eq!(reader.get(&key)?, Some(val)); + assert_eq!(reader.get(&key).await?, Some(val)); } // Test full scan let mut count = 0; - reader.visit(&[0u8; 26], VisitDirection::Forwards, |_key, _value| { - count += 1; - true - })?; + reader + .visit(&[0u8; 26], VisitDirection::Forwards, |_key, _value| { + count += 1; + true + }) + .await?; assert_eq!(count, disk_btree_test_data::TEST_DATA.len()); reader.dump().await?; diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 54fcb6ec5d..70249eb60e 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -281,22 +281,24 @@ impl Layer for DeltaLayer { Ok(desc) }; - tree_reader.visit( - &[0u8; DELTA_KEY_SIZE], - VisitDirection::Forwards, - |delta_key, val| { - let blob_ref = BlobRef(val); - let key = DeltaKey::extract_key_from_buf(delta_key); - let lsn = DeltaKey::extract_lsn_from_buf(delta_key); + tree_reader + .visit( + &[0u8; DELTA_KEY_SIZE], + VisitDirection::Forwards, + |delta_key, val| { + let blob_ref = BlobRef(val); + let key = DeltaKey::extract_key_from_buf(delta_key); + let lsn = DeltaKey::extract_lsn_from_buf(delta_key); - let desc = match dump_blob(blob_ref) { - Ok(desc) => desc, - Err(err) => format!("ERROR: {}", err), - }; - println!(" key {} at {}: {}", key, lsn, desc); - true - }, - )?; + let desc = match dump_blob(blob_ref) { + Ok(desc) => desc, + Err(err) => format!("ERROR: {}", err), + }; + println!(" key {} at {}: {}", key, lsn, desc); + true + }, + ) + .await?; Ok(()) } @@ -328,19 +330,21 @@ impl Layer for DeltaLayer { let mut offsets: Vec<(Lsn, u64)> = Vec::new(); - tree_reader.visit(&search_key.0, VisitDirection::Backwards, |key, value| { - let blob_ref = BlobRef(value); - if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] { - return false; - } - let entry_lsn = DeltaKey::extract_lsn_from_buf(key); - if entry_lsn < lsn_range.start { - return false; - } - offsets.push((entry_lsn, blob_ref.pos())); + tree_reader + .visit(&search_key.0, VisitDirection::Backwards, |key, value| { + let blob_ref = BlobRef(value); + if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] { + return false; + } + let entry_lsn = DeltaKey::extract_lsn_from_buf(key); + if entry_lsn < lsn_range.start { + return false; + } + offsets.push((entry_lsn, blob_ref.pos())); - !blob_ref.will_init() - })?; + !blob_ref.will_init() + }) + .await?; // Ok, 'offsets' now contains the offsets of all the entries we need to read let cursor = file.block_cursor(); @@ -618,7 +622,9 @@ impl DeltaLayer { let inner = self .load(LayerAccessKind::KeyIter, ctx) .context("load delta layer")?; - DeltaLayerInner::load_val_refs(inner).context("Layer index is corrupted") + DeltaLayerInner::load_val_refs(inner) + .await + .context("Layer index is corrupted") } /// Loads all keys stored in the layer. Returns key, lsn and value size. @@ -626,7 +632,9 @@ impl DeltaLayer { let inner = self .load(LayerAccessKind::KeyIter, ctx) .context("load delta layer keys")?; - inner.load_keys().context("Layer index is corrupted") + DeltaLayerInner::load_keys(inner) + .await + .context("Layer index is corrupted") } } @@ -899,7 +907,7 @@ impl Drop for DeltaLayerWriter { } impl DeltaLayerInner { - fn load_val_refs(this: &Arc) -> Result> { + async fn load_val_refs(this: &Arc) -> Result> { let file = &this.file; let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new( this.index_start_blk, @@ -908,23 +916,25 @@ impl DeltaLayerInner { ); let mut all_offsets = Vec::<(Key, Lsn, ValueRef)>::new(); - tree_reader.visit( - &[0u8; DELTA_KEY_SIZE], - VisitDirection::Forwards, - |key, value| { - let delta_key = DeltaKey::from_slice(key); - let val_ref = ValueRef { - blob_ref: BlobRef(value), - reader: BlockCursor::new(Adapter(this.clone())), - }; - all_offsets.push((delta_key.key(), delta_key.lsn(), val_ref)); - true - }, - )?; + tree_reader + .visit( + &[0u8; DELTA_KEY_SIZE], + VisitDirection::Forwards, + |key, value| { + let delta_key = DeltaKey::from_slice(key); + let val_ref = ValueRef { + blob_ref: BlobRef(value), + reader: BlockCursor::new(Adapter(this.clone())), + }; + all_offsets.push((delta_key.key(), delta_key.lsn(), val_ref)); + true + }, + ) + .await?; Ok(all_offsets) } - fn load_keys(&self) -> Result> { + async fn load_keys(&self) -> Result> { let file = &self.file; let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new( self.index_start_blk, @@ -933,26 +943,28 @@ impl DeltaLayerInner { ); let mut all_keys: Vec<(Key, Lsn, u64)> = Vec::new(); - tree_reader.visit( - &[0u8; DELTA_KEY_SIZE], - VisitDirection::Forwards, - |key, value| { - let delta_key = DeltaKey::from_slice(key); - let pos = BlobRef(value).pos(); - if let Some(last) = all_keys.last_mut() { - if last.0 == delta_key.key() { - return true; - } else { - // subtract offset of new key BLOB and first blob of this key - // to get total size if values associated with this key - let first_pos = last.2; - last.2 = pos - first_pos; + tree_reader + .visit( + &[0u8; DELTA_KEY_SIZE], + VisitDirection::Forwards, + |key, value| { + let delta_key = DeltaKey::from_slice(key); + let pos = BlobRef(value).pos(); + if let Some(last) = all_keys.last_mut() { + if last.0 == delta_key.key() { + return true; + } else { + // subtract offset of new key BLOB and first blob of this key + // to get total size if values associated with this key + let first_pos = last.2; + last.2 = pos - first_pos; + } } - } - all_keys.push((delta_key.key(), delta_key.lsn(), pos)); - true - }, - )?; + all_keys.push((delta_key.key(), delta_key.lsn(), pos)); + true + }, + ) + .await?; if let Some(last) = all_keys.last_mut() { // Last key occupies all space till end of layer last.2 = std::fs::metadata(&file.file.path)?.len() - last.2; diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index f3aaed61b9..c92d27a6b8 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -175,10 +175,12 @@ impl Layer for ImageLayer { tree_reader.dump().await?; - tree_reader.visit(&[0u8; KEY_SIZE], VisitDirection::Forwards, |key, value| { - println!("key: {} offset {}", hex::encode(key), value); - true - })?; + tree_reader + .visit(&[0u8; KEY_SIZE], VisitDirection::Forwards, |key, value| { + println!("key: {} offset {}", hex::encode(key), value); + true + }) + .await?; Ok(()) } @@ -202,7 +204,7 @@ impl Layer for ImageLayer { let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE]; key.write_to_byte_slice(&mut keybuf); - if let Some(offset) = tree_reader.get(&keybuf)? { + if let Some(offset) = tree_reader.get(&keybuf).await? { let blob = file.block_cursor().read_blob(offset).with_context(|| { format!( "failed to read value from data file {} at offset {}",