Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-08 22:12:56 +00:00)
pageserver: introduce vectored Timeline::get interface (#6372)
1. Introduce a naive `Timeline::get_vectored` implementation.

   The return type is intended to be flexible enough for various types of callers. We return the pages in a map keyed by `Key` such that the caller doesn't have to map back to the key if it needs to know it. Some callers can ignore errors for specific pages, so we return a separate `Result<Bytes, PageReconstructError>` for each page and an overarching `GetVectoredError` for API misuse. The overhead of the mapping will be small and bounded since we enforce a maximum key count for the operation.

2. Use the `get_vectored` API for SLRU segment reconstruction and image layer creation.
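The shape of the new interface can be illustrated with a minimal, self-contained sketch. This is not the pageserver code: `Key`, `Bytes`, the error enums, and the backing store below are simplified stand-ins, and the real method is async and takes an LSN and a `RequestContext`. The sketch only mirrors the contract described above: results come back in a `BTreeMap` keyed by `Key`, each page carries its own per-key `Result`, and API misuse such as an oversized request is reported through a separate top-level error.

use std::collections::BTreeMap;
use std::ops::Range;

// Simplified stand-ins for the pageserver types touched by this commit.
type Key = u64;
type Bytes = Vec<u8>;

#[derive(Debug)]
enum PageReconstructError {
    Missing(Key),
}

#[derive(Debug)]
enum GetVectoredError {
    // API misuse: more keys requested than the per-call limit allows.
    Oversized(u64),
}

const MAX_GET_VECTORED_KEYS: u64 = 32;

// Naive vectored get: resolve every key in the requested ranges individually,
// returning a per-key Result so callers can ignore errors for specific pages.
fn get_vectored(
    store: &BTreeMap<Key, Bytes>,
    key_ranges: &[Range<Key>],
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
    let key_count: u64 = key_ranges.iter().map(|r| r.end - r.start).sum();
    if key_count > MAX_GET_VECTORED_KEYS {
        return Err(GetVectoredError::Oversized(key_count));
    }

    let mut values = BTreeMap::new();
    for range in key_ranges {
        for key in range.clone() {
            let block = store
                .get(&key)
                .cloned()
                .ok_or(PageReconstructError::Missing(key));
            values.insert(key, block);
        }
    }
    Ok(values)
}

fn main() {
    let store: BTreeMap<Key, Bytes> = (0..8u64).map(|k| (k, vec![k as u8; 4])).collect();

    // Results stay keyed by `Key`, so the caller never has to map positions back to keys.
    let pages = get_vectored(&store, &[0..3, 6..9]).unwrap();
    for (key, page) in pages {
        match page {
            Ok(bytes) => println!("key {key}: {} bytes", bytes.len()),
            Err(err) => println!("key {key}: failed: {err:?}"),
        }
    }
}

Keeping the map keyed by `Key` lets a caller such as the SLRU path skip or zero an individual failed page without re-deriving which key a result belongs to.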
@@ -104,6 +104,7 @@ pub struct KeySpaceAccum {
    accum: Option<Range<Key>>,

    ranges: Vec<Range<Key>>,
    size: u64,
}

impl KeySpaceAccum {
@@ -111,6 +112,7 @@ impl KeySpaceAccum {
        Self {
            accum: None,
            ranges: Vec::new(),
            size: 0,
        }
    }

@@ -121,6 +123,8 @@ impl KeySpaceAccum {

    #[inline(always)]
    pub fn add_range(&mut self, range: Range<Key>) {
        self.size += key_range_size(&range) as u64;

        match self.accum.as_mut() {
            Some(accum) => {
                if range.start == accum.end {
@@ -146,6 +150,23 @@ impl KeySpaceAccum {
            ranges: self.ranges,
        }
    }

    pub fn consume_keyspace(&mut self) -> KeySpace {
        if let Some(accum) = self.accum.take() {
            self.ranges.push(accum);
        }

        let mut prev_accum = KeySpaceAccum::new();
        std::mem::swap(self, &mut prev_accum);

        KeySpace {
            ranges: prev_accum.ranges,
        }
    }

    pub fn size(&self) -> u64 {
        self.size
    }
}

///
@@ -254,6 +275,30 @@ mod tests {
        }
    }

    #[test]
    fn keyspace_consume() {
        let ranges = vec![kr(0..10), kr(20..35), kr(40..45)];

        let mut accum = KeySpaceAccum::new();
        for range in &ranges {
            accum.add_range(range.clone());
        }

        let expected_size: u64 = ranges.iter().map(|r| key_range_size(r) as u64).sum();
        assert_eq!(accum.size(), expected_size);

        assert_ks_eq(&accum.consume_keyspace(), ranges.clone());
        assert_eq!(accum.size(), 0);

        assert_ks_eq(&accum.consume_keyspace(), vec![]);
        assert_eq!(accum.size(), 0);

        for range in &ranges {
            accum.add_range(range.clone());
        }
        assert_ks_eq(&accum.to_keyspace(), ranges);
    }

    #[test]
    fn keyspace_add_range() {
        // two separate ranges

@@ -111,7 +111,19 @@ impl RelTag {
/// These files are divided into segments, which are divided into
/// pages of the same BLCKSZ as used for relation files.
///
#[derive(Debug, Clone, Copy, Hash, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
#[derive(
    Debug,
    Clone,
    Copy,
    Hash,
    Serialize,
    Deserialize,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    strum_macros::EnumIter,
)]
pub enum SlruKind {
    Clog,
    MultiXactMembers,

@@ -11,8 +11,9 @@
//! from data stored in object storage.
//!
use anyhow::{anyhow, bail, ensure, Context};
use bytes::{BufMut, BytesMut};
use bytes::{BufMut, Bytes, BytesMut};
use fail::fail_point;
use pageserver_api::key::Key;
use postgres_ffi::pg_constants;
use std::fmt::Write as FmtWrite;
use std::time::SystemTime;
@@ -23,7 +24,7 @@ use tracing::*;
use tokio_tar::{Builder, EntryType, Header};

use crate::context::RequestContext;
use crate::pgdatadir_mapping::Version;
use crate::pgdatadir_mapping::{key_to_slru_block, Version};
use crate::tenant::Timeline;
use pageserver_api::reltag::{RelTag, SlruKind};

@@ -133,6 +134,87 @@ where
    ctx: &'a RequestContext,
}

/// A sink that accepts SLRU blocks ordered by key and forwards
/// full segments to the archive.
struct SlruSegmentsBuilder<'a, 'b, W>
where
    W: AsyncWrite + Send + Sync + Unpin,
{
    ar: &'a mut Builder<&'b mut W>,
    buf: Vec<u8>,
    current_segment: Option<(SlruKind, u32)>,
}

impl<'a, 'b, W> SlruSegmentsBuilder<'a, 'b, W>
where
    W: AsyncWrite + Send + Sync + Unpin,
{
    fn new(ar: &'a mut Builder<&'b mut W>) -> Self {
        Self {
            ar,
            buf: Vec::new(),
            current_segment: None,
        }
    }

    async fn add_block(&mut self, key: &Key, block: Bytes) -> anyhow::Result<()> {
        let (kind, segno, _) = key_to_slru_block(*key)?;

        match kind {
            SlruKind::Clog => {
                ensure!(block.len() == BLCKSZ as usize || block.len() == BLCKSZ as usize + 8);
            }
            SlruKind::MultiXactMembers | SlruKind::MultiXactOffsets => {
                ensure!(block.len() == BLCKSZ as usize);
            }
        }

        let segment = (kind, segno);
        match self.current_segment {
            None => {
                self.current_segment = Some(segment);
                self.buf
                    .extend_from_slice(block.slice(..BLCKSZ as usize).as_ref());
            }
            Some(current_seg) if current_seg == segment => {
                self.buf
                    .extend_from_slice(block.slice(..BLCKSZ as usize).as_ref());
            }
            Some(_) => {
                self.flush().await?;

                self.current_segment = Some(segment);
                self.buf
                    .extend_from_slice(block.slice(..BLCKSZ as usize).as_ref());
            }
        }

        Ok(())
    }

    async fn flush(&mut self) -> anyhow::Result<()> {
        let nblocks = self.buf.len() / BLCKSZ as usize;
        let (kind, segno) = self.current_segment.take().unwrap();
        let segname = format!("{}/{:>04X}", kind.to_str(), segno);
        let header = new_tar_header(&segname, self.buf.len() as u64)?;
        self.ar.append(&header, self.buf.as_slice()).await?;

        trace!("Added to basebackup slru {} relsize {}", segname, nblocks);

        self.buf.clear();

        Ok(())
    }

    async fn finish(mut self) -> anyhow::Result<()> {
        if self.current_segment.is_none() || self.buf.is_empty() {
            return Ok(());
        }

        self.flush().await
    }
}

impl<'a, W> Basebackup<'a, W>
where
    W: AsyncWrite + Send + Sync + Unpin,
@@ -168,20 +250,27 @@ where
    }

    // Gather non-relational files from object storage pages.
    for kind in [
        SlruKind::Clog,
        SlruKind::MultiXactOffsets,
        SlruKind::MultiXactMembers,
    ] {
    for segno in self
    let slru_partitions = self
        .timeline
        .get_slru_keyspace(Version::Lsn(self.lsn), self.ctx)
        .await?
        .partition(Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64);

    let mut slru_builder = SlruSegmentsBuilder::new(&mut self.ar);

    for part in slru_partitions.parts {
        let blocks = self
            .timeline
            .list_slru_segments(kind, Version::Lsn(self.lsn), self.ctx)
            .await?
        {
            self.add_slru_segment(kind, segno).await?;
            .get_vectored(&part.ranges, self.lsn, self.ctx)
            .await?;

        for (key, block) in blocks {
            slru_builder.add_block(&key, block?).await?;
        }
    }

    slru_builder.finish().await?;

    let mut min_restart_lsn: Lsn = Lsn::MAX;
    // Create tablespace directories
    for ((spcnode, dbnode), has_relmap_file) in
@@ -305,39 +394,6 @@ where
        Ok(())
    }

    //
    // Generate SLRU segment files from repository.
    //
    async fn add_slru_segment(&mut self, slru: SlruKind, segno: u32) -> anyhow::Result<()> {
        let nblocks = self
            .timeline
            .get_slru_segment_size(slru, segno, Version::Lsn(self.lsn), self.ctx)
            .await?;

        let mut slru_buf: Vec<u8> = Vec::with_capacity(nblocks as usize * BLCKSZ as usize);
        for blknum in 0..nblocks {
            let img = self
                .timeline
                .get_slru_page_at_lsn(slru, segno, blknum, self.lsn, self.ctx)
                .await?;

            if slru == SlruKind::Clog {
                ensure!(img.len() == BLCKSZ as usize || img.len() == BLCKSZ as usize + 8);
            } else {
                ensure!(img.len() == BLCKSZ as usize);
            }

            slru_buf.extend_from_slice(&img[..BLCKSZ as usize]);
        }

        let segname = format!("{}/{:>04X}", slru.to_str(), segno);
        let header = new_tar_header(&segname, slru_buf.len() as u64)?;
        self.ar.append(&header, slru_buf.as_slice()).await?;

        trace!("Added to basebackup slru {} relsize {}", segname, nblocks);
        Ok(())
    }

    //
    // Include database/tablespace directories.
    //

@@ -27,6 +27,7 @@ use serde::{Deserialize, Serialize};
use std::collections::{hash_map, HashMap, HashSet};
use std::ops::ControlFlow;
use std::ops::Range;
use strum::IntoEnumIterator;
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};
use utils::bin_ser::DeserializeError;
@@ -533,6 +534,33 @@ impl Timeline {
        Ok(Default::default())
    }

    pub(crate) async fn get_slru_keyspace(
        &self,
        version: Version<'_>,
        ctx: &RequestContext,
    ) -> Result<KeySpace, PageReconstructError> {
        let mut accum = KeySpaceAccum::new();

        for kind in SlruKind::iter() {
            let mut segments: Vec<u32> = self
                .list_slru_segments(kind, version, ctx)
                .await?
                .into_iter()
                .collect();
            segments.sort_unstable();

            for seg in segments {
                let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;

                accum.add_range(
                    slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
                );
            }
        }

        Ok(accum.to_keyspace())
    }

    /// Get a list of SLRU segments
    pub(crate) async fn list_slru_segments(
        &self,

@@ -283,15 +283,15 @@ impl LayerMap {
    ///
    /// This is used for garbage collection, to determine if an old layer can
    /// be deleted.
    pub fn image_layer_exists(&self, key: &Range<Key>, lsn: &Range<Lsn>) -> Result<bool> {
    pub fn image_layer_exists(&self, key: &Range<Key>, lsn: &Range<Lsn>) -> bool {
        if key.is_empty() {
            // Vacuously true. There's a newer image for all 0 of the kerys in the range.
            return Ok(true);
            return true;
        }

        let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
            Some(v) => v,
            None => return Ok(false),
            None => return false,
        };

        let start = key.start.to_i128();
@@ -304,17 +304,17 @@ impl LayerMap {

        // Check the start is covered
        if !layer_covers(version.image_coverage.query(start)) {
            return Ok(false);
            return false;
        }

        // Check after all changes of coverage
        for (_, change_val) in version.image_coverage.range(start..end) {
            if !layer_covers(change_val) {
                return Ok(false);
                return false;
            }
        }

        Ok(true)
        true
    }

    pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<PersistentLayerDesc>> {
@@ -325,18 +325,14 @@ impl LayerMap {
    /// Divide the whole given range of keys into sub-ranges based on the latest
    /// image layer that covers each range at the specified lsn (inclusive).
    /// This is used when creating new image layers.
    ///
    // FIXME: clippy complains that the result type is very complex. She's probably
    // right...
    #[allow(clippy::type_complexity)]
    pub fn image_coverage(
        &self,
        key_range: &Range<Key>,
        lsn: Lsn,
    ) -> Result<Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)>> {
    ) -> Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> {
        let version = match self.historic.get().unwrap().get_version(lsn.0) {
            Some(v) => v,
            None => return Ok(vec![]),
            None => return vec![],
        };

        let start = key_range.start.to_i128();
@@ -359,7 +355,7 @@ impl LayerMap {
        let kr = Key::from_i128(current_key)..Key::from_i128(end);
        coverage.push((kr, current_val.take()));

        Ok(coverage)
        coverage
    }

    pub fn is_l0(layer: &PersistentLayerDesc) -> bool {
@@ -410,24 +406,19 @@ impl LayerMap {
    /// This number is used to compute the largest number of deltas that
    /// we'll need to visit for any page reconstruction in this region.
    /// We use this heuristic to decide whether to create an image layer.
    pub fn count_deltas(
        &self,
        key: &Range<Key>,
        lsn: &Range<Lsn>,
        limit: Option<usize>,
    ) -> Result<usize> {
    pub fn count_deltas(&self, key: &Range<Key>, lsn: &Range<Lsn>, limit: Option<usize>) -> usize {
        // We get the delta coverage of the region, and for each part of the coverage
        // we recurse right underneath the delta. The recursion depth is limited by
        // the largest result this function could return, which is in practice between
        // 3 and 10 (since we usually try to create an image when the number gets larger).

        if lsn.is_empty() || key.is_empty() || limit == Some(0) {
            return Ok(0);
            return 0;
        }

        let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
            Some(v) => v,
            None => return Ok(0),
            None => return 0,
        };

        let start = key.start.to_i128();
@@ -448,8 +439,7 @@ impl LayerMap {
            if !kr.is_empty() {
                let base_count = Self::is_reimage_worthy(&val, key) as usize;
                let new_limit = limit.map(|l| l - base_count);
                let max_stacked_deltas_underneath =
                    self.count_deltas(&kr, &lr, new_limit)?;
                let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit);
                max_stacked_deltas = std::cmp::max(
                    max_stacked_deltas,
                    base_count + max_stacked_deltas_underneath,
@@ -471,7 +461,7 @@ impl LayerMap {
            if !kr.is_empty() {
                let base_count = Self::is_reimage_worthy(&val, key) as usize;
                let new_limit = limit.map(|l| l - base_count);
                let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit)?;
                let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit);
                max_stacked_deltas = std::cmp::max(
                    max_stacked_deltas,
                    base_count + max_stacked_deltas_underneath,
@@ -480,7 +470,7 @@ impl LayerMap {
            }
        }

        Ok(max_stacked_deltas)
        max_stacked_deltas
    }

    /// Count how many reimage-worthy layers we need to visit for given key-lsn pair.
@@ -592,10 +582,7 @@ impl LayerMap {
            if limit == Some(difficulty) {
                break;
            }
            for (img_range, last_img) in self
                .image_coverage(range, lsn)
                .expect("why would this err?")
            {
            for (img_range, last_img) in self.image_coverage(range, lsn) {
                if limit == Some(difficulty) {
                    break;
                }
@@ -606,9 +593,7 @@ impl LayerMap {
                };

                if img_lsn < lsn {
                    let num_deltas = self
                        .count_deltas(&img_range, &(img_lsn..lsn), limit)
                        .expect("why would this err lol?");
                    let num_deltas = self.count_deltas(&img_range, &(img_lsn..lsn), limit);
                    difficulty = std::cmp::max(difficulty, num_deltas);
                }
            }

@@ -14,6 +14,7 @@ use enumset::EnumSet;
use fail::fail_point;
use itertools::Itertools;
use pageserver_api::{
    keyspace::{key_range_size, KeySpaceAccum},
    models::{
        DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy,
        LayerMapInfo, TimelineState,
@@ -32,7 +33,7 @@ use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::sync::gate::Gate;

use std::collections::{BinaryHeap, HashMap, HashSet};
use std::collections::{BTreeMap, BinaryHeap, HashMap, HashSet};
use std::ops::{Deref, Range};
use std::pin::pin;
use std::sync::atomic::Ordering as AtomicOrdering;
@@ -404,6 +405,21 @@ pub(crate) enum PageReconstructError {
    WalRedo(anyhow::Error),
}

#[derive(thiserror::Error, Debug)]
enum CreateImageLayersError {
    #[error("timeline shutting down")]
    Cancelled,

    #[error(transparent)]
    GetVectoredError(GetVectoredError),

    #[error(transparent)]
    PageReconstructError(PageReconstructError),

    #[error(transparent)]
    Other(#[from] anyhow::Error),
}

#[derive(thiserror::Error, Debug)]
enum FlushLayerError {
    /// Timeline cancellation token was cancelled
@@ -411,12 +427,24 @@ enum FlushLayerError {
    Cancelled,

    #[error(transparent)]
    PageReconstructError(#[from] PageReconstructError),
    CreateImageLayersError(CreateImageLayersError),

    #[error(transparent)]
    Other(#[from] anyhow::Error),
}

#[derive(thiserror::Error, Debug)]
pub(crate) enum GetVectoredError {
    #[error("timeline shutting down")]
    Cancelled,

    #[error("Requested too many keys: {0} > {}", Timeline::MAX_GET_VECTORED_KEYS)]
    Oversized(u64),

    #[error("Requested at invalid LSN: {0}")]
    InvalidLsn(Lsn),
}

#[derive(Clone, Copy)]
pub enum LogicalSizeCalculationCause {
    Initial,
@@ -456,6 +484,45 @@ pub(crate) enum WaitLsnError {
    Timeout(String),
}

// The impls below achieve cancellation mapping for errors.
// Perhaps there's a way of achieving this with less cruft.

impl From<CreateImageLayersError> for CompactionError {
    fn from(e: CreateImageLayersError) -> Self {
        match e {
            CreateImageLayersError::Cancelled => CompactionError::ShuttingDown,
            _ => CompactionError::Other(e.into()),
        }
    }
}

impl From<CreateImageLayersError> for FlushLayerError {
    fn from(e: CreateImageLayersError) -> Self {
        match e {
            CreateImageLayersError::Cancelled => FlushLayerError::Cancelled,
            any => FlushLayerError::CreateImageLayersError(any),
        }
    }
}

impl From<PageReconstructError> for CreateImageLayersError {
    fn from(e: PageReconstructError) -> Self {
        match e {
            PageReconstructError::Cancelled => CreateImageLayersError::Cancelled,
            _ => CreateImageLayersError::PageReconstructError(e),
        }
    }
}

impl From<GetVectoredError> for CreateImageLayersError {
    fn from(e: GetVectoredError) -> Self {
        match e {
            GetVectoredError::Cancelled => CreateImageLayersError::Cancelled,
            _ => CreateImageLayersError::GetVectoredError(e),
        }
    }
}

/// Public interface functions
impl Timeline {
    /// Get the LSN where this branch was created

@@ -575,6 +642,53 @@ impl Timeline {
        res
    }

    pub(crate) const MAX_GET_VECTORED_KEYS: u64 = 32;

    /// Look up multiple page versions at a given LSN
    ///
    /// This naive implementation will be replaced with a more efficient one
    /// which actually vectorizes the read path.
    pub(crate) async fn get_vectored(
        &self,
        key_ranges: &[Range<Key>],
        lsn: Lsn,
        ctx: &RequestContext,
    ) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
        if !lsn.is_valid() {
            return Err(GetVectoredError::InvalidLsn(lsn));
        }

        let key_count = key_ranges
            .iter()
            .map(|range| key_range_size(range) as u64)
            .sum();
        if key_count > Timeline::MAX_GET_VECTORED_KEYS {
            return Err(GetVectoredError::Oversized(key_count));
        }

        let mut values = BTreeMap::new();
        for range in key_ranges {
            let mut key = range.start;
            while key != range.end {
                assert!(!self.shard_identity.is_key_disposable(&key));

                let block = self.get(key, lsn, ctx).await;

                if matches!(
                    block,
                    Err(PageReconstructError::Cancelled | PageReconstructError::AncestorStopping(_))
                ) {
                    return Err(GetVectoredError::Cancelled);
                }

                values.insert(key, block);
                key = key.next();
            }
        }

        Ok(values)
    }

    /// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
    pub fn get_last_record_lsn(&self) -> Lsn {
        self.last_record_lsn.load().last

@@ -2582,7 +2696,7 @@ impl Timeline {
                    return;
                }
                err @ Err(
                    FlushLayerError::Other(_) | FlushLayerError::PageReconstructError(_),
                    FlushLayerError::Other(_) | FlushLayerError::CreateImageLayersError(_),
                ) => {
                    error!("could not flush frozen layer: {err:?}");
                    break err;
@@ -2950,11 +3064,7 @@ impl Timeline {
    }

    // Is it time to create a new image layer for the given partition?
    async fn time_for_new_image_layer(
        &self,
        partition: &KeySpace,
        lsn: Lsn,
    ) -> anyhow::Result<bool> {
    async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
        let threshold = self.get_image_creation_threshold();

        let guard = self.layers.read().await;
@@ -2974,20 +3084,20 @@ impl Timeline {
                    // but the range is already covered by image layers at more recent LSNs. Before we
                    // create a new image layer, check if the range is already covered at more recent LSNs.
                    if !layers
                        .image_layer_exists(&img_range, &(Lsn::min(lsn, *cutoff_lsn)..lsn + 1))?
                        .image_layer_exists(&img_range, &(Lsn::min(lsn, *cutoff_lsn)..lsn + 1))
                    {
                        debug!(
                            "Force generation of layer {}-{} wanted by GC, cutoff={}, lsn={})",
                            img_range.start, img_range.end, cutoff_lsn, lsn
                        );
                        return Ok(true);
                        return true;
                    }
                }
            }
        }

        for part_range in &partition.ranges {
            let image_coverage = layers.image_coverage(part_range, lsn)?;
            let image_coverage = layers.image_coverage(part_range, lsn);
            for (img_range, last_img) in image_coverage {
                let img_lsn = if let Some(last_img) = last_img {
                    last_img.get_lsn_range().end
@@ -3008,7 +3118,7 @@ impl Timeline {
                // after we read last_record_lsn, which is passed here in the 'lsn' argument.
                if img_lsn < lsn {
                    let num_deltas =
                        layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold))?;
                        layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold));

                    max_deltas = max_deltas.max(num_deltas);
                    if num_deltas >= threshold {
@@ -3016,7 +3126,7 @@ impl Timeline {
                            "key range {}-{}, has {} deltas on this timeline in LSN range {}..{}",
                            img_range.start, img_range.end, num_deltas, img_lsn, lsn
                        );
                        return Ok(true);
                        return true;
                    }
                }
            }
@@ -3026,7 +3136,7 @@ impl Timeline {
            max_deltas,
            "none of the partitioned ranges had >= {threshold} deltas"
        );
        Ok(false)
        false
    }

    #[tracing::instrument(skip_all, fields(%lsn, %force))]
@@ -3036,7 +3146,7 @@ impl Timeline {
        lsn: Lsn,
        force: bool,
        ctx: &RequestContext,
    ) -> Result<Vec<ResidentLayer>, PageReconstructError> {
    ) -> Result<Vec<ResidentLayer>, CreateImageLayersError> {
        let timer = self.metrics.create_images_time_histo.start_timer();
        let mut image_layers = Vec::new();

@@ -3054,7 +3164,7 @@ impl Timeline {
        for partition in partitioning.parts.iter() {
            let img_range = start..partition.ranges.last().unwrap().end;
            start = img_range.end;
            if force || self.time_for_new_image_layer(partition, lsn).await? {
            if force || self.time_for_new_image_layer(partition, lsn).await {
                let mut image_layer_writer = ImageLayerWriter::new(
                    self.conf,
                    self.timeline_id,
@@ -3065,10 +3175,12 @@ impl Timeline {
                .await?;

                fail_point!("image-layer-writer-fail-before-finish", |_| {
                    Err(PageReconstructError::Other(anyhow::anyhow!(
                    Err(CreateImageLayersError::Other(anyhow::anyhow!(
                        "failpoint image-layer-writer-fail-before-finish"
                    )))
                });

                let mut key_request_accum = KeySpaceAccum::new();
                for range in &partition.ranges {
                    let mut key = range.start;
                    while key < range.end {

@@ -3081,34 +3193,55 @@ impl Timeline {
                            key = key.next();
                            continue;
                        }
                        let img = match self.get(key, lsn, ctx).await {
                            Ok(img) => img,
                            Err(err) => {
                                // If we fail to reconstruct a VM or FSM page, we can zero the
                                // page without losing any actual user data. That seems better
                                // than failing repeatedly and getting stuck.
                                //
                                // We had a bug at one point, where we truncated the FSM and VM
                                // in the pageserver, but the Postgres didn't know about that
                                // and continued to generate incremental WAL records for pages
                                // that didn't exist in the pageserver. Trying to replay those
                                // WAL records failed to find the previous image of the page.
                                // This special case allows us to recover from that situation.
                                // See https://github.com/neondatabase/neon/issues/2601.
                                //
                                // Unfortunately we cannot do this for the main fork, or for
                                // any metadata keys, keys, as that would lead to actual data
                                // loss.
                                if is_rel_fsm_block_key(key) || is_rel_vm_block_key(key) {
                                    warn!("could not reconstruct FSM or VM key {key}, filling with zeros: {err:?}");
                                    ZERO_PAGE.clone()
                                } else {
                                    return Err(err);
                                }
                            }
                        };

                        image_layer_writer.put_image(key, &img).await?;
                        key_request_accum.add_key(key);
                        if key_request_accum.size() >= Timeline::MAX_GET_VECTORED_KEYS
                            || key.next() == range.end
                        {
                            let results = self
                                .get_vectored(
                                    &key_request_accum.consume_keyspace().ranges,
                                    lsn,
                                    ctx,
                                )
                                .await?;

                            for (img_key, img) in results {
                                let img = match img {
                                    Ok(img) => img,
                                    Err(err) => {
                                        // If we fail to reconstruct a VM or FSM page, we can zero the
                                        // page without losing any actual user data. That seems better
                                        // than failing repeatedly and getting stuck.
                                        //
                                        // We had a bug at one point, where we truncated the FSM and VM
                                        // in the pageserver, but the Postgres didn't know about that
                                        // and continued to generate incremental WAL records for pages
                                        // that didn't exist in the pageserver. Trying to replay those
                                        // WAL records failed to find the previous image of the page.
                                        // This special case allows us to recover from that situation.
                                        // See https://github.com/neondatabase/neon/issues/2601.
                                        //
                                        // Unfortunately we cannot do this for the main fork, or for
                                        // any metadata keys, keys, as that would lead to actual data
                                        // loss.
                                        if is_rel_fsm_block_key(img_key)
                                            || is_rel_vm_block_key(img_key)
                                        {
                                            warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}");
                                            ZERO_PAGE.clone()
                                        } else {
                                            return Err(
                                                CreateImageLayersError::PageReconstructError(err),
                                            );
                                        }
                                    }
                                };

                                image_layer_writer.put_image(img_key, &img).await?;
                            }
                        }

                        key = key.next();
                    }
                }
@@ -3484,7 +3617,7 @@ impl Timeline {
            // has not so much sense, because largest holes will corresponds field1/field2 changes.
            // But we are mostly interested to eliminate holes which cause generation of excessive image layers.
            // That is why it is better to measure size of hole as number of covering image layers.
            let coverage_size = layers.image_coverage(&key_range, last_record_lsn)?.len();
            let coverage_size = layers.image_coverage(&key_range, last_record_lsn).len();
            if coverage_size >= min_hole_coverage_size {
                heap.push(Hole {
                    key_range,
@@ -4110,7 +4243,7 @@ impl Timeline {
        // we cannot remove C, even though it's older than 2500, because
        // the delta layer 2000-3000 depends on it.
        if !layers
            .image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))?
            .image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))
        {
            debug!("keeping {} because it is the latest layer", l.filename());
            // Collect delta key ranges that need image layers to allow garbage
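
Both new call sites cap each request at MAX_GET_VECTORED_KEYS: the basebackup path partitions the SLRU keyspace up front, while image layer creation accumulates keys into a KeySpaceAccum as it scans and flushes a vectored read whenever the batch fills up or the range ends. Below is a minimal sketch of that accumulate-and-flush pattern, using simplified stand-in types rather than the real Timeline and KeySpaceAccum.

const MAX_GET_VECTORED_KEYS: usize = 32;

// Stand-in for a vectored read: return a dummy 8 KiB page per requested key.
fn read_vectored(batch: &[u64]) -> Vec<(u64, Vec<u8>)> {
    batch.iter().map(|&k| (k, vec![0u8; 8192])).collect()
}

// Accumulate keys and flush one vectored read whenever the batch reaches the cap.
fn read_all(keys: impl Iterator<Item = u64>) -> Vec<(u64, Vec<u8>)> {
    let mut out = Vec::new();
    let mut batch = Vec::new();
    for key in keys {
        batch.push(key);
        if batch.len() >= MAX_GET_VECTORED_KEYS {
            out.extend(read_vectored(&batch));
            batch.clear();
        }
    }
    // Flush whatever is left over at the end of the key range.
    if !batch.is_empty() {
        out.extend(read_vectored(&batch));
    }
    out
}

fn main() {
    let pages = read_all(0..100);
    assert_eq!(pages.len(), 100);
    println!("read {} pages in batches of at most {}", pages.len(), MAX_GET_VECTORED_KEYS);
}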