Motivation
==========

Layer Eviction Needs Context
----------------------------

Before we start implementing layer eviction, we need to collect some access
statistics per layer file, or maybe even per page. Part of these statistics
should be the initiator of a page read request, so that we can answer
whether it was page_service or one of the background loops, and if the
latter, which one. Further, it would be nice to learn what activity in the
pageserver initiated an on-demand download of a layer file.

We will use this information to test out layer eviction policies. Read more
about the current plan for layer eviction here:
https://github.com/neondatabase/neon/issues/2476#issuecomment-1370822104

task_mgr problems + cancellation + tenant/timeline lifecycle
------------------------------------------------------------

Apart from layer eviction, we have long-standing problems with task_mgr,
task cancellation, and various races around tenant / timeline lifecycle
transitions. One approach to solving these is to abandon task_mgr in favor
of a mechanism similar to Golang's context.Context, albeit extended to
support waiting for completion, and specialized to the needs of the
pageserver. Heikki solves all of the above at once in PR
https://github.com/neondatabase/neon/pull/3228 , which is not yet merged at
the time of writing.

What Is This Patch About
========================

This patch addresses the immediate needs of layer eviction by introducing a
`RequestContext` structure that is plumbed through the pageserver - all the
way from the various entrypoints (page_service, management API, tenant
background loops) down to Timeline::{get,get_reconstruct_data}. The struct
carries a description of the kind of activity that initiated the call; we
re-use task_mgr::TaskKind for this. It also carries the desired on-demand
download behavior of the entrypoint. Timeline::get_reconstruct_data can
then log the TaskKind that initiated an on-demand download.

I developed this patch by git-checking-out Heikki's big RequestContext PR
https://github.com/neondatabase/neon/pull/3228 , then deleting all the
functionality that we do not need to address the needs of layer eviction.
After that, I added a few things on top:

1. The concept of attached_child and detached_child, in preparation for
   cancellation signalling through RequestContext, which will be added in a
   future patch.
2. A kill switch to turn DownloadBehavior::Error into a warning.
3. Renamed WalReceiverConnection to WalReceiverConnectionPoller and added
   an additional TaskKind, WalReceiverConnectionHandler. These were
   necessary to create proper detached_child-type RequestContexts for the
   various tasks that walreceiver starts.

How To Review This Patch
========================

Start your review with the module-level comment in context.rs. It explains
the idea of RequestContext, which parts of it are implemented in this
patch, and the future plans for RequestContext.

Then review the various `task_mgr::spawn` call sites. At each of them, we
should be creating a new detached_child RequestContext.

Then review the (few) RequestContext::attached_child call sites and ensure
that the spawned tasks do not outlive the task that spawns them. If they
do, these call sites should use detached_child() instead.

Then review the todo_child() call sites and judge whether it's worth the
trouble of plumbing through a parent context from the caller(s).

Lastly, go through the bulk of mechanical changes that simply forward the
&ctx.
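To make the API concrete, here is a minimal sketch of the shape it takes.
This is illustrative only: the stand-in TaskKind below abbreviates
task_mgr::TaskKind, the DownloadBehavior variants other than Error, the
field layout, and the exact signatures are assumptions, and the
authoritative definition lives in context.rs.

    /// Illustrative sketch; see context.rs for the real definition.
    #[derive(Clone, Copy, Debug)]
    pub enum TaskKind {
        // Stand-in for task_mgr::TaskKind, which has one variant per kind
        // of pageserver task.
        PageRequestHandler,
        WalReceiverConnectionHandler,
    }

    #[derive(Clone, Copy, Debug)]
    pub enum DownloadBehavior {
        /// Hypothetical variant: perform the on-demand download.
        Download,
        /// Fail the request instead of downloading. A kill switch can
        /// downgrade this to a warning.
        Error,
    }

    pub struct RequestContext {
        task_kind: TaskKind,
        download_behavior: DownloadBehavior,
    }

    impl RequestContext {
        pub fn new(task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self {
            RequestContext { task_kind, download_behavior }
        }

        /// Child for work that is awaited by, and therefore cannot outlive,
        /// the task that creates it.
        pub fn attached_child(&self) -> Self {
            RequestContext {
                task_kind: self.task_kind,
                download_behavior: self.download_behavior,
            }
        }

        /// Child for a task that may outlive its spawner, e.g. at
        /// task_mgr::spawn call sites.
        pub fn detached_child(&self, task_kind: TaskKind, behavior: DownloadBehavior) -> Self {
            RequestContext::new(task_kind, behavior)
        }

        /// Placeholder for call sites where a parent context has not been
        /// plumbed through from the callers yet.
        pub fn todo_child(task_kind: TaskKind, behavior: DownloadBehavior) -> Self {
            RequestContext::new(task_kind, behavior)
        }
    }

At a task_mgr::spawn call site, the spawning task would then hand the new
task something like ctx.detached_child(TaskKind::WalReceiverConnectionHandler,
DownloadBehavior::Download).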
620 lines · 20 KiB · Rust
//!
//! Import data and WAL from a PostgreSQL data directory and WAL segments into
//! a neon Timeline.
//!
use std::path::{Path, PathBuf};

use anyhow::{bail, ensure, Context, Result};
use bytes::Bytes;
use futures::StreamExt;
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio_tar::Archive;
use tracing::*;
use walkdir::WalkDir;

use crate::context::RequestContext;
use crate::pgdatadir_mapping::*;
use crate::tenant::Timeline;
use crate::walingest::WalIngest;
use crate::walrecord::DecodedWALRecord;
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::*;
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_ffi::ControlFileData;
use postgres_ffi::DBState_DB_SHUTDOWNED;
use postgres_ffi::Oid;
use postgres_ffi::XLogFileName;
use postgres_ffi::{BLCKSZ, WAL_SEGMENT_SIZE};
use utils::lsn::Lsn;

/// Returns the checkpoint LSN from the control file.
pub fn get_lsn_from_controlfile(path: &Path) -> Result<Lsn> {
    // Read the control file to extract the LSN.
    let controlfile_path = path.join("global").join("pg_control");
    let controlfile = ControlFileData::decode(&std::fs::read(controlfile_path)?)?;
    let lsn = controlfile.checkPoint;

    Ok(Lsn(lsn))
}

///
/// Import all relation data pages from local disk into the repository.
///
/// This is currently only used to import a cluster freshly created by initdb.
/// The code that deals with the checkpoint would not work right if the
/// cluster was not shut down cleanly.
pub async fn import_timeline_from_postgres_datadir(
    tline: &Timeline,
    pgdata_path: &Path,
    pgdata_lsn: Lsn,
    ctx: &RequestContext,
) -> Result<()> {
    let mut pg_control: Option<ControlFileData> = None;

    // TODO this should be start_lsn, which is not necessarily equal to end_lsn (aka lsn)
    // Then fishing out pg_control would be unnecessary
    let mut modification = tline.begin_modification(pgdata_lsn);
    modification.init_empty()?;

    // Import all but pg_wal
    let all_but_wal = WalkDir::new(pgdata_path)
        .into_iter()
        .filter_entry(|entry| !entry.path().ends_with("pg_wal"));
    for entry in all_but_wal {
        let entry = entry?;
        let metadata = entry.metadata().expect("error getting dir entry metadata");
        if metadata.is_file() {
            let absolute_path = entry.path();
            let relative_path = absolute_path.strip_prefix(pgdata_path)?;

            let mut file = tokio::fs::File::open(absolute_path).await?;
            let len = metadata.len() as usize;
            if let Some(control_file) =
                import_file(&mut modification, relative_path, &mut file, len, ctx).await?
            {
                pg_control = Some(control_file);
            }
            modification.flush()?;
        }
    }

    // We're done importing all the data files.
    modification.commit()?;

    // We expect the Postgres server to be shut down cleanly.
    let pg_control = pg_control.context("pg_control file not found")?;
    ensure!(
        pg_control.state == DBState_DB_SHUTDOWNED,
        "Postgres cluster was not shut down cleanly"
    );
    ensure!(
        pg_control.checkPointCopy.redo == pgdata_lsn.0,
        "unexpected checkpoint REDO pointer"
    );

    // Import WAL. This is needed even when starting from a shutdown checkpoint, because
    // this reads the checkpoint record itself, advancing the tip of the timeline to
    // *after* the checkpoint record. And crucially, it initializes the 'prev_lsn'.
    import_wal(
        &pgdata_path.join("pg_wal"),
        tline,
        Lsn(pg_control.checkPointCopy.redo),
        pgdata_lsn,
        ctx,
    )
    .await?;

    Ok(())
}

// subroutine of import_timeline_from_postgres_datadir(), to load one relation file.
async fn import_rel(
    modification: &mut DatadirModification<'_>,
    path: &Path,
    spcoid: Oid,
    dboid: Oid,
    reader: &mut (impl AsyncRead + Send + Sync + Unpin),
    len: usize,
    ctx: &RequestContext,
) -> anyhow::Result<()> {
    // Does it look like a relation file?
    trace!("importing rel file {}", path.display());

    let filename = &path
        .file_name()
        .expect("missing rel filename")
        .to_string_lossy();
    let (relnode, forknum, segno) = parse_relfilename(filename).map_err(|e| {
        warn!("unrecognized file in postgres datadir: {:?} ({})", path, e);
        e
    })?;

    let mut buf: [u8; 8192] = [0u8; 8192];

    ensure!(len % BLCKSZ as usize == 0);
    let nblocks = len / BLCKSZ as usize;

    let rel = RelTag {
        spcnode: spcoid,
        dbnode: dboid,
        relnode,
        forknum,
    };
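
    // Relation segments are 1 GiB each, so segment `segno` of this relation
    // starts at block number `segno * (1 GiB / BLCKSZ)`.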
    let mut blknum: u32 = segno * (1024 * 1024 * 1024 / BLCKSZ as u32);

    // Call put_rel_creation for every segment of the relation,
    // because there is no guarantee about the order in which we are processing segments.
    // Ignore the "relation already exists" error.
    //
    // FIXME: use proper error type for this, instead of parsing the error message.
    // Or better yet, keep track of which relations we've already created
    // https://github.com/neondatabase/neon/issues/3309
    if let Err(e) = modification
        .put_rel_creation(rel, nblocks as u32, ctx)
        .await
    {
        if e.to_string().contains("already exists") {
            debug!("relation {} already exists. we must be extending it", rel);
        } else {
            return Err(e);
        }
    }

    loop {
        let r = reader.read_exact(&mut buf).await;
        match r {
            Ok(_) => {
                modification.put_rel_page_image(rel, blknum, Bytes::copy_from_slice(&buf))?;
            }

            // TODO: UnexpectedEof is expected
            Err(err) => match err.kind() {
                std::io::ErrorKind::UnexpectedEof => {
                    // reached EOF. That's expected.
                    let relative_blknum = blknum - segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
                    ensure!(relative_blknum == nblocks as u32, "unexpected EOF");
                    break;
                }
                _ => {
                    bail!("error reading file {}: {:#}", path.display(), err);
                }
            },
        };
        blknum += 1;
    }

    // Update the relation size.
    //
    // If we process rel segments out of order,
    // put_rel_extend will skip the update.
    modification.put_rel_extend(rel, blknum, ctx).await?;

    Ok(())
}

/// Import an SLRU segment file
///
async fn import_slru(
    modification: &mut DatadirModification<'_>,
    slru: SlruKind,
    path: &Path,
    reader: &mut (impl AsyncRead + Send + Sync + Unpin),
    len: usize,
    ctx: &RequestContext,
) -> anyhow::Result<()> {
    info!("importing slru file {path:?}");

    let mut buf: [u8; 8192] = [0u8; 8192];
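    // SLRU segment files are named by their segment number in hexadecimal
    // (e.g. "0000"), so parse the file name as a hex number.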
    let filename = &path
        .file_name()
        .with_context(|| format!("missing slru filename for path {path:?}"))?
        .to_string_lossy();
    let segno = u32::from_str_radix(filename, 16)?;

    ensure!(len % BLCKSZ as usize == 0); // we assume SLRU block size is the same as BLCKSZ
    let nblocks = len / BLCKSZ as usize;

    ensure!(nblocks <= pg_constants::SLRU_PAGES_PER_SEGMENT as usize);

    modification
        .put_slru_segment_creation(slru, segno, nblocks as u32, ctx)
        .await?;

    let mut rpageno = 0;
    loop {
        let r = reader.read_exact(&mut buf).await;
        match r {
            Ok(_) => {
                modification.put_slru_page_image(
                    slru,
                    segno,
                    rpageno,
                    Bytes::copy_from_slice(&buf),
                )?;
            }

            // TODO: UnexpectedEof is expected
            Err(err) => match err.kind() {
                std::io::ErrorKind::UnexpectedEof => {
                    // reached EOF. That's expected.
                    ensure!(rpageno == nblocks as u32, "unexpected EOF");
                    break;
                }
                _ => {
                    bail!("error reading file {}: {:#}", path.display(), err);
                }
            },
        };
        rpageno += 1;
    }

    Ok(())
}

/// Scan PostgreSQL WAL files in the given directory and load all records between
/// 'startpoint' and 'endpoint' into the repository.
async fn import_wal(
    walpath: &Path,
    tline: &Timeline,
    startpoint: Lsn,
    endpoint: Lsn,
    ctx: &RequestContext,
) -> anyhow::Result<()> {
    let mut waldecoder = WalStreamDecoder::new(startpoint, tline.pg_version);

    let mut segno = startpoint.segment_number(WAL_SEGMENT_SIZE);
    let mut offset = startpoint.segment_offset(WAL_SEGMENT_SIZE);
    let mut last_lsn = startpoint;

    let mut walingest = WalIngest::new(tline, startpoint, ctx).await?;

    while last_lsn <= endpoint {
        // FIXME: assume postgresql tli 1 for now
        let filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE);
        let mut buf = Vec::new();

        // Read local file
        let mut path = walpath.join(&filename);

        // The segment might only exist with a .partial suffix.
        if !path.exists() {
            path = walpath.join(filename + ".partial");
        }

        // Slurp the WAL file
        let mut file = std::fs::File::open(&path)?;

        if offset > 0 {
            use std::io::Seek;
            file.seek(std::io::SeekFrom::Start(offset as u64))?;
        }

        use std::io::Read;
        let nread = file.read_to_end(&mut buf)?;
        if nread != WAL_SEGMENT_SIZE - offset {
            // Maybe allow this for .partial files?
            error!("read only {} bytes from WAL file", nread);
        }

        waldecoder.feed_bytes(&buf);

        let mut nrecords = 0;
        let mut modification = tline.begin_modification(endpoint);
        let mut decoded = DecodedWALRecord::default();
        while last_lsn <= endpoint {
            if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
                walingest
                    .ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
                    .await?;
                last_lsn = lsn;

                nrecords += 1;

                trace!("imported record at {} (end {})", lsn, endpoint);
            } else {
                // No more complete records in this segment; read the next one.
                break;
            }
        }

        debug!("imported {} records up to {}", nrecords, last_lsn);

        segno += 1;
        offset = 0;
    }

    if last_lsn != startpoint {
        info!("reached end of WAL at {}", last_lsn);
    } else {
        info!("no WAL to import at {}", last_lsn);
    }

    Ok(())
}
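
/// Import a basebackup tar archive into the given (empty) Timeline at base_lsn.
///
/// Reads the archive entry by entry via import_file(); fails if the archive
/// does not contain a pg_control file.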
pub async fn import_basebackup_from_tar(
    tline: &Timeline,
    reader: &mut (impl AsyncRead + Send + Sync + Unpin),
    base_lsn: Lsn,
    ctx: &RequestContext,
) -> Result<()> {
    info!("importing base at {base_lsn}");
    let mut modification = tline.begin_modification(base_lsn);
    modification.init_empty()?;

    let mut pg_control: Option<ControlFileData> = None;

    // Import base
    let mut entries = Archive::new(reader).entries()?;
    while let Some(base_tar_entry) = entries.next().await {
        let mut entry = base_tar_entry?;
        let header = entry.header();
        let len = header.entry_size()? as usize;
        let file_path = header.path()?.into_owned();

        match header.entry_type() {
            tokio_tar::EntryType::Regular => {
                if let Some(res) =
                    import_file(&mut modification, file_path.as_ref(), &mut entry, len, ctx).await?
                {
                    // We found the pg_control file.
                    pg_control = Some(res);
                }
                modification.flush()?;
            }
            tokio_tar::EntryType::Directory => {
                debug!("directory {:?}", file_path);
            }
            _ => {
                bail!(
                    "entry {} in backup tar archive is of unexpected type: {:?}",
                    file_path.display(),
                    header.entry_type()
                );
            }
        }
    }

    // sanity check: ensure that pg_control is loaded
    let _pg_control = pg_control.context("pg_control file not found")?;

    modification.commit()?;
    Ok(())
}
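
/// Import WAL segments from a tar archive into the given Timeline,
/// ingesting records from start_lsn up to end_lsn.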
pub async fn import_wal_from_tar(
    tline: &Timeline,
    reader: &mut (impl AsyncRead + Send + Sync + Unpin),
    start_lsn: Lsn,
    end_lsn: Lsn,
    ctx: &RequestContext,
) -> Result<()> {
    // Set up walingest mutable state
    let mut waldecoder = WalStreamDecoder::new(start_lsn, tline.pg_version);
    let mut segno = start_lsn.segment_number(WAL_SEGMENT_SIZE);
    let mut offset = start_lsn.segment_offset(WAL_SEGMENT_SIZE);
    let mut last_lsn = start_lsn;
    let mut walingest = WalIngest::new(tline, start_lsn, ctx).await?;

    // Ingest wal until end_lsn
    info!("importing wal until {}", end_lsn);
    let mut pg_wal_tar = Archive::new(reader);
    let mut pg_wal_entries = pg_wal_tar.entries()?;
    while last_lsn <= end_lsn {
        let bytes = {
            let mut entry = pg_wal_entries
                .next()
                .await
                .ok_or_else(|| anyhow::anyhow!("expected more wal"))??;
            let header = entry.header();
            let file_path = header.path()?.into_owned();

            match header.entry_type() {
                tokio_tar::EntryType::Regular => {
                    // FIXME: assume postgresql tli 1 for now
                    let expected_filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE);
                    let file_name = file_path
                        .file_name()
                        .expect("missing wal filename")
                        .to_string_lossy();
                    ensure!(expected_filename == file_name);

                    debug!("processing wal file {:?}", file_path);
                    read_all_bytes(&mut entry).await?
                }
                tokio_tar::EntryType::Directory => {
                    debug!("directory {:?}", file_path);
                    continue;
                }
                _ => {
                    bail!(
                        "entry {} in WAL tar archive is of unexpected type: {:?}",
                        file_path.display(),
                        header.entry_type()
                    );
                }
            }
        };
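
        // Skip the part of the first segment that precedes start_lsn; for
        // subsequent segments, offset is reset to 0 below, so the whole
        // segment is fed to the decoder.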
        waldecoder.feed_bytes(&bytes[offset..]);

        let mut modification = tline.begin_modification(end_lsn);
        let mut decoded = DecodedWALRecord::default();
        while last_lsn <= end_lsn {
            if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
                walingest
                    .ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
                    .await?;
                last_lsn = lsn;

                debug!("imported record at {} (end {})", lsn, end_lsn);
            } else {
                // No more complete records in this segment; read the next one.
                break;
            }
        }

        debug!("imported records up to {}", last_lsn);
        segno += 1;
        offset = 0;
    }

    if last_lsn != start_lsn {
        info!("reached end of WAL at {}", last_lsn);
    } else {
        info!("there was no WAL to import at {}", last_lsn);
    }

    // Log any extra unused files
    while let Some(e) = pg_wal_entries.next().await {
        let entry = e?;
        let header = entry.header();
        let file_path = header.path()?.into_owned();
        info!("skipping {:?}", file_path);
    }

    Ok(())
}
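
/// Import one file from a basebackup archive or data directory, dispatching
/// on its path within the data directory. Returns the parsed ControlFileData
/// if the file was pg_control, and None otherwise.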
async fn import_file(
    modification: &mut DatadirModification<'_>,
    file_path: &Path,
    reader: &mut (impl AsyncRead + Send + Sync + Unpin),
    len: usize,
    ctx: &RequestContext,
) -> Result<Option<ControlFileData>> {
    let file_name = match file_path.file_name() {
        Some(name) => name.to_string_lossy(),
        None => return Ok(None),
    };

    if file_name.starts_with('.') {
        // Tar archives created on macOS without the COPYFILE_DISABLE=1 env var
        // contain "fork files"; skip them.
        return Ok(None);
    }

    if file_path.starts_with("global") {
        let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
        let dbnode = 0;

        match file_name.as_ref() {
            "pg_control" => {
                let bytes = read_all_bytes(reader).await?;

                // Extract the checkpoint record and import it separately.
                let pg_control = ControlFileData::decode(&bytes[..])?;
                let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
                modification.put_checkpoint(checkpoint_bytes)?;
                debug!("imported control file");

                // Import it as ControlFile
                modification.put_control_file(bytes)?;
                return Ok(Some(pg_control));
            }
            "pg_filenode.map" => {
                let bytes = read_all_bytes(reader).await?;
                modification
                    .put_relmap_file(spcnode, dbnode, bytes, ctx)
                    .await?;
                debug!("imported relmap file")
            }
            "PG_VERSION" => {
                debug!("ignored PG_VERSION file");
            }
            _ => {
                import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
                debug!("imported rel creation");
            }
        }
    } else if file_path.starts_with("base") {
        let spcnode = pg_constants::DEFAULTTABLESPACE_OID;
        let dbnode: u32 = file_path
            .iter()
            .nth(1)
            .expect("invalid file path, expected dbnode")
            .to_string_lossy()
            .parse()?;

        match file_name.as_ref() {
            "pg_filenode.map" => {
                let bytes = read_all_bytes(reader).await?;
                modification
                    .put_relmap_file(spcnode, dbnode, bytes, ctx)
                    .await?;
                debug!("imported relmap file")
            }
            "PG_VERSION" => {
                debug!("ignored PG_VERSION file");
            }
            _ => {
                import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
                debug!("imported rel creation");
            }
        }
    } else if file_path.starts_with("pg_xact") {
        let slru = SlruKind::Clog;

        import_slru(modification, slru, file_path, reader, len, ctx).await?;
        debug!("imported clog slru");
    } else if file_path.starts_with("pg_multixact/offsets") {
        let slru = SlruKind::MultiXactOffsets;

        import_slru(modification, slru, file_path, reader, len, ctx).await?;
        debug!("imported multixact offsets slru");
    } else if file_path.starts_with("pg_multixact/members") {
        let slru = SlruKind::MultiXactMembers;

        import_slru(modification, slru, file_path, reader, len, ctx).await?;
        debug!("imported multixact members slru");
    } else if file_path.starts_with("pg_twophase") {
        let xid = u32::from_str_radix(file_name.as_ref(), 16)?;

        let bytes = read_all_bytes(reader).await?;
        modification
            .put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]), ctx)
            .await?;
        debug!("imported twophase file");
    } else if file_path.starts_with("pg_wal") {
        debug!("found wal file in base section. ignore it");
    } else if file_path.starts_with("zenith.signal") {
        // Parse the zenith signal file to set the correct previous LSN.
        let bytes = read_all_bytes(reader).await?;
        // zenith.signal format is "PREV LSN: prev_lsn"
        // TODO write serialization and deserialization in the same place.
        let zenith_signal = std::str::from_utf8(&bytes)?.trim();
        let prev_lsn = match zenith_signal {
            "PREV LSN: none" => Lsn(0),
            "PREV LSN: invalid" => Lsn(0),
            other => {
                let split = other.split(':').collect::<Vec<_>>();
                split[1]
                    .trim()
                    .parse::<Lsn>()
                    .context("can't parse zenith.signal")?
            }
        };

        // zenith.signal is not necessarily the last file that we handle, but
        // it is OK to call `finish_write()` here, because the final
        // `modification.commit()` will update the LSN once more to the final one.
        let writer = modification.tline.writer();
        writer.finish_write(prev_lsn);

        debug!("imported zenith signal {}", prev_lsn);
    } else if file_path.starts_with("pg_tblspc") {
        // TODO Backups exported from neon won't have pg_tblspc, but we will need
        // this to import arbitrary postgres databases.
        bail!("Importing pg_tblspc is not implemented");
    } else {
        debug!(
            "ignoring unrecognized file \"{}\" in tar archive",
            file_path.display()
        );
    }

    Ok(None)
}

async fn read_all_bytes(reader: &mut (impl AsyncRead + Send + Sync + Unpin)) -> Result<Bytes> {
    let mut buf: Vec<u8> = vec![];
    reader.read_to_end(&mut buf).await?;
    Ok(Bytes::copy_from_slice(&buf[..]))
}