mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-09 06:22:57 +00:00
Refactor Layer::get_page_reconstruct_data function
Previously, the InMemoryLayer and DeltaLayer implementations of get_page_reconstruct_data would recursively call the predecessor layer's get_page_reconstruct_data function. Refactor so that we iterate in the caller instead. Make get_page_reconstruct_data() return the predecessor layer along with the continuation LSN, so that the caller can iterate. IMO this makes the logic more clear, although this is more lines of code.
This commit is contained in:
@@ -55,7 +55,9 @@ use image_layer::ImageLayer;
|
||||
use filename::{DeltaFileName, ImageFileName};
|
||||
use inmemory_layer::InMemoryLayer;
|
||||
use layer_map::LayerMap;
|
||||
use storage_layer::{Layer, PageReconstructData, SegmentTag, RELISH_SEG_SIZE};
|
||||
use storage_layer::{
|
||||
Layer, PageReconstructData, PageReconstructResult, SegmentTag, RELISH_SEG_SIZE,
|
||||
};
|
||||
|
||||
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
|
||||
|
||||
@@ -1319,25 +1321,45 @@ impl LayeredTimeline {
|
||||
page_img: None,
|
||||
};
|
||||
|
||||
if let Some(_cont_lsn) = layer.get_page_reconstruct_data(blknum, lsn, &mut data)? {
|
||||
// The layers are currently fully self-contained, so we should have found all
|
||||
// the data we need to reconstruct the page in the layer.
|
||||
if data.records.is_empty() {
|
||||
// no records, and no base image. This can happen if PostgreSQL extends a relation
|
||||
// but never writes the page.
|
||||
//
|
||||
// Would be nice to detect that situation better.
|
||||
warn!("Page {} blk {} at {} not found", seg.rel, blknum, lsn);
|
||||
return Ok(ZERO_PAGE.clone());
|
||||
// Holds an Arc reference to 'layer_ref' when iterating in the loop below.
|
||||
let mut layer_arc: Arc<dyn Layer>;
|
||||
|
||||
// Call the layer's get_page_reconstruct_data function to get the base image
|
||||
// and WAL records needed to materialize the page. If it returns 'Continue',
|
||||
// call it again on the predecessor layer until we have all the required data.
|
||||
let mut layer_ref = layer;
|
||||
let mut curr_lsn = lsn;
|
||||
loop {
|
||||
match layer_ref.get_page_reconstruct_data(blknum, curr_lsn, &mut data)? {
|
||||
PageReconstructResult::Complete => break,
|
||||
PageReconstructResult::Continue(cont_lsn, cont_layer) => {
|
||||
// Fetch base image / more WAL from the returned predecessor layer
|
||||
layer_arc = cont_layer;
|
||||
layer_ref = &*layer_arc;
|
||||
curr_lsn = cont_lsn;
|
||||
continue;
|
||||
}
|
||||
PageReconstructResult::Missing(lsn) => {
|
||||
// Oops, we could not reconstruct the page.
|
||||
if data.records.is_empty() {
|
||||
// no records, and no base image. This can happen if PostgreSQL extends a relation
|
||||
// but never writes the page.
|
||||
//
|
||||
// Would be nice to detect that situation better.
|
||||
warn!("Page {} blk {} at {} not found", seg.rel, blknum, lsn);
|
||||
return Ok(ZERO_PAGE.clone());
|
||||
}
|
||||
bail!(
|
||||
"No base image found for page {} blk {} at {}/{}",
|
||||
seg.rel,
|
||||
blknum,
|
||||
self.timelineid,
|
||||
lsn,
|
||||
);
|
||||
}
|
||||
}
|
||||
bail!(
|
||||
"No base image found for page {} blk {} at {}/{}",
|
||||
seg.rel,
|
||||
blknum,
|
||||
self.timelineid,
|
||||
lsn,
|
||||
);
|
||||
}
|
||||
|
||||
self.reconstruct_page(seg.rel, blknum, lsn, data)
|
||||
}
|
||||
|
||||
|
||||
@@ -40,7 +40,7 @@
|
||||
use crate::layered_repository::blob::BlobWriter;
|
||||
use crate::layered_repository::filename::{DeltaFileName, PathOrConf};
|
||||
use crate::layered_repository::storage_layer::{
|
||||
Layer, PageReconstructData, PageVersion, SegmentTag,
|
||||
Layer, PageReconstructData, PageReconstructResult, PageVersion, SegmentTag,
|
||||
};
|
||||
use crate::repository::WALRecord;
|
||||
use crate::waldecoder;
|
||||
@@ -167,15 +167,19 @@ impl Layer for DeltaLayer {
|
||||
blknum: u32,
|
||||
lsn: Lsn,
|
||||
reconstruct_data: &mut PageReconstructData,
|
||||
) -> Result<Option<Lsn>> {
|
||||
// Scan the BTreeMap backwards, starting from the given entry.
|
||||
let mut need_base_image_lsn: Option<Lsn> = Some(lsn);
|
||||
) -> Result<PageReconstructResult> {
|
||||
let mut cont_lsn: Option<Lsn> = Some(lsn);
|
||||
|
||||
assert!(self.seg.blknum_in_seg(blknum));
|
||||
|
||||
// TODO: avoid opening the snapshot file for each read
|
||||
let (_path, book) = self.open_book()?;
|
||||
let page_version_reader = book.chapter_reader(PAGE_VERSIONS_CHAPTER)?;
|
||||
{
|
||||
// Open the file and lock the metadata in memory
|
||||
// TODO: avoid opening the snapshot file for each read
|
||||
let (_path, book) = self.open_book()?;
|
||||
let page_version_reader = book.chapter_reader(PAGE_VERSIONS_CHAPTER)?;
|
||||
let inner = self.load()?;
|
||||
|
||||
// Scan the metadata BTreeMap backwards, starting from the given entry.
|
||||
let minkey = (blknum, Lsn(0));
|
||||
let maxkey = (blknum, lsn);
|
||||
let mut iter = inner
|
||||
@@ -183,9 +187,10 @@ impl Layer for DeltaLayer {
|
||||
.range((Included(&minkey), Included(&maxkey)));
|
||||
while let Some(((_blknum, entry_lsn), entry)) = iter.next_back() {
|
||||
if let Some(img_range) = &entry.page_image_range {
|
||||
// Found a page image, return it
|
||||
let img = Bytes::from(read_blob(&page_version_reader, img_range)?);
|
||||
reconstruct_data.page_img = Some(img);
|
||||
need_base_image_lsn = None;
|
||||
cont_lsn = None;
|
||||
break;
|
||||
} else if let Some(rec_range) = &entry.record_range {
|
||||
let rec = WALRecord::des(&read_blob(&page_version_reader, rec_range)?)?;
|
||||
@@ -193,10 +198,11 @@ impl Layer for DeltaLayer {
|
||||
reconstruct_data.records.push(rec);
|
||||
if will_init {
|
||||
// This WAL record initializes the page, so no need to go further back
|
||||
need_base_image_lsn = None;
|
||||
cont_lsn = None;
|
||||
break;
|
||||
} else {
|
||||
need_base_image_lsn = Some(*entry_lsn);
|
||||
// This WAL record needs to be applied against an older page image
|
||||
cont_lsn = Some(*entry_lsn);
|
||||
}
|
||||
} else {
|
||||
// No base image, and no WAL record. Huh?
|
||||
@@ -204,28 +210,23 @@ impl Layer for DeltaLayer {
|
||||
}
|
||||
}
|
||||
|
||||
// Use the base image, if needed
|
||||
if let Some(need_lsn) = need_base_image_lsn {
|
||||
if let Some(predecessor) = &self.predecessor {
|
||||
need_base_image_lsn = predecessor.get_page_reconstruct_data(
|
||||
blknum,
|
||||
need_lsn,
|
||||
reconstruct_data,
|
||||
)?;
|
||||
} else {
|
||||
bail!(
|
||||
"no base img found for {} at blk {} at LSN {}",
|
||||
self.seg,
|
||||
blknum,
|
||||
lsn
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// release lock on 'inner'
|
||||
// release metadata lock and close the file
|
||||
}
|
||||
|
||||
Ok(need_base_image_lsn)
|
||||
// If an older page image is needed to reconstruct the page, let the
|
||||
// caller know about the predecessor layer.
|
||||
if let Some(cont_lsn) = cont_lsn {
|
||||
if let Some(cont_layer) = &self.predecessor {
|
||||
Ok(PageReconstructResult::Continue(
|
||||
cont_lsn,
|
||||
Arc::clone(cont_layer),
|
||||
))
|
||||
} else {
|
||||
Ok(PageReconstructResult::Missing(cont_lsn))
|
||||
}
|
||||
} else {
|
||||
Ok(PageReconstructResult::Complete)
|
||||
}
|
||||
}
|
||||
|
||||
/// Get size of the relation at given LSN
|
||||
|
||||
@@ -21,12 +21,14 @@
|
||||
//! For non-blocky segments, the image can be found in NONBLOCKY_IMAGE_CHAPTER.
|
||||
//!
|
||||
use crate::layered_repository::filename::{ImageFileName, PathOrConf};
|
||||
use crate::layered_repository::storage_layer::{Layer, PageReconstructData, SegmentTag};
|
||||
use crate::layered_repository::storage_layer::{
|
||||
Layer, PageReconstructData, PageReconstructResult, SegmentTag,
|
||||
};
|
||||
use crate::layered_repository::LayeredTimeline;
|
||||
use crate::layered_repository::RELISH_SEG_SIZE;
|
||||
use crate::PageServerConf;
|
||||
use crate::{ZTenantId, ZTimelineId};
|
||||
use anyhow::{anyhow, bail, ensure, Result};
|
||||
use anyhow::{anyhow, ensure, Result};
|
||||
use bytes::Bytes;
|
||||
use log::*;
|
||||
use std::convert::TryInto;
|
||||
@@ -120,7 +122,7 @@ impl Layer for ImageLayer {
|
||||
blknum: u32,
|
||||
lsn: Lsn,
|
||||
reconstruct_data: &mut PageReconstructData,
|
||||
) -> Result<Option<Lsn>> {
|
||||
) -> Result<PageReconstructResult> {
|
||||
assert!(lsn >= self.lsn);
|
||||
|
||||
let inner = self.load()?;
|
||||
@@ -132,12 +134,7 @@ impl Layer for ImageLayer {
|
||||
let buf = match &inner.image_type {
|
||||
ImageType::Blocky { num_blocks } => {
|
||||
if base_blknum >= *num_blocks {
|
||||
bail!(
|
||||
"no base img found for {} at blk {} at LSN {}",
|
||||
self.seg,
|
||||
base_blknum,
|
||||
lsn
|
||||
);
|
||||
return Ok(PageReconstructResult::Missing(lsn));
|
||||
}
|
||||
|
||||
let mut buf = vec![0u8; BLOCK_SIZE];
|
||||
@@ -155,7 +152,7 @@ impl Layer for ImageLayer {
|
||||
};
|
||||
|
||||
reconstruct_data.page_img = Some(Bytes::from(buf));
|
||||
Ok(None)
|
||||
Ok(PageReconstructResult::Complete)
|
||||
}
|
||||
|
||||
/// Get size of the segment
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
//!
|
||||
use crate::layered_repository::filename::DeltaFileName;
|
||||
use crate::layered_repository::storage_layer::{
|
||||
Layer, PageReconstructData, PageVersion, SegmentTag, RELISH_SEG_SIZE,
|
||||
Layer, PageReconstructData, PageReconstructResult, PageVersion, SegmentTag, RELISH_SEG_SIZE,
|
||||
};
|
||||
use crate::layered_repository::LayeredTimeline;
|
||||
use crate::layered_repository::{DeltaLayer, ImageLayer};
|
||||
@@ -134,14 +134,15 @@ impl Layer for InMemoryLayer {
|
||||
blknum: u32,
|
||||
lsn: Lsn,
|
||||
reconstruct_data: &mut PageReconstructData,
|
||||
) -> Result<Option<Lsn>> {
|
||||
// Scan the BTreeMap backwards, starting from reconstruct_data.lsn.
|
||||
let mut need_base_image_lsn: Option<Lsn> = Some(lsn);
|
||||
) -> Result<PageReconstructResult> {
|
||||
let mut cont_lsn: Option<Lsn> = Some(lsn);
|
||||
|
||||
assert!(self.seg.blknum_in_seg(blknum));
|
||||
|
||||
{
|
||||
let inner = self.inner.lock().unwrap();
|
||||
|
||||
// Scan the BTreeMap backwards, starting from reconstruct_data.lsn.
|
||||
let minkey = (blknum, Lsn(0));
|
||||
let maxkey = (blknum, lsn);
|
||||
let mut iter = inner
|
||||
@@ -150,16 +151,17 @@ impl Layer for InMemoryLayer {
|
||||
while let Some(((_blknum, entry_lsn), entry)) = iter.next_back() {
|
||||
if let Some(img) = &entry.page_image {
|
||||
reconstruct_data.page_img = Some(img.clone());
|
||||
need_base_image_lsn = None;
|
||||
cont_lsn = None;
|
||||
break;
|
||||
} else if let Some(rec) = &entry.record {
|
||||
reconstruct_data.records.push(rec.clone());
|
||||
if rec.will_init {
|
||||
// This WAL record initializes the page, so no need to go further back
|
||||
need_base_image_lsn = None;
|
||||
cont_lsn = None;
|
||||
break;
|
||||
} else {
|
||||
need_base_image_lsn = Some(*entry_lsn);
|
||||
// This WAL record needs to be applied against an older page image
|
||||
cont_lsn = Some(*entry_lsn);
|
||||
}
|
||||
} else {
|
||||
// No base image, and no WAL record. Huh?
|
||||
@@ -167,25 +169,23 @@ impl Layer for InMemoryLayer {
|
||||
}
|
||||
}
|
||||
|
||||
// Use the base image, if needed
|
||||
if let Some(need_lsn) = need_base_image_lsn {
|
||||
if let Some(predecessor) = &self.predecessor {
|
||||
need_base_image_lsn =
|
||||
predecessor.get_page_reconstruct_data(blknum, need_lsn, reconstruct_data)?;
|
||||
} else {
|
||||
bail!(
|
||||
"no base img found for {} at blk {} at LSN {}",
|
||||
self.seg,
|
||||
blknum,
|
||||
lsn
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// release lock on 'inner'
|
||||
}
|
||||
|
||||
Ok(need_base_image_lsn)
|
||||
// If an older page image is needed to reconstruct the page, let the
|
||||
// caller know about the predecessor layer.
|
||||
if let Some(cont_lsn) = cont_lsn {
|
||||
if let Some(cont_layer) = &self.predecessor {
|
||||
Ok(PageReconstructResult::Continue(
|
||||
cont_lsn,
|
||||
Arc::clone(cont_layer),
|
||||
))
|
||||
} else {
|
||||
Ok(PageReconstructResult::Missing(cont_lsn))
|
||||
}
|
||||
} else {
|
||||
Ok(PageReconstructResult::Complete)
|
||||
}
|
||||
}
|
||||
|
||||
/// Get size of the relation at given LSN
|
||||
|
||||
@@ -10,6 +10,7 @@ use bytes::Bytes;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use zenith_utils::lsn::Lsn;
|
||||
|
||||
@@ -82,6 +83,19 @@ pub struct PageReconstructData {
|
||||
pub page_img: Option<Bytes>,
|
||||
}
|
||||
|
||||
/// Return value from Layer::get_page_reconstruct_data
|
||||
pub enum PageReconstructResult {
|
||||
/// Got all the data needed to reconstruct the requested page
|
||||
Complete,
|
||||
/// This layer didn't contain all the required data, the caller should collect
|
||||
/// more data from the returned predecessor layer at the returned LSN.
|
||||
Continue(Lsn, Arc<dyn Layer>),
|
||||
/// This layer didn't contain data needed to reconstruct the page version at
|
||||
/// the returned LSN. This is usually considered an error, but might be OK
|
||||
/// in some circumstances.
|
||||
Missing(Lsn),
|
||||
}
|
||||
|
||||
///
|
||||
/// A Layer holds all page versions for one segment of a relish, in a range of LSNs.
|
||||
/// There are two kinds of layers, in-memory and snapshot layers. In-memory
|
||||
@@ -114,18 +128,21 @@ pub trait Layer: Send + Sync {
|
||||
/// It is up to the caller to collect more data from previous layer and
|
||||
/// perform WAL redo, if necessary.
|
||||
///
|
||||
/// If returns Some, the returned data is not complete. The caller needs
|
||||
/// to continue with the returned 'lsn'.
|
||||
///
|
||||
/// Note that the 'blknum' is the offset of the page from the beginning
|
||||
/// of the *relish*, not the beginning of the segment. The requested
|
||||
/// 'blknum' must be covered by this segment.
|
||||
///
|
||||
/// See PageReconstructResult for possible return values. The collected data
|
||||
/// is appended to reconstruct_data; the caller should pass an empty struct
|
||||
/// on first call. If this returns PageReconstructResult::Continue, call
|
||||
/// again on the returned predecessor layer with the same 'reconstruct_data'
|
||||
/// to collect more data.
|
||||
fn get_page_reconstruct_data(
|
||||
&self,
|
||||
blknum: u32,
|
||||
lsn: Lsn,
|
||||
reconstruct_data: &mut PageReconstructData,
|
||||
) -> Result<Option<Lsn>>;
|
||||
) -> Result<PageReconstructResult>;
|
||||
|
||||
/// Return size of the segment at given LSN. (Only for blocky relations.)
|
||||
fn get_seg_size(&self, lsn: Lsn) -> Result<u32>;
|
||||
|
||||
Reference in New Issue
Block a user