From 9dec65b75b5262c63d89ecaaf85a2dfb4d5e84f1 Mon Sep 17 00:00:00 2001
From: Vlad Lazar
Date: Tue, 5 Mar 2024 13:35:45 +0000
Subject: [PATCH] pageserver: fix vectored read path delta layer index
 traversal (#7001)

## Problem

Last week's enablement of vectored get generated a number of panics. From
these panics, I diagnosed two issues in the delta layer index traversal logic:
1. The `key >= range.start && lsn >= lsn_range.start` assertion was too
   aggressive. Lsns are not monotonically increasing in the delta layer index
   (keys are, though), so we cannot assert on them.
2. Lsns greater than or equal to `lsn_range.end` were not skipped. This caused
   the query to consider records newer than the request Lsn.

## Summary of changes

* Fix the issues mentioned above inline
* Refactor the layer traversal logic to make it unit testable
* Add a unit test which reproduces the failure modes listed above.
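To make the two fixes concrete, the corrected per-entry filtering boils down
to the following self-contained sketch. `entry_is_relevant` is a hypothetical
helper for illustration only, with `u64` stand-ins for `Key` and `Lsn`; the
real logic lives in `plan_reads` in the diff below.

```rust
use std::ops::Range;

/// Hypothetical helper mirroring the fixed filtering (illustration only).
fn entry_is_relevant(
    key: u64,
    lsn: u64,
    key_range: &Range<u64>,
    lsn_range: &Range<u64>,
) -> bool {
    // Keys are monotonically increasing in the index, so this assertion is
    // sound for every entry the traversal yields. Lsns are *not* monotonic
    // across keys, which is why asserting `lsn >= lsn_range.start` was
    // wrong (issue 1).
    assert!(key >= key_range.start);

    // `Range::contains` is half-open: it excludes `lsn_range.end`, which is
    // the fix for issue 2 (records at or above the end used to be kept).
    lsn_range.contains(&lsn)
}

fn main() {
    let keys = 10..20;
    let lsns = 100..200;
    assert!(entry_is_relevant(10, 150, &keys, &lsns));
    assert!(!entry_is_relevant(11, 200, &keys, &lsns)); // at lsn_range.end: now skipped
    assert!(!entry_is_relevant(11, 99, &keys, &lsns)); // below start: skipped, not panicked
}
```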
---
 pageserver/src/tenant/disk_btree.rs           |  95 ++++++-
 .../src/tenant/storage_layer/delta_layer.rs   | 257 ++++++++++++++----
 .../src/tenant/storage_layer/image_layer.rs   |  44 +--
 pageserver/src/tenant/vectored_blob_io.rs     |  12 +-
 4 files changed, 322 insertions(+), 86 deletions(-)

diff --git a/pageserver/src/tenant/disk_btree.rs b/pageserver/src/tenant/disk_btree.rs
index ca30b0ac4f..6d85d1e60e 100644
--- a/pageserver/src/tenant/disk_btree.rs
+++ b/pageserver/src/tenant/disk_btree.rs
@@ -18,10 +18,19 @@
 //! - An Iterator interface would be more convenient for the callers than the
 //!   'visit' function
 //!
+use async_stream::try_stream;
 use byteorder::{ReadBytesExt, BE};
 use bytes::{BufMut, Bytes, BytesMut};
 use either::Either;
-use std::{cmp::Ordering, io, result};
+use futures::Stream;
+use hex;
+use std::{
+    cmp::Ordering,
+    io,
+    iter::Rev,
+    ops::{Range, RangeInclusive},
+    result,
+};
 use thiserror::Error;
 use tracing::error;

@@ -250,6 +259,90 @@ where
         Ok(result)
     }

+    /// Return a stream which yields all key, value pairs from the index
+    /// starting from the first key greater than or equal to `start_key`.
+    ///
+    /// Note that this is a copy of [`Self::visit`].
+    /// TODO: Once the sequential read path is removed this will become
+    /// the only index traversal method.
+    pub fn get_stream_from<'a>(
+        &'a self,
+        start_key: &'a [u8; L],
+        ctx: &'a RequestContext,
+    ) -> impl Stream<Item = Result<(Vec<u8>, u64), DiskBtreeError>> + 'a {
+        try_stream! {
+            let mut stack = Vec::new();
+            stack.push((self.root_blk, None));
+            let block_cursor = self.reader.block_cursor();
+            while let Some((node_blknum, opt_iter)) = stack.pop() {
+                // Locate the node.
+                let node_buf = block_cursor
+                    .read_blk(self.start_blk + node_blknum, ctx)
+                    .await?;
+
+                let node = OnDiskNode::deparse(node_buf.as_ref())?;
+                let prefix_len = node.prefix_len as usize;
+                let suffix_len = node.suffix_len as usize;
+
+                assert!(node.num_children > 0);
+
+                let mut keybuf = Vec::new();
+                keybuf.extend(node.prefix);
+                keybuf.resize(prefix_len + suffix_len, 0);
+
+                let mut iter: Either<Range<usize>, Rev<Range<usize>>> = if let Some(iter) = opt_iter {
+                    iter
+                } else {
+                    // Locate the first match
+                    let idx = match node.binary_search(start_key, keybuf.as_mut_slice()) {
+                        Ok(idx) => idx,
+                        Err(idx) => {
+                            if node.level == 0 {
+                                // Imagine that the node contains the following keys:
+                                //
+                                // 1
+                                // 3 <-- idx
+                                // 5
+                                //
+                                // If the search key is '2' and there is no exact
+                                // match, the binary search would return the index of key
+                                // '3'. That's cool, '3' is the first key to return.
+                                idx
+                            } else {
+                                // This is an internal page, so each key represents a lower
+                                // bound for what's in the child page. If there is no exact
+                                // match, we have to return the *previous* entry.
+                                //
+                                // 1 <-- return this
+                                // 3 <-- idx
+                                // 5
+                                idx.saturating_sub(1)
+                            }
+                        }
+                    };
+                    Either::Left(idx..node.num_children.into())
+                };

+                // idx points to the first match now. Keep going from there
+                while let Some(idx) = iter.next() {
+                    let key_off = idx * suffix_len;
+                    let suffix = &node.keys[key_off..key_off + suffix_len];
+                    keybuf[prefix_len..].copy_from_slice(suffix);
+                    let value = node.value(idx);
+                    #[allow(clippy::collapsible_if)]
+                    if node.level == 0 {
+                        // leaf
+                        yield (keybuf.clone(), value.to_u64());
+                    } else {
+                        stack.push((node_blknum, Some(iter)));
+                        stack.push((value.to_blknum(), None));
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
     ///
     /// Scan the tree, starting from 'search_key', in the given direction. 'visitor'
     /// will be called for every key >= 'search_key' (or <= 'search_key', if scanning
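The stream above is consumed with the pattern shown in the two layer diffs
below: pin the borrowed stream on the stack, pull entries with
`StreamExt::next`, and `break` early where the `visit` callback used to return
`false`. Here is a self-contained toy reduction of that pattern (it assumes
the `async-stream`, `futures`, and `tokio` crates; none of the pageserver
types appear):

```rust
use async_stream::try_stream;
use futures::{Stream, StreamExt};

// Toy producer standing in for `DiskBtreeReader::get_stream_from`.
fn numbers_from(start: u64) -> impl Stream<Item = Result<u64, std::io::Error>> {
    try_stream! {
        for n in start..start + 100 {
            yield n;
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let stream = numbers_from(10);
    // The real streams borrow from the reader, so they are pinned on the
    // stack with `std::pin::pin!` rather than boxed.
    let mut stream = std::pin::pin!(stream);
    while let Some(entry) = stream.next().await {
        let n = entry?;
        if n >= 15 {
            break; // analogous to stopping once past the range end
        }
        println!("{n}");
    }
    Ok(())
}
```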
diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs
index 5eaf1cc1ce..b7132ee3bf 100644
--- a/pageserver/src/tenant/storage_layer/delta_layer.rs
+++ b/pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -46,6 +46,7 @@ use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
 use anyhow::{anyhow, bail, ensure, Context, Result};
 use bytes::BytesMut;
 use camino::{Utf8Path, Utf8PathBuf};
+use futures::StreamExt;
 use pageserver_api::keyspace::KeySpace;
 use pageserver_api::models::LayerAccessKind;
 use pageserver_api::shard::TenantShardId;
@@ -847,10 +848,33 @@ impl DeltaLayerInner {
         reconstruct_state: &mut ValuesReconstructState,
         ctx: &RequestContext,
     ) -> Result<(), GetVectoredError> {
-        let reads = self
-            .plan_reads(keyspace, lsn_range, reconstruct_state, ctx)
-            .await
-            .map_err(GetVectoredError::Other)?;
+        let block_reader = FileBlockReader::new(&self.file, self.file_id);
+        let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
+            self.index_start_blk,
+            self.index_root_blk,
+            block_reader,
+        );
+
+        let planner = VectoredReadPlanner::new(
+            self.max_vectored_read_bytes
+                .expect("Layer is loaded with max vectored bytes config")
+                .0
+                .into(),
+        );
+
+        let data_end_offset = self.index_start_blk as u64 * PAGE_SZ as u64;
+
+        let reads = Self::plan_reads(
+            keyspace,
+            lsn_range,
+            data_end_offset,
+            index_reader,
+            planner,
+            reconstruct_state,
+            ctx,
+        )
+        .await
+        .map_err(GetVectoredError::Other)?;

         self.do_reads_and_update_state(reads, reconstruct_state)
             .await;
@@ -858,73 +882,64 @@ impl DeltaLayerInner {
         Ok(())
     }

-    async fn plan_reads(
-        &self,
+    async fn plan_reads<Reader>(
         keyspace: KeySpace,
         lsn_range: Range<Lsn>,
+        data_end_offset: u64,
+        index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
+        mut planner: VectoredReadPlanner,
         reconstruct_state: &mut ValuesReconstructState,
         ctx: &RequestContext,
-    ) -> anyhow::Result<Vec<VectoredRead>> {
-        let mut planner = VectoredReadPlanner::new(
-            self.max_vectored_read_bytes
-                .expect("Layer is loaded with max vectored bytes config")
-                .0
-                .into(),
-        );
-
-        let block_reader = FileBlockReader::new(&self.file, self.file_id);
-        let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
-            self.index_start_blk,
-            self.index_root_blk,
-            block_reader,
-        );
+    ) -> anyhow::Result<Vec<VectoredRead>>
+    where
+        Reader: BlockReader,
+    {
+        let ctx = RequestContextBuilder::extend(ctx)
+            .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
+            .build();

         for range in keyspace.ranges.iter() {
             let mut range_end_handled = false;

             let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
-            tree_reader
-                .visit(
-                    &start_key.0,
-                    VisitDirection::Forwards,
-                    |raw_key, value| {
-                        let key = Key::from_slice(&raw_key[..KEY_SIZE]);
-                        let lsn = DeltaKey::extract_lsn_from_buf(raw_key);
-                        let blob_ref = BlobRef(value);
+            let index_stream = index_reader.get_stream_from(&start_key.0, &ctx);
+            let mut index_stream = std::pin::pin!(index_stream);

-                        assert!(key >= range.start && lsn >= lsn_range.start);
+            while let Some(index_entry) = index_stream.next().await {
+                let (raw_key, value) = index_entry?;
+                let key = Key::from_slice(&raw_key[..KEY_SIZE]);
+                let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
+                let blob_ref = BlobRef(value);

-                        let cached_lsn = reconstruct_state.get_cached_lsn(&key);
-                        let flag = {
-                            if cached_lsn >= Some(lsn) {
-                                BlobFlag::Ignore
-                            } else if blob_ref.will_init() {
-                                BlobFlag::Replaces
-                            } else {
-                                BlobFlag::None
-                            }
-                        };
+                // Lsns are not monotonically increasing across keys, so we don't assert on them.
+                assert!(key >= range.start);

-                        if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
-                            planner.handle_range_end(blob_ref.pos());
-                            range_end_handled = true;
-                            false
-                        } else {
-                            planner.handle(key, lsn, blob_ref.pos(), flag);
-                            true
-                        }
-                    },
-                    &RequestContextBuilder::extend(ctx)
-                        .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
-                        .build(),
-                )
-                .await
-                .map_err(|err| anyhow!(err))?;
+                let outside_lsn_range = !lsn_range.contains(&lsn);
+                let below_cached_lsn = reconstruct_state.get_cached_lsn(&key) >= Some(lsn);
+
+                let flag = {
+                    if outside_lsn_range || below_cached_lsn {
+                        BlobFlag::Ignore
+                    } else if blob_ref.will_init() {
+                        BlobFlag::ReplaceAll
+                    } else {
+                        // Usual path: add blob to the read
+                        BlobFlag::None
+                    }
+                };
+
+                if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
+                    planner.handle_range_end(blob_ref.pos());
+                    range_end_handled = true;
+                    break;
+                } else {
+                    planner.handle(key, lsn, blob_ref.pos(), flag);
+                }
+            }

             if !range_end_handled {
-                let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
-                tracing::info!("Handling range end fallback at {}", payload_end);
-                planner.handle_range_end(payload_end);
+                tracing::info!("Handling range end fallback at {}", data_end_offset);
+                planner.handle_range_end(data_end_offset);
             }
         }
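Why may the traversal stop at `key >= range.end || (key.next() == range.end
&& lsn >= lsn_range.end)`? The index orders entries lexicographically by
(key, lsn): lsns increase within one key but can jump back down at a key
boundary. A toy model with `u64` stand-ins (hypothetical, for intuition only;
`key + 1` stands in for `key.next()`, and the toy simply breaks where the real
code also records the range end):

```rust
use std::ops::Range;

fn main() {
    // (key, lsn) pairs in index order: sorted by key, then by lsn per key.
    let entries = vec![(1u64, 5u64), (1, 30), (2, 2), (2, 10), (3, 7)];
    let key_range: Range<u64> = 1..3; // queried keys
    let lsn_range: Range<u64> = 3..20; // queried lsns (half-open)

    let mut planned = Vec::new();
    for (key, lsn) in entries {
        // Early exit: we either walked past the key range, or we are on the
        // last key of the range and its (per-key monotonic) lsns reached
        // lsn_range.end, so nothing relevant can follow.
        if key >= key_range.end || (key + 1 == key_range.end && lsn >= lsn_range.end) {
            break;
        }
        // Fix #2 in action: (1, 30) is at/above lsn_range.end and is skipped,
        // even though the traversal must keep going for later keys.
        if lsn_range.contains(&lsn) {
            planned.push((key, lsn));
        }
    }
    assert_eq!(planned, vec![(1, 5), (2, 10)]);
}
```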
@@ -1190,3 +1205,131 @@ impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for Del
         self.size
     }
 }
+
+#[cfg(test)]
+mod test {
+    use std::collections::BTreeMap;
+
+    use super::*;
+    use crate::{
+        context::DownloadBehavior, task_mgr::TaskKind, tenant::disk_btree::tests::TestDisk,
+    };
+
+    /// Construct an index for a fictional delta layer and then traverse it
+    /// in order to plan vectored reads for a query. Finally,
+    /// verify that the traversal fed the right index key and value
+    /// pairs into the planner.
+    #[tokio::test]
+    async fn test_delta_layer_index_traversal() {
+        let base_key = Key {
+            field1: 0,
+            field2: 1663,
+            field3: 12972,
+            field4: 16396,
+            field5: 0,
+            field6: 246080,
+        };
+
+        // Populate the index with some entries
+        let entries: BTreeMap<Key, Vec<Lsn>> = BTreeMap::from([
+            (base_key, vec![Lsn(1), Lsn(5), Lsn(25), Lsn(26), Lsn(28)]),
+            (base_key.add(1), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
+            (base_key.add(2), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
+            (base_key.add(5), vec![Lsn(10), Lsn(15), Lsn(16), Lsn(20)]),
+        ]);
+
+        let mut disk = TestDisk::default();
+        let mut writer = DiskBtreeBuilder::<_, DELTA_KEY_SIZE>::new(&mut disk);
+
+        let mut disk_offset = 0;
+        for (key, lsns) in &entries {
+            for lsn in lsns {
+                let index_key = DeltaKey::from_key_lsn(key, *lsn);
+                let blob_ref = BlobRef::new(disk_offset, false);
+                writer
+                    .append(&index_key.0, blob_ref.0)
+                    .expect("In memory disk append should never fail");
+
+                disk_offset += 1;
+            }
+        }
+
+        // Prepare all the arguments for the call into `plan_reads` below
+        let (root_offset, _writer) = writer
+            .finish()
+            .expect("In memory disk finish should never fail");
+        let reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(0, root_offset, disk);
+        let planner = VectoredReadPlanner::new(100);
+        let mut reconstruct_state = ValuesReconstructState::new();
+        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
+
+        let keyspace = KeySpace {
+            ranges: vec![
+                base_key..base_key.add(3),
+                base_key.add(3)..base_key.add(100),
+            ],
+        };
+        let lsn_range = Lsn(2)..Lsn(40);
+
+        // Plan and validate
+        let vectored_reads = DeltaLayerInner::plan_reads(
+            keyspace.clone(),
+            lsn_range.clone(),
+            disk_offset,
+            reader,
+            planner,
+            &mut reconstruct_state,
+            &ctx,
+        )
+        .await
+        .expect("Read planning should not fail");
+
+        validate(keyspace, lsn_range, vectored_reads, entries);
+    }
+
+    fn validate(
+        keyspace: KeySpace,
+        lsn_range: Range<Lsn>,
+        vectored_reads: Vec<VectoredRead>,
+        index_entries: BTreeMap<Key, Vec<Lsn>>,
+    ) {
+        #[derive(Debug, PartialEq, Eq)]
+        struct BlobSpec {
+            key: Key,
+            lsn: Lsn,
+            at: u64,
+        }
+
+        let mut planned_blobs = Vec::new();
+        for read in vectored_reads {
+            for (at, meta) in read.blobs_at.as_slice() {
+                planned_blobs.push(BlobSpec {
+                    key: meta.key,
+                    lsn: meta.lsn,
+                    at: *at,
+                });
+            }
+        }
+
+        let mut expected_blobs = Vec::new();
+        let mut disk_offset = 0;
+        for (key, lsns) in index_entries {
+            for lsn in lsns {
+                let key_included = keyspace.ranges.iter().any(|range| range.contains(&key));
+                let lsn_included = lsn_range.contains(&lsn);
+
+                if key_included && lsn_included {
+                    expected_blobs.push(BlobSpec {
+                        key,
+                        lsn,
+                        at: disk_offset,
+                    });
+                }
+
+                disk_offset += 1;
+            }
+        }
+
+        assert_eq!(planned_blobs, expected_blobs);
+    }
+}
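A note on the test's bookkeeping: `BlobRef::new(disk_offset, false)` stamps
each index entry with a fake offset equal to its insertion index, and the
B-tree builder expects appends in ascending key order, which the `BTreeMap`
iteration (sorted keys, ascending `Lsn` vectors) provides. That is what lets
`validate` recompute the expected offsets independently. A toy illustration of
the append order (plain `u64`s, not the pageserver types):

```rust
use std::collections::BTreeMap;

fn main() {
    // Inserted out of order, iterated in sorted order, like the test's entries.
    let entries: BTreeMap<u64, Vec<u64>> =
        BTreeMap::from([(2, vec![10, 50]), (1, vec![5, 25])]);

    let mut disk_offset = 0u64;
    for (key, lsns) in &entries {
        for lsn in lsns {
            // Mirrors the test: the n-th appended entry lives at offset n.
            println!("append (key={key}, lsn={lsn}) -> offset {disk_offset}");
            disk_offset += 1;
        }
    }
    assert_eq!(disk_offset, 4); // total number of index entries
}
```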
diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs
index 56cfaeda15..14c79e413c 100644
--- a/pageserver/src/tenant/storage_layer/image_layer.rs
+++ b/pageserver/src/tenant/storage_layer/image_layer.rs
@@ -43,6 +43,7 @@ use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
 use anyhow::{anyhow, bail, ensure, Context, Result};
 use bytes::{Bytes, BytesMut};
 use camino::{Utf8Path, Utf8PathBuf};
+use hex;
 use pageserver_api::keyspace::KeySpace;
 use pageserver_api::models::LayerAccessKind;
 use pageserver_api::shard::TenantShardId;
@@ -54,6 +55,7 @@ use std::ops::Range;
 use std::os::unix::prelude::FileExt;
 use std::sync::Arc;
 use tokio::sync::OnceCell;
+use tokio_stream::StreamExt;
 use tracing::*;

 use utils::{
@@ -488,35 +490,33 @@ impl ImageLayerInner {
         let tree_reader =
             DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);

+        let ctx = RequestContextBuilder::extend(ctx)
+            .page_content_kind(PageContentKind::ImageLayerBtreeNode)
+            .build();
+
         for range in keyspace.ranges.iter() {
             let mut range_end_handled = false;
             let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
             range.start.write_to_byte_slice(&mut search_key);

-            tree_reader
-                .visit(
-                    &search_key,
-                    VisitDirection::Forwards,
-                    |raw_key, offset| {
-                        let key = Key::from_slice(&raw_key[..KEY_SIZE]);
-                        assert!(key >= range.start);
+            let index_stream = tree_reader.get_stream_from(&search_key, &ctx);
+            let mut index_stream = std::pin::pin!(index_stream);

-                        if key >= range.end {
-                            planner.handle_range_end(offset);
-                            range_end_handled = true;
-                            false
-                        } else {
-                            planner.handle(key, self.lsn, offset, BlobFlag::None);
-                            true
-                        }
-                    },
-                    &RequestContextBuilder::extend(ctx)
-                        .page_content_kind(PageContentKind::ImageLayerBtreeNode)
-                        .build(),
-                )
-                .await
-                .map_err(|err| GetVectoredError::Other(anyhow!(err)))?;
+            while let Some(index_entry) = index_stream.next().await {
+                let (raw_key, offset) = index_entry?;
+
+                let key = Key::from_slice(&raw_key[..KEY_SIZE]);
+                assert!(key >= range.start);
+
+                if key >= range.end {
+                    planner.handle_range_end(offset);
+                    range_end_handled = true;
+                    break;
+                } else {
+                    planner.handle(key, self.lsn, offset, BlobFlag::None);
+                }
+            }

             if !range_end_handled {
                 let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs
index a8d9649d36..805f70b23b 100644
--- a/pageserver/src/tenant/vectored_blob_io.rs
+++ b/pageserver/src/tenant/vectored_blob_io.rs
@@ -128,7 +128,7 @@ impl VectoredReadBuilder {
 pub enum BlobFlag {
     None,
     Ignore,
-    Replaces,
+    ReplaceAll,
 }

 /// Planner for vectored blob reads.
@@ -170,7 +170,7 @@ impl VectoredReadPlanner {
     /// incorrect data to the user.
     ///
     /// The `flag` argument has two interesting values:
-    /// * [`BlobFlag::Replaces`]: The blob for this key should replace all existing blobs.
+    /// * [`BlobFlag::ReplaceAll`]: The blob for this key should replace all existing blobs.
     ///   This is used for WAL records that `will_init`.
     /// * [`BlobFlag::Ignore`]: This blob should not be included in the read. This happens
     ///   if the blob is cached.
@@ -204,7 +204,7 @@ impl VectoredReadPlanner {
                 let blobs_for_key = self.blobs.entry(key).or_default();
                 blobs_for_key.push((lsn, start_offset, end_offset));
             }
-            BlobFlag::Replaces => {
+            BlobFlag::ReplaceAll => {
                 let blobs_for_key = self.blobs.entry(key).or_default();
                 blobs_for_key.clear();
                 blobs_for_key.push((lsn, start_offset, end_offset));
@@ -411,10 +411,10 @@ mod tests {
         let blob_descriptions = vec![
             (first_key, lsn, 0, BlobFlag::None),    // First in read 1
             (first_key, lsn, 1024, BlobFlag::None), // Last in read 1
-            (second_key, lsn, 2 * 1024, BlobFlag::Replaces),
+            (second_key, lsn, 2 * 1024, BlobFlag::ReplaceAll),
             (second_key, lsn, 3 * 1024, BlobFlag::None),
-            (second_key, lsn, 4 * 1024, BlobFlag::Replaces), // First in read 2
-            (second_key, lsn, 5 * 1024, BlobFlag::None),     // Last in read 2
+            (second_key, lsn, 4 * 1024, BlobFlag::ReplaceAll), // First in read 2
+            (second_key, lsn, 5 * 1024, BlobFlag::None),       // Last in read 2
         ];

         let ranges = [&blob_descriptions[0..2], &blob_descriptions[4..]];
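Finally, on the `Replaces` to `ReplaceAll` rename above: the new name spells
out that a `will_init` record does not replace just the previous blob but
clears everything planned so far for that key. A toy reduction of the
planner's match (hypothetical `ToyPlanner`; the real `VectoredReadPlanner`
also tracks lsns and byte ranges and coalesces adjacent blobs into large
reads):

```rust
use std::collections::BTreeMap;

#[derive(Clone, Copy)]
enum BlobFlag {
    None,       // usual path: append the blob for this key
    Ignore,     // outside the lsn range or already cached
    ReplaceAll, // a will_init record: drop all older blobs for this key
}

#[derive(Default)]
struct ToyPlanner {
    blobs: BTreeMap<u64, Vec<u64>>, // key -> planned blob offsets
}

impl ToyPlanner {
    fn handle(&mut self, key: u64, offset: u64, flag: BlobFlag) {
        match flag {
            BlobFlag::None => self.blobs.entry(key).or_default().push(offset),
            BlobFlag::ReplaceAll => {
                let offsets = self.blobs.entry(key).or_default();
                offsets.clear();
                offsets.push(offset);
            }
            BlobFlag::Ignore => {}
        }
    }
}

fn main() {
    let mut planner = ToyPlanner::default();
    planner.handle(1, 0, BlobFlag::None);
    planner.handle(1, 1024, BlobFlag::Ignore); // e.g. lsn outside the range
    planner.handle(1, 2048, BlobFlag::ReplaceAll); // will_init: earlier blobs dropped
    planner.handle(1, 3072, BlobFlag::None);
    assert_eq!(planner.blobs[&1], vec![2048, 3072]);
}
```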