Compare commits

...

2 Commits

Author SHA1 Message Date
Heikki Linnakangas
bba997ee00 Refactor LayerMap::search to also search open and frozen layers.
This allows refactoring in get_reconstruct_data() which is needed by
PR #3228. That PR turns some functions async, and you cannot hold a
RwLock over 'await'. And it's enough to "drop(guard)" the lock guard,
it has to actually go out-of-scope to placate the compiler. This
refactoring allows dropping the lock on 'layers' at end of scope.
2023-01-03 22:59:18 +02:00
Heikki Linnakangas
70a0cce609 Refactor send_tarball()
The Basebackup struct is really just a convenient place to carry the
various parameters around in send_tarball and its subroutines. Make it
internal to the send_tarball function.
2023-01-03 20:47:38 +02:00
5 changed files with 221 additions and 164 deletions

View File

@@ -10,11 +10,10 @@
//! This module is responsible for creation of such tarball
//! from data stored in object storage.
//!
use anyhow::{anyhow, bail, ensure, Context, Result};
use anyhow::{anyhow, bail, ensure, Context};
use bytes::{BufMut, BytesMut};
use fail::fail_point;
use std::fmt::Write as FmtWrite;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::io;
use tokio::io::AsyncWrite;
@@ -39,16 +38,87 @@ use postgres_ffi::PG_TLI;
use postgres_ffi::{BLCKSZ, RELSEG_SIZE, WAL_SEGMENT_SIZE};
use utils::lsn::Lsn;
pub async fn send_basebackup_tarball<'a, W>(
write: &'a mut W,
timeline: &'a Timeline,
req_lsn: Option<Lsn>,
prev_lsn: Option<Lsn>,
full_backup: bool,
) -> anyhow::Result<()>
where
W: AsyncWrite + Send + Sync + Unpin,
{
// Compute postgres doesn't have any previous WAL files, but the first
// record that it's going to write needs to include the LSN of the
// previous record (xl_prev). We include prev_record_lsn in the
// "zenith.signal" file, so that postgres can read it during startup.
//
// We don't keep full history of record boundaries in the page server,
// however, only the predecessor of the latest record on each
// timeline. So we can only provide prev_record_lsn when you take a
// base backup at the end of the timeline, i.e. at last_record_lsn.
// Even at the end of the timeline, we sometimes don't have a valid
// prev_lsn value; that happens if the timeline was just branched from
// an old LSN and it doesn't have any WAL of its own yet. We will set
// prev_lsn to Lsn(0) if we cannot provide the correct value.
let (backup_prev, backup_lsn) = if let Some(req_lsn) = req_lsn {
// Backup was requested at a particular LSN. The caller should've
// already checked that it's a valid LSN.
// If the requested point is the end of the timeline, we can
// provide prev_lsn. (get_last_record_rlsn() might return it as
// zero, though, if no WAL has been generated on this timeline
// yet.)
let end_of_timeline = timeline.get_last_record_rlsn();
if req_lsn == end_of_timeline.last {
(end_of_timeline.prev, req_lsn)
} else {
(Lsn(0), req_lsn)
}
} else {
// Backup was requested at end of the timeline.
let end_of_timeline = timeline.get_last_record_rlsn();
(end_of_timeline.prev, end_of_timeline.last)
};
// Consolidate the derived and the provided prev_lsn values
let prev_lsn = if let Some(provided_prev_lsn) = prev_lsn {
if backup_prev != Lsn(0) {
ensure!(backup_prev == provided_prev_lsn);
}
provided_prev_lsn
} else {
backup_prev
};
info!(
"taking basebackup lsn={}, prev_lsn={} (full_backup={})",
backup_lsn, prev_lsn, full_backup
);
let basebackup = Basebackup {
ar: Builder::new_non_terminated(write),
timeline,
lsn: backup_lsn,
prev_record_lsn: prev_lsn,
full_backup,
};
basebackup
.send_tarball()
.instrument(info_span!("send_tarball", backup_lsn=%backup_lsn))
.await
}
/// This is short-living object only for the time of tarball creation,
/// created mostly to avoid passing a lot of parameters between various functions
/// used for constructing tarball.
pub struct Basebackup<'a, W>
struct Basebackup<'a, W>
where
W: AsyncWrite + Send + Sync + Unpin,
{
ar: Builder<&'a mut W>,
timeline: &'a Arc<Timeline>,
pub lsn: Lsn,
timeline: &'a Timeline,
lsn: Lsn,
prev_record_lsn: Lsn,
full_backup: bool,
}
@@ -65,88 +135,33 @@ impl<'a, W> Basebackup<'a, W>
where
W: AsyncWrite + Send + Sync + Unpin,
{
pub fn new(
write: &'a mut W,
timeline: &'a Arc<Timeline>,
req_lsn: Option<Lsn>,
prev_lsn: Option<Lsn>,
full_backup: bool,
) -> Result<Basebackup<'a, W>> {
// Compute postgres doesn't have any previous WAL files, but the first
// record that it's going to write needs to include the LSN of the
// previous record (xl_prev). We include prev_record_lsn in the
// "zenith.signal" file, so that postgres can read it during startup.
//
// We don't keep full history of record boundaries in the page server,
// however, only the predecessor of the latest record on each
// timeline. So we can only provide prev_record_lsn when you take a
// base backup at the end of the timeline, i.e. at last_record_lsn.
// Even at the end of the timeline, we sometimes don't have a valid
// prev_lsn value; that happens if the timeline was just branched from
// an old LSN and it doesn't have any WAL of its own yet. We will set
// prev_lsn to Lsn(0) if we cannot provide the correct value.
let (backup_prev, backup_lsn) = if let Some(req_lsn) = req_lsn {
// Backup was requested at a particular LSN. The caller should've
// already checked that it's a valid LSN.
// If the requested point is the end of the timeline, we can
// provide prev_lsn. (get_last_record_rlsn() might return it as
// zero, though, if no WAL has been generated on this timeline
// yet.)
let end_of_timeline = timeline.get_last_record_rlsn();
if req_lsn == end_of_timeline.last {
(end_of_timeline.prev, req_lsn)
} else {
(Lsn(0), req_lsn)
}
} else {
// Backup was requested at end of the timeline.
let end_of_timeline = timeline.get_last_record_rlsn();
(end_of_timeline.prev, end_of_timeline.last)
};
// Consolidate the derived and the provided prev_lsn values
let prev_lsn = if let Some(provided_prev_lsn) = prev_lsn {
if backup_prev != Lsn(0) {
ensure!(backup_prev == provided_prev_lsn)
}
provided_prev_lsn
} else {
backup_prev
};
info!(
"taking basebackup lsn={}, prev_lsn={} (full_backup={})",
backup_lsn, prev_lsn, full_backup
);
Ok(Basebackup {
ar: Builder::new_non_terminated(write),
timeline,
lsn: backup_lsn,
prev_record_lsn: prev_lsn,
full_backup,
})
}
pub async fn send_tarball(mut self) -> anyhow::Result<()> {
async fn send_tarball(mut self) -> anyhow::Result<()> {
// TODO include checksum
// Create pgdata subdirs structure
for dir in PGDATA_SUBDIRS.iter() {
let header = new_tar_header_dir(dir)?;
self.ar.append(&header, &mut io::empty()).await?;
self.ar
.append(&header, &mut io::empty())
.await
.context("could not add directory to basebackup tarball")?;
}
// Send empty config files.
// Send config files.
for filepath in PGDATA_SPECIAL_FILES.iter() {
if *filepath == "pg_hba.conf" {
let data = PG_HBA.as_bytes();
let header = new_tar_header(filepath, data.len() as u64)?;
self.ar.append(&header, data).await?;
self.ar
.append(&header, data)
.await
.context("could not add config file to basebackup tarball")?;
} else {
let header = new_tar_header(filepath, 0)?;
self.ar.append(&header, &mut io::empty()).await?;
self.ar
.append(&header, &mut io::empty())
.await
.context("could not add config file to basebackup tarball")?;
}
}

View File

@@ -641,10 +641,8 @@ impl PageServerHandler {
/* Send a tarball of the latest layer on the timeline */
{
let mut writer = pgb.copyout_writer();
let basebackup =
basebackup::Basebackup::new(&mut writer, &timeline, lsn, prev_lsn, full_backup)?;
tracing::Span::current().record("lsn", basebackup.lsn.to_string().as_str());
basebackup.send_tarball().await?;
basebackup::send_basebackup_tarball(&mut writer, &timeline, lsn, prev_lsn, full_backup)
.await?;
}
pgb.write_message(&BeMessage::CopyDone)?;

View File

@@ -26,7 +26,7 @@ use std::sync::Arc;
use tracing::*;
use utils::lsn::Lsn;
use super::storage_layer::{InMemoryLayer, Layer};
use super::storage_layer::{InMemoryLayer, InMemoryOrHistoricLayer, Layer};
///
/// LayerMap tracks what layers exist on a timeline.
@@ -241,7 +241,8 @@ where
/// Return value of LayerMap::search
pub struct SearchResult<L: ?Sized> {
pub layer: Arc<L>,
// FIXME: I wish this could be Arc<dyn Layer>. But I couldn't make that work.
pub layer: InMemoryOrHistoricLayer<L>,
pub lsn_floor: Lsn,
}
@@ -261,6 +262,30 @@ where
/// layer.
///
pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult<L>> {
// First check if an open or frozen layer matches
if let Some(open_layer) = &self.open_layer {
let start_lsn = open_layer.get_lsn_range().start;
if end_lsn > start_lsn {
return Some(SearchResult {
layer: InMemoryOrHistoricLayer::InMemory(Arc::clone(open_layer)),
lsn_floor: start_lsn,
});
}
}
for frozen_layer in self.frozen_layers.iter().rev() {
let start_lsn = frozen_layer.get_lsn_range().start;
if end_lsn > start_lsn {
return Some(SearchResult {
layer: InMemoryOrHistoricLayer::InMemory(Arc::clone(frozen_layer)),
lsn_floor: start_lsn,
});
}
}
self.search_historic(key, end_lsn)
}
fn search_historic(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult<L>> {
// linear search
// Find the latest image layer that covers the given key
let mut latest_img: Option<Arc<L>> = None;
@@ -286,7 +311,7 @@ where
if Lsn(img_lsn.0 + 1) == end_lsn {
// found exact match
return Some(SearchResult {
layer: Arc::clone(l),
layer: InMemoryOrHistoricLayer::Historic(Arc::clone(l)),
lsn_floor: img_lsn,
});
}
@@ -349,13 +374,13 @@ where
);
Some(SearchResult {
lsn_floor,
layer: l,
layer: InMemoryOrHistoricLayer::Historic(l),
})
} else if let Some(l) = latest_img {
trace!("found img layer and no deltas for request on {key} at {end_lsn}");
Some(SearchResult {
lsn_floor: latest_img_lsn.unwrap(),
layer: l,
layer: InMemoryOrHistoricLayer::Historic(l),
})
} else {
trace!("no layer found for request on {key} at {end_lsn}");

View File

@@ -196,3 +196,38 @@ pub fn downcast_remote_layer(
None
}
}
pub enum InMemoryOrHistoricLayer<L: ?Sized> {
InMemory(Arc<InMemoryLayer>),
Historic(Arc<L>),
}
impl<L: ?Sized> InMemoryOrHistoricLayer<L>
where
L: PersistentLayer,
{
pub fn downcast_remote_layer(&self) -> Option<std::sync::Arc<RemoteLayer>> {
match self {
Self::InMemory(_) => None,
Self::Historic(l) => {
if l.is_remote_layer() {
Arc::clone(l).downcast_remote_layer()
} else {
None
}
}
}
}
pub fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: &mut ValueReconstructState,
) -> Result<ValueReconstructResult> {
match self {
Self::InMemory(l) => l.get_value_reconstruct_data(key, lsn_range, reconstruct_data),
Self::Historic(l) => l.get_value_reconstruct_data(key, lsn_range, reconstruct_data),
}
}
}

View File

@@ -25,8 +25,8 @@ use std::time::{Duration, Instant, SystemTime};
use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
use crate::tenant::storage_layer::{
DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer, LayerFileName,
RemoteLayer,
DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer,
InMemoryOrHistoricLayer, LayerFileName, RemoteLayer,
};
use crate::tenant::{
ephemeral_file::is_ephemeral_file,
@@ -1591,7 +1591,7 @@ trait TraversalLayerExt {
fn traversal_id(&self) -> TraversalId;
}
impl TraversalLayerExt for Arc<dyn PersistentLayer> {
impl<T: PersistentLayer + ?Sized> TraversalLayerExt for T {
fn traversal_id(&self) -> TraversalId {
match self.local_path() {
Some(local_path) => {
@@ -1621,6 +1621,15 @@ impl TraversalLayerExt for Arc<InMemoryLayer> {
}
}
impl TraversalLayerExt for InMemoryOrHistoricLayer<dyn PersistentLayer> {
fn traversal_id(&self) -> String {
match self {
Self::InMemory(l) => l.traversal_id(),
Self::Historic(l) => l.traversal_id(),
}
}
}
impl Timeline {
///
/// Get a handle to a Layer for reading.
@@ -1642,8 +1651,11 @@ impl Timeline {
// For debugging purposes, collect the path of layers that we traversed
// through. It's included in the error message if we fail to find the key.
let mut traversal_path =
Vec::<(ValueReconstructResult, Lsn, Box<dyn TraversalLayerExt>)>::new();
let mut traversal_path = Vec::<(
ValueReconstructResult,
Lsn,
Box<dyn TraversalLayerExt>,
)>::new();
let cached_lsn = if let Some((cached_lsn, _)) = &reconstruct_state.img {
*cached_lsn
@@ -1679,7 +1691,7 @@ impl Timeline {
Lsn(cont_lsn.0 - 1),
request_lsn,
timeline.ancestor_lsn
), traversal_path);
), &traversal_path);
}
prev_lsn = cont_lsn;
}
@@ -1689,7 +1701,7 @@ impl Timeline {
"could not find data for key {} at LSN {}, for request at LSN {}",
key, cont_lsn, request_lsn
),
traversal_path,
&traversal_path,
);
}
}
@@ -1708,82 +1720,54 @@ impl Timeline {
timeline_owned = ancestor;
timeline = &*timeline_owned;
prev_lsn = Lsn(u64::MAX);
continue;
continue 'outer;
}
let layers = timeline.layers.read().unwrap();
// Check the open and frozen in-memory layers first, in order from newest
// to oldest.
if let Some(open_layer) = &layers.open_layer {
let start_lsn = open_layer.get_lsn_range().start;
if cont_lsn > start_lsn {
//info!("CHECKING for {} at {} on open layer {}", key, cont_lsn, open_layer.filename().display());
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, start_lsn);
result = match open_layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
) {
Ok(result) => result,
Err(e) => return PageReconstructResult::from(e),
};
cont_lsn = lsn_floor;
traversal_path.push((result, cont_lsn, Box::new(open_layer.clone())));
continue;
}
}
for frozen_layer in layers.frozen_layers.iter().rev() {
let start_lsn = frozen_layer.get_lsn_range().start;
if cont_lsn > start_lsn {
//info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
let lsn_floor = max(cached_lsn + 1, start_lsn);
result = match frozen_layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
) {
Ok(result) => result,
Err(e) => return PageReconstructResult::from(e),
};
cont_lsn = lsn_floor;
traversal_path.push((result, cont_lsn, Box::new(frozen_layer.clone())));
continue 'outer;
}
}
if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
//info!("CHECKING for {} at {} on historic layer {}", key, cont_lsn, layer.filename().display());
// If it's a remote layer, the caller can do the download and retry.
if let Some(remote_layer) = super::storage_layer::downcast_remote_layer(&layer) {
info!("need remote layer {}", layer.traversal_id());
return PageReconstructResult::NeedsDownload(
Weak::clone(&timeline.myself),
Arc::downgrade(&remote_layer),
);
}
let lsn_floor = max(cached_lsn + 1, lsn_floor);
result = match layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
) {
Ok(result) => result,
Err(e) => return PageReconstructResult::from(e),
loop {
let remote_layer = {
let layers = timeline.layers.read().unwrap();
if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
// If it's a remote layer, download it and retry.
if let Some(remote_layer) = layer.downcast_remote_layer() {
// TODO: push a breadcrumb to 'traversal_path' to record the fact that
// we downloaded / would need to download this.
remote_layer
} else {
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, lsn_floor);
result = match layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
) {
Ok(result) => result,
Err(e) => return PageReconstructResult::from(e),
};
cont_lsn = lsn_floor;
traversal_path.push((result, cont_lsn, Box::new(layer)));
continue 'outer;
}
} else if timeline.ancestor_timeline.is_some() {
// Nothing on this timeline. Traverse to parent
result = ValueReconstructResult::Continue;
cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
continue 'outer;
} else {
// Nothing found
result = ValueReconstructResult::Missing;
continue 'outer;
}
};
cont_lsn = lsn_floor;
traversal_path.push((result, cont_lsn, Box::new(layer.clone())));
} else if timeline.ancestor_timeline.is_some() {
// Nothing on this timeline. Traverse to parent
result = ValueReconstructResult::Continue;
cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
} else {
// Nothing found
result = ValueReconstructResult::Missing;
// The next layer doesn't exist locally. The caller can do the download and retry.
// (The control flow is a bit complicated here because we must drop the 'layers'
// lock before awaiting on the Future.)
info!("need remote layer {}", remote_layer.traversal_id());
return PageReconstructResult::NeedsDownload(
Weak::clone(&timeline.myself),
Arc::downgrade(&remote_layer),
);
}
}
}
@@ -3362,7 +3346,7 @@ where
/// to an error, as anyhow context information.
fn layer_traversal_error(
msg: String,
path: Vec<(ValueReconstructResult, Lsn, Box<dyn TraversalLayerExt>)>,
path: &[(ValueReconstructResult, Lsn, Box<dyn TraversalLayerExt>)],
) -> PageReconstructResult<()> {
// We want the original 'msg' to be the outermost context. The outermost context
// is the most high-level information, which also gets propagated to the client.