Compare commits

...

2 Commits

Author SHA1 Message Date
Heikki Linnakangas
bba997ee00 Refactor LayerMap::search to also search open and frozen layers.
This allows refactoring in get_reconstruct_data() which is needed by
PR #3228. That PR turns some functions async, and you cannot hold a
RwLock over 'await'. And it's not enough to "drop(guard)" the lock guard;
it has to actually go out-of-scope to placate the compiler. This
refactoring allows dropping the lock on 'layers' at end of scope.
2023-01-03 22:59:18 +02:00
Heikki Linnakangas
70a0cce609 Refactor send_tarball()
The Basebackup struct is really just a convenient place to carry the
various parameters around in send_tarball and its subroutines. Make it
internal to the send_tarball function.
2023-01-03 20:47:38 +02:00
5 changed files with 221 additions and 164 deletions

View File

@@ -10,11 +10,10 @@
//! This module is responsible for creation of such tarball //! This module is responsible for creation of such tarball
//! from data stored in object storage. //! from data stored in object storage.
//! //!
use anyhow::{anyhow, bail, ensure, Context, Result}; use anyhow::{anyhow, bail, ensure, Context};
use bytes::{BufMut, BytesMut}; use bytes::{BufMut, BytesMut};
use fail::fail_point; use fail::fail_point;
use std::fmt::Write as FmtWrite; use std::fmt::Write as FmtWrite;
use std::sync::Arc;
use std::time::SystemTime; use std::time::SystemTime;
use tokio::io; use tokio::io;
use tokio::io::AsyncWrite; use tokio::io::AsyncWrite;
@@ -39,16 +38,87 @@ use postgres_ffi::PG_TLI;
use postgres_ffi::{BLCKSZ, RELSEG_SIZE, WAL_SEGMENT_SIZE}; use postgres_ffi::{BLCKSZ, RELSEG_SIZE, WAL_SEGMENT_SIZE};
use utils::lsn::Lsn; use utils::lsn::Lsn;
/// Streams a basebackup tarball for `timeline` into `write`.
///
/// * `req_lsn` - LSN to take the backup at; `None` means "end of the timeline"
///   (the current `last_record_lsn`). The caller is expected to have already
///   validated an explicit LSN.
/// * `prev_lsn` - caller-supplied previous-record LSN. When both a derived and
///   a provided value are known, they must agree (checked with `ensure!`).
/// * `full_backup` - forwarded to `Basebackup`; controls how much data is
///   included in the tarball. (NOTE(review): exact semantics live in
///   `send_tarball`, not visible here — confirm there.)
///
/// # Errors
/// Returns an error if the provided and derived prev-LSN values conflict, or
/// if building/sending the tarball fails.
pub async fn send_basebackup_tarball<'a, W>(
write: &'a mut W,
timeline: &'a Timeline,
req_lsn: Option<Lsn>,
prev_lsn: Option<Lsn>,
full_backup: bool,
) -> anyhow::Result<()>
where
W: AsyncWrite + Send + Sync + Unpin,
{
// Compute postgres doesn't have any previous WAL files, but the first
// record that it's going to write needs to include the LSN of the
// previous record (xl_prev). We include prev_record_lsn in the
// "zenith.signal" file, so that postgres can read it during startup.
//
// We don't keep full history of record boundaries in the page server,
// however, only the predecessor of the latest record on each
// timeline. So we can only provide prev_record_lsn when you take a
// base backup at the end of the timeline, i.e. at last_record_lsn.
// Even at the end of the timeline, we sometimes don't have a valid
// prev_lsn value; that happens if the timeline was just branched from
// an old LSN and it doesn't have any WAL of its own yet. We will set
// prev_lsn to Lsn(0) if we cannot provide the correct value.
let (backup_prev, backup_lsn) = if let Some(req_lsn) = req_lsn {
// Backup was requested at a particular LSN. The caller should've
// already checked that it's a valid LSN.
// If the requested point is the end of the timeline, we can
// provide prev_lsn. (get_last_record_rlsn() might return it as
// zero, though, if no WAL has been generated on this timeline
// yet.)
let end_of_timeline = timeline.get_last_record_rlsn();
if req_lsn == end_of_timeline.last {
(end_of_timeline.prev, req_lsn)
} else {
// Not at the tip: the previous-record LSN is unknown here.
(Lsn(0), req_lsn)
}
} else {
// Backup was requested at end of the timeline.
let end_of_timeline = timeline.get_last_record_rlsn();
(end_of_timeline.prev, end_of_timeline.last)
};
// Consolidate the derived and the provided prev_lsn values.
// A derived value of Lsn(0) means "unknown", in which case the
// caller-provided value (if any) is trusted as-is.
let prev_lsn = if let Some(provided_prev_lsn) = prev_lsn {
if backup_prev != Lsn(0) {
ensure!(backup_prev == provided_prev_lsn);
}
provided_prev_lsn
} else {
backup_prev
};
info!(
"taking basebackup lsn={}, prev_lsn={} (full_backup={})",
backup_lsn, prev_lsn, full_backup
);
// Basebackup is just a parameter-carrier for send_tarball and its
// helpers; it is constructed and consumed entirely within this call.
let basebackup = Basebackup {
ar: Builder::new_non_terminated(write),
timeline,
lsn: backup_lsn,
prev_record_lsn: prev_lsn,
full_backup,
};
basebackup
.send_tarball()
.instrument(info_span!("send_tarball", backup_lsn=%backup_lsn))
.await
}
/// This is short-living object only for the time of tarball creation, /// This is short-living object only for the time of tarball creation,
/// created mostly to avoid passing a lot of parameters between various functions /// created mostly to avoid passing a lot of parameters between various functions
/// used for constructing tarball. /// used for constructing tarball.
pub struct Basebackup<'a, W> struct Basebackup<'a, W>
where where
W: AsyncWrite + Send + Sync + Unpin, W: AsyncWrite + Send + Sync + Unpin,
{ {
ar: Builder<&'a mut W>, ar: Builder<&'a mut W>,
timeline: &'a Arc<Timeline>, timeline: &'a Timeline,
pub lsn: Lsn, lsn: Lsn,
prev_record_lsn: Lsn, prev_record_lsn: Lsn,
full_backup: bool, full_backup: bool,
} }
@@ -65,88 +135,33 @@ impl<'a, W> Basebackup<'a, W>
where where
W: AsyncWrite + Send + Sync + Unpin, W: AsyncWrite + Send + Sync + Unpin,
{ {
pub fn new( async fn send_tarball(mut self) -> anyhow::Result<()> {
write: &'a mut W,
timeline: &'a Arc<Timeline>,
req_lsn: Option<Lsn>,
prev_lsn: Option<Lsn>,
full_backup: bool,
) -> Result<Basebackup<'a, W>> {
// Compute postgres doesn't have any previous WAL files, but the first
// record that it's going to write needs to include the LSN of the
// previous record (xl_prev). We include prev_record_lsn in the
// "zenith.signal" file, so that postgres can read it during startup.
//
// We don't keep full history of record boundaries in the page server,
// however, only the predecessor of the latest record on each
// timeline. So we can only provide prev_record_lsn when you take a
// base backup at the end of the timeline, i.e. at last_record_lsn.
// Even at the end of the timeline, we sometimes don't have a valid
// prev_lsn value; that happens if the timeline was just branched from
// an old LSN and it doesn't have any WAL of its own yet. We will set
// prev_lsn to Lsn(0) if we cannot provide the correct value.
let (backup_prev, backup_lsn) = if let Some(req_lsn) = req_lsn {
// Backup was requested at a particular LSN. The caller should've
// already checked that it's a valid LSN.
// If the requested point is the end of the timeline, we can
// provide prev_lsn. (get_last_record_rlsn() might return it as
// zero, though, if no WAL has been generated on this timeline
// yet.)
let end_of_timeline = timeline.get_last_record_rlsn();
if req_lsn == end_of_timeline.last {
(end_of_timeline.prev, req_lsn)
} else {
(Lsn(0), req_lsn)
}
} else {
// Backup was requested at end of the timeline.
let end_of_timeline = timeline.get_last_record_rlsn();
(end_of_timeline.prev, end_of_timeline.last)
};
// Consolidate the derived and the provided prev_lsn values
let prev_lsn = if let Some(provided_prev_lsn) = prev_lsn {
if backup_prev != Lsn(0) {
ensure!(backup_prev == provided_prev_lsn)
}
provided_prev_lsn
} else {
backup_prev
};
info!(
"taking basebackup lsn={}, prev_lsn={} (full_backup={})",
backup_lsn, prev_lsn, full_backup
);
Ok(Basebackup {
ar: Builder::new_non_terminated(write),
timeline,
lsn: backup_lsn,
prev_record_lsn: prev_lsn,
full_backup,
})
}
pub async fn send_tarball(mut self) -> anyhow::Result<()> {
// TODO include checksum // TODO include checksum
// Create pgdata subdirs structure // Create pgdata subdirs structure
for dir in PGDATA_SUBDIRS.iter() { for dir in PGDATA_SUBDIRS.iter() {
let header = new_tar_header_dir(dir)?; let header = new_tar_header_dir(dir)?;
self.ar.append(&header, &mut io::empty()).await?; self.ar
.append(&header, &mut io::empty())
.await
.context("could not add directory to basebackup tarball")?;
} }
// Send empty config files. // Send config files.
for filepath in PGDATA_SPECIAL_FILES.iter() { for filepath in PGDATA_SPECIAL_FILES.iter() {
if *filepath == "pg_hba.conf" { if *filepath == "pg_hba.conf" {
let data = PG_HBA.as_bytes(); let data = PG_HBA.as_bytes();
let header = new_tar_header(filepath, data.len() as u64)?; let header = new_tar_header(filepath, data.len() as u64)?;
self.ar.append(&header, data).await?; self.ar
.append(&header, data)
.await
.context("could not add config file to basebackup tarball")?;
} else { } else {
let header = new_tar_header(filepath, 0)?; let header = new_tar_header(filepath, 0)?;
self.ar.append(&header, &mut io::empty()).await?; self.ar
.append(&header, &mut io::empty())
.await
.context("could not add config file to basebackup tarball")?;
} }
} }

View File

@@ -641,10 +641,8 @@ impl PageServerHandler {
/* Send a tarball of the latest layer on the timeline */ /* Send a tarball of the latest layer on the timeline */
{ {
let mut writer = pgb.copyout_writer(); let mut writer = pgb.copyout_writer();
let basebackup = basebackup::send_basebackup_tarball(&mut writer, &timeline, lsn, prev_lsn, full_backup)
basebackup::Basebackup::new(&mut writer, &timeline, lsn, prev_lsn, full_backup)?; .await?;
tracing::Span::current().record("lsn", basebackup.lsn.to_string().as_str());
basebackup.send_tarball().await?;
} }
pgb.write_message(&BeMessage::CopyDone)?; pgb.write_message(&BeMessage::CopyDone)?;

View File

@@ -26,7 +26,7 @@ use std::sync::Arc;
use tracing::*; use tracing::*;
use utils::lsn::Lsn; use utils::lsn::Lsn;
use super::storage_layer::{InMemoryLayer, Layer}; use super::storage_layer::{InMemoryLayer, InMemoryOrHistoricLayer, Layer};
/// ///
/// LayerMap tracks what layers exist on a timeline. /// LayerMap tracks what layers exist on a timeline.
@@ -241,7 +241,8 @@ where
/// Return value of LayerMap::search /// Return value of LayerMap::search
pub struct SearchResult<L: ?Sized> { pub struct SearchResult<L: ?Sized> {
pub layer: Arc<L>, // FIXME: I wish this could be Arc<dyn Layer>. But I couldn't make that work.
pub layer: InMemoryOrHistoricLayer<L>,
pub lsn_floor: Lsn, pub lsn_floor: Lsn,
} }
@@ -261,6 +262,30 @@ where
/// layer. /// layer.
/// ///
pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult<L>> { pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult<L>> {
// First check if an open or frozen layer matches
if let Some(open_layer) = &self.open_layer {
let start_lsn = open_layer.get_lsn_range().start;
if end_lsn > start_lsn {
return Some(SearchResult {
layer: InMemoryOrHistoricLayer::InMemory(Arc::clone(open_layer)),
lsn_floor: start_lsn,
});
}
}
for frozen_layer in self.frozen_layers.iter().rev() {
let start_lsn = frozen_layer.get_lsn_range().start;
if end_lsn > start_lsn {
return Some(SearchResult {
layer: InMemoryOrHistoricLayer::InMemory(Arc::clone(frozen_layer)),
lsn_floor: start_lsn,
});
}
}
self.search_historic(key, end_lsn)
}
fn search_historic(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult<L>> {
// linear search // linear search
// Find the latest image layer that covers the given key // Find the latest image layer that covers the given key
let mut latest_img: Option<Arc<L>> = None; let mut latest_img: Option<Arc<L>> = None;
@@ -286,7 +311,7 @@ where
if Lsn(img_lsn.0 + 1) == end_lsn { if Lsn(img_lsn.0 + 1) == end_lsn {
// found exact match // found exact match
return Some(SearchResult { return Some(SearchResult {
layer: Arc::clone(l), layer: InMemoryOrHistoricLayer::Historic(Arc::clone(l)),
lsn_floor: img_lsn, lsn_floor: img_lsn,
}); });
} }
@@ -349,13 +374,13 @@ where
); );
Some(SearchResult { Some(SearchResult {
lsn_floor, lsn_floor,
layer: l, layer: InMemoryOrHistoricLayer::Historic(l),
}) })
} else if let Some(l) = latest_img { } else if let Some(l) = latest_img {
trace!("found img layer and no deltas for request on {key} at {end_lsn}"); trace!("found img layer and no deltas for request on {key} at {end_lsn}");
Some(SearchResult { Some(SearchResult {
lsn_floor: latest_img_lsn.unwrap(), lsn_floor: latest_img_lsn.unwrap(),
layer: l, layer: InMemoryOrHistoricLayer::Historic(l),
}) })
} else { } else {
trace!("no layer found for request on {key} at {end_lsn}"); trace!("no layer found for request on {key} at {end_lsn}");

View File

@@ -196,3 +196,38 @@ pub fn downcast_remote_layer(
None None
} }
} }
/// A layer returned from `LayerMap::search`: either one of the in-memory
/// (open/frozen) layers, or a historic (on-disk / remote) layer.
///
/// This exists so that `search` can cover in-memory layers too, letting
/// callers treat both kinds uniformly when reconstructing values.
pub enum InMemoryOrHistoricLayer<L: ?Sized> {
InMemory(Arc<InMemoryLayer>),
Historic(Arc<L>),
}
impl<L: ?Sized> InMemoryOrHistoricLayer<L>
where
L: PersistentLayer,
{
/// If this is a historic layer backed by remote storage, return it as a
/// `RemoteLayer` so the caller can download it. In-memory layers are
/// always local, so they yield `None`.
pub fn downcast_remote_layer(&self) -> Option<std::sync::Arc<RemoteLayer>> {
match self {
Self::InMemory(_) => None,
Self::Historic(l) => {
if l.is_remote_layer() {
Arc::clone(l).downcast_remote_layer()
} else {
None
}
}
}
}
/// Forwards to the wrapped layer's `get_value_reconstruct_data`,
/// regardless of which variant this is.
pub fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: &mut ValueReconstructState,
) -> Result<ValueReconstructResult> {
match self {
Self::InMemory(l) => l.get_value_reconstruct_data(key, lsn_range, reconstruct_data),
Self::Historic(l) => l.get_value_reconstruct_data(key, lsn_range, reconstruct_data),
}
}
}

View File

@@ -25,8 +25,8 @@ use std::time::{Duration, Instant, SystemTime};
use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata}; use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
use crate::tenant::storage_layer::{ use crate::tenant::storage_layer::{
DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer, LayerFileName, DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer,
RemoteLayer, InMemoryOrHistoricLayer, LayerFileName, RemoteLayer,
}; };
use crate::tenant::{ use crate::tenant::{
ephemeral_file::is_ephemeral_file, ephemeral_file::is_ephemeral_file,
@@ -1591,7 +1591,7 @@ trait TraversalLayerExt {
fn traversal_id(&self) -> TraversalId; fn traversal_id(&self) -> TraversalId;
} }
impl TraversalLayerExt for Arc<dyn PersistentLayer> { impl<T: PersistentLayer + ?Sized> TraversalLayerExt for T {
fn traversal_id(&self) -> TraversalId { fn traversal_id(&self) -> TraversalId {
match self.local_path() { match self.local_path() {
Some(local_path) => { Some(local_path) => {
@@ -1621,6 +1621,15 @@ impl TraversalLayerExt for Arc<InMemoryLayer> {
} }
} }
// Delegate traversal-id formatting to whichever layer kind is wrapped, so
// traversal_path entries can hold InMemoryOrHistoricLayer values directly.
impl TraversalLayerExt for InMemoryOrHistoricLayer<dyn PersistentLayer> {
fn traversal_id(&self) -> String {
match self {
Self::InMemory(l) => l.traversal_id(),
Self::Historic(l) => l.traversal_id(),
}
}
}
impl Timeline { impl Timeline {
/// ///
/// Get a handle to a Layer for reading. /// Get a handle to a Layer for reading.
@@ -1642,8 +1651,11 @@ impl Timeline {
// For debugging purposes, collect the path of layers that we traversed // For debugging purposes, collect the path of layers that we traversed
// through. It's included in the error message if we fail to find the key. // through. It's included in the error message if we fail to find the key.
let mut traversal_path = let mut traversal_path = Vec::<(
Vec::<(ValueReconstructResult, Lsn, Box<dyn TraversalLayerExt>)>::new(); ValueReconstructResult,
Lsn,
Box<dyn TraversalLayerExt>,
)>::new();
let cached_lsn = if let Some((cached_lsn, _)) = &reconstruct_state.img { let cached_lsn = if let Some((cached_lsn, _)) = &reconstruct_state.img {
*cached_lsn *cached_lsn
@@ -1679,7 +1691,7 @@ impl Timeline {
Lsn(cont_lsn.0 - 1), Lsn(cont_lsn.0 - 1),
request_lsn, request_lsn,
timeline.ancestor_lsn timeline.ancestor_lsn
), traversal_path); ), &traversal_path);
} }
prev_lsn = cont_lsn; prev_lsn = cont_lsn;
} }
@@ -1689,7 +1701,7 @@ impl Timeline {
"could not find data for key {} at LSN {}, for request at LSN {}", "could not find data for key {} at LSN {}, for request at LSN {}",
key, cont_lsn, request_lsn key, cont_lsn, request_lsn
), ),
traversal_path, &traversal_path,
); );
} }
} }
@@ -1708,82 +1720,54 @@ impl Timeline {
timeline_owned = ancestor; timeline_owned = ancestor;
timeline = &*timeline_owned; timeline = &*timeline_owned;
prev_lsn = Lsn(u64::MAX); prev_lsn = Lsn(u64::MAX);
continue; continue 'outer;
} }
let layers = timeline.layers.read().unwrap(); loop {
let remote_layer = {
// Check the open and frozen in-memory layers first, in order from newest let layers = timeline.layers.read().unwrap();
// to oldest. if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
if let Some(open_layer) = &layers.open_layer { // If it's a remote layer, download it and retry.
let start_lsn = open_layer.get_lsn_range().start; if let Some(remote_layer) = layer.downcast_remote_layer() {
if cont_lsn > start_lsn { // TODO: push a breadcrumb to 'traversal_path' to record the fact that
//info!("CHECKING for {} at {} on open layer {}", key, cont_lsn, open_layer.filename().display()); // we downloaded / would need to download this.
// Get all the data needed to reconstruct the page version from this layer. remote_layer
// But if we have an older cached page image, no need to go past that. } else {
let lsn_floor = max(cached_lsn + 1, start_lsn); // Get all the data needed to reconstruct the page version from this layer.
result = match open_layer.get_value_reconstruct_data( // But if we have an older cached page image, no need to go past that.
key, let lsn_floor = max(cached_lsn + 1, lsn_floor);
lsn_floor..cont_lsn, result = match layer.get_value_reconstruct_data(
reconstruct_state, key,
) { lsn_floor..cont_lsn,
Ok(result) => result, reconstruct_state,
Err(e) => return PageReconstructResult::from(e), ) {
}; Ok(result) => result,
cont_lsn = lsn_floor; Err(e) => return PageReconstructResult::from(e),
traversal_path.push((result, cont_lsn, Box::new(open_layer.clone()))); };
continue; cont_lsn = lsn_floor;
} traversal_path.push((result, cont_lsn, Box::new(layer)));
} continue 'outer;
for frozen_layer in layers.frozen_layers.iter().rev() { }
let start_lsn = frozen_layer.get_lsn_range().start; } else if timeline.ancestor_timeline.is_some() {
if cont_lsn > start_lsn { // Nothing on this timeline. Traverse to parent
//info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display()); result = ValueReconstructResult::Continue;
let lsn_floor = max(cached_lsn + 1, start_lsn); cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
result = match frozen_layer.get_value_reconstruct_data( continue 'outer;
key, } else {
lsn_floor..cont_lsn, // Nothing found
reconstruct_state, result = ValueReconstructResult::Missing;
) { continue 'outer;
Ok(result) => result, }
Err(e) => return PageReconstructResult::from(e),
};
cont_lsn = lsn_floor;
traversal_path.push((result, cont_lsn, Box::new(frozen_layer.clone())));
continue 'outer;
}
}
if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
//info!("CHECKING for {} at {} on historic layer {}", key, cont_lsn, layer.filename().display());
// If it's a remote layer, the caller can do the download and retry.
if let Some(remote_layer) = super::storage_layer::downcast_remote_layer(&layer) {
info!("need remote layer {}", layer.traversal_id());
return PageReconstructResult::NeedsDownload(
Weak::clone(&timeline.myself),
Arc::downgrade(&remote_layer),
);
}
let lsn_floor = max(cached_lsn + 1, lsn_floor);
result = match layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
) {
Ok(result) => result,
Err(e) => return PageReconstructResult::from(e),
}; };
cont_lsn = lsn_floor;
traversal_path.push((result, cont_lsn, Box::new(layer.clone()))); // The next layer doesn't exist locally. The caller can do the download and retry.
} else if timeline.ancestor_timeline.is_some() { // (The control flow is a bit complicated here because we must drop the 'layers'
// Nothing on this timeline. Traverse to parent // lock before awaiting on the Future.)
result = ValueReconstructResult::Continue; info!("need remote layer {}", remote_layer.traversal_id());
cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1); return PageReconstructResult::NeedsDownload(
} else { Weak::clone(&timeline.myself),
// Nothing found Arc::downgrade(&remote_layer),
result = ValueReconstructResult::Missing; );
} }
} }
} }
@@ -3362,7 +3346,7 @@ where
/// to an error, as anyhow context information. /// to an error, as anyhow context information.
fn layer_traversal_error( fn layer_traversal_error(
msg: String, msg: String,
path: Vec<(ValueReconstructResult, Lsn, Box<dyn TraversalLayerExt>)>, path: &[(ValueReconstructResult, Lsn, Box<dyn TraversalLayerExt>)],
) -> PageReconstructResult<()> { ) -> PageReconstructResult<()> {
// We want the original 'msg' to be the outermost context. The outermost context // We want the original 'msg' to be the outermost context. The outermost context
// is the most high-level information, which also gets propagated to the client. // is the most high-level information, which also gets propagated to the client.