mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 14:02:55 +00:00
Upload initdb results to S3 (#5390)
## Problem See #2592 ## Summary of changes Compresses the results of initdb into a .tar.zst file and uploads them to S3, to enable usage in recovery from lsn. Generations should not be involved I think because we do this only once at the very beginning of a timeline. --------- Co-authored-by: Joonas Koivunen <joonas@neon.tech>
This commit is contained in:
5
Cargo.lock
generated
5
Cargo.lock
generated
@@ -193,6 +193,8 @@ dependencies = [
|
||||
"memchr",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
"zstd",
|
||||
"zstd-safe",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -6033,6 +6035,9 @@ dependencies = [
|
||||
"tungstenite",
|
||||
"url",
|
||||
"uuid",
|
||||
"zstd",
|
||||
"zstd-safe",
|
||||
"zstd-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -37,7 +37,7 @@ license = "Apache-2.0"
|
||||
[workspace.dependencies]
|
||||
anyhow = { version = "1.0", features = ["backtrace"] }
|
||||
arc-swap = "1.6"
|
||||
async-compression = { version = "0.4.0", features = ["tokio", "gzip"] }
|
||||
async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
|
||||
azure_core = "0.16"
|
||||
azure_identity = "0.16"
|
||||
azure_storage = "0.16"
|
||||
|
||||
@@ -487,8 +487,15 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
|
||||
.copied()
|
||||
.context("Failed to parse postgres version from the argument string")?;
|
||||
|
||||
let timeline_info =
|
||||
pageserver.timeline_create(tenant_id, None, None, None, Some(pg_version))?;
|
||||
let new_timeline_id_opt = parse_timeline_id(create_match)?;
|
||||
|
||||
let timeline_info = pageserver.timeline_create(
|
||||
tenant_id,
|
||||
new_timeline_id_opt,
|
||||
None,
|
||||
None,
|
||||
Some(pg_version),
|
||||
)?;
|
||||
let new_timeline_id = timeline_info.timeline_id;
|
||||
|
||||
let last_record_lsn = timeline_info.last_record_lsn;
|
||||
@@ -1308,6 +1315,7 @@ fn cli() -> Command {
|
||||
.subcommand(Command::new("create")
|
||||
.about("Create a new blank timeline")
|
||||
.arg(tenant_id_arg.clone())
|
||||
.arg(timeline_id_arg.clone())
|
||||
.arg(branch_name_arg.clone())
|
||||
.arg(pg_version_arg.clone())
|
||||
)
|
||||
|
||||
21
libs/utils/scripts/restore_from_wal_initdb.sh
Executable file
21
libs/utils/scripts/restore_from_wal_initdb.sh
Executable file
@@ -0,0 +1,21 @@
|
||||
#!/bin/bash
|
||||
|
||||
# like restore_from_wal.sh, but takes existing initdb.tar.zst
|
||||
|
||||
set -euxo pipefail
|
||||
|
||||
PG_BIN=$1
|
||||
WAL_PATH=$2
|
||||
DATA_DIR=$3
|
||||
PORT=$4
|
||||
echo "port=$PORT" >> "$DATA_DIR"/postgresql.conf
|
||||
echo "shared_preload_libraries='\$libdir/neon_rmgr.so'" >> "$DATA_DIR"/postgresql.conf
|
||||
REDO_POS=0x$("$PG_BIN"/pg_controldata -D "$DATA_DIR" | grep -F "REDO location"| cut -c 42-)
|
||||
declare -i WAL_SIZE=$REDO_POS+114
|
||||
"$PG_BIN"/pg_ctl -D "$DATA_DIR" -l "$DATA_DIR/logfile.log" start
|
||||
"$PG_BIN"/pg_ctl -D "$DATA_DIR" -l "$DATA_DIR/logfile.log" stop -m immediate
|
||||
cp "$DATA_DIR"/pg_wal/000000010000000000000001 .
|
||||
cp "$WAL_PATH"/* "$DATA_DIR"/pg_wal/
|
||||
for partial in "$DATA_DIR"/pg_wal/*.partial ; do mv "$partial" "${partial%.partial}" ; done
|
||||
dd if=000000010000000000000001 of="$DATA_DIR"/pg_wal/000000010000000000000001 bs=$WAL_SIZE count=1 conv=notrunc
|
||||
rm -f 000000010000000000000001
|
||||
@@ -3,18 +3,25 @@
|
||||
//! a neon Timeline.
|
||||
//!
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::pin::Pin;
|
||||
use std::task::{self, Poll};
|
||||
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use async_compression::{tokio::write::ZstdEncoder, zstd::CParameter, Level};
|
||||
use bytes::Bytes;
|
||||
use camino::Utf8Path;
|
||||
use futures::StreamExt;
|
||||
use tokio::io::{AsyncRead, AsyncReadExt};
|
||||
use nix::NixPath;
|
||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
||||
use tokio_tar::Archive;
|
||||
use tokio_tar::Builder;
|
||||
use tokio_tar::HeaderMode;
|
||||
use tracing::*;
|
||||
use walkdir::WalkDir;
|
||||
|
||||
use crate::context::RequestContext;
|
||||
use crate::pgdatadir_mapping::*;
|
||||
use crate::tenant::remote_timeline_client::INITDB_PATH;
|
||||
use crate::tenant::Timeline;
|
||||
use crate::walingest::WalIngest;
|
||||
use crate::walrecord::DecodedWALRecord;
|
||||
@@ -33,7 +40,9 @@ use utils::lsn::Lsn;
|
||||
pub fn get_lsn_from_controlfile(path: &Utf8Path) -> Result<Lsn> {
|
||||
// Read control file to extract the LSN
|
||||
let controlfile_path = path.join("global").join("pg_control");
|
||||
let controlfile = ControlFileData::decode(&std::fs::read(controlfile_path)?)?;
|
||||
let controlfile_buf = std::fs::read(&controlfile_path)
|
||||
.with_context(|| format!("reading controlfile: {controlfile_path}"))?;
|
||||
let controlfile = ControlFileData::decode(&controlfile_buf)?;
|
||||
let lsn = controlfile.checkPoint;
|
||||
|
||||
Ok(Lsn(lsn))
|
||||
@@ -618,3 +627,108 @@ async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> Result<Bytes>
|
||||
reader.read_to_end(&mut buf).await?;
|
||||
Ok(Bytes::from(buf))
|
||||
}
|
||||
|
||||
/// An in-memory buffer implementing `AsyncWrite`, inserting yields every now and then
|
||||
///
|
||||
/// The number of yields is bounded by above by the number of times poll_write is called,
|
||||
/// so calling it with 8 KB chunks and 8 MB chunks gives the same number of yields in total.
|
||||
/// This is an explicit choice as the `YieldingVec` is meant to give the async executor
|
||||
/// breathing room between units of CPU intensive preparation of buffers to be written.
|
||||
/// Once a write call is issued, the whole buffer has been prepared already, so there is no
|
||||
/// gain in splitting up the memcopy further.
|
||||
struct YieldingVec {
|
||||
yield_budget: usize,
|
||||
// the buffer written into
|
||||
buf: Vec<u8>,
|
||||
}
|
||||
|
||||
impl YieldingVec {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
yield_budget: 0,
|
||||
buf: Vec::new(),
|
||||
}
|
||||
}
|
||||
// Whether we should yield for a read operation of given size
|
||||
fn should_yield(&mut self, add_buf_len: usize) -> bool {
|
||||
// Set this limit to a small value so that we are a
|
||||
// good async citizen and yield repeatedly (but not
|
||||
// too often for many small writes to cause many yields)
|
||||
const YIELD_DIST: usize = 1024;
|
||||
|
||||
let target_buf_len = self.buf.len() + add_buf_len;
|
||||
let ret = self.yield_budget / YIELD_DIST < target_buf_len / YIELD_DIST;
|
||||
if self.yield_budget < target_buf_len {
|
||||
self.yield_budget += add_buf_len;
|
||||
}
|
||||
ret
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncWrite for YieldingVec {
|
||||
fn poll_write(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut task::Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<std::io::Result<usize>> {
|
||||
if self.should_yield(buf.len()) {
|
||||
cx.waker().wake_by_ref();
|
||||
return Poll::Pending;
|
||||
}
|
||||
self.get_mut().buf.extend_from_slice(buf);
|
||||
Poll::Ready(Ok(buf.len()))
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<std::io::Result<()>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn poll_shutdown(
|
||||
self: Pin<&mut Self>,
|
||||
_cx: &mut task::Context<'_>,
|
||||
) -> Poll<std::io::Result<()>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn create_tar_zst(pgdata_path: &Utf8Path) -> Result<Vec<u8>> {
|
||||
let mut paths = Vec::new();
|
||||
for entry in WalkDir::new(pgdata_path) {
|
||||
let entry = entry?;
|
||||
let metadata = entry.metadata().expect("error getting dir entry metadata");
|
||||
// Also allow directories so that we also get empty directories
|
||||
if !(metadata.is_file() || metadata.is_dir()) {
|
||||
continue;
|
||||
}
|
||||
let path = entry.into_path();
|
||||
paths.push(path);
|
||||
}
|
||||
// Do a sort to get a more consistent listing
|
||||
paths.sort_unstable();
|
||||
let zstd = ZstdEncoder::with_quality_and_params(
|
||||
YieldingVec::new(),
|
||||
Level::Default,
|
||||
&[CParameter::enable_long_distance_matching(true)],
|
||||
);
|
||||
let mut builder = Builder::new(zstd);
|
||||
// Use reproducible header mode
|
||||
builder.mode(HeaderMode::Deterministic);
|
||||
for path in paths {
|
||||
let rel_path = path.strip_prefix(pgdata_path)?;
|
||||
if rel_path.is_empty() {
|
||||
// The top directory should not be compressed,
|
||||
// the tar crate doesn't like that
|
||||
continue;
|
||||
}
|
||||
builder.append_path_with_name(&path, rel_path).await?;
|
||||
}
|
||||
let mut zstd = builder.into_inner().await?;
|
||||
zstd.shutdown().await?;
|
||||
let compressed = zstd.into_inner();
|
||||
let compressed_len = compressed.buf.len();
|
||||
const INITDB_TAR_ZST_WARN_LIMIT: usize = 2_000_000;
|
||||
if compressed_len > INITDB_TAR_ZST_WARN_LIMIT {
|
||||
warn!("compressed {INITDB_PATH} size of {compressed_len} is above limit {INITDB_TAR_ZST_WARN_LIMIT}.");
|
||||
}
|
||||
Ok(compressed.buf)
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
//!
|
||||
|
||||
use anyhow::{bail, Context};
|
||||
use bytes::Bytes;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use enumset::EnumSet;
|
||||
use futures::FutureExt;
|
||||
@@ -24,6 +25,7 @@ use tokio::sync::watch;
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::backoff;
|
||||
use utils::completion;
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
use utils::fs_ext;
|
||||
@@ -2895,7 +2897,7 @@ impl Tenant {
|
||||
}
|
||||
|
||||
/// - run initdb to init temporary instance and get bootstrap data
|
||||
/// - after initialization complete, remove the temp dir.
|
||||
/// - after initialization completes, tar up the temp dir and upload it to S3.
|
||||
///
|
||||
/// The caller is responsible for activating the returned timeline.
|
||||
async fn bootstrap_timeline(
|
||||
@@ -2936,6 +2938,30 @@ impl Tenant {
|
||||
let pgdata_path = &initdb_path;
|
||||
let pgdata_lsn = import_datadir::get_lsn_from_controlfile(pgdata_path)?.align();
|
||||
|
||||
// Upload the created data dir to S3
|
||||
if let Some(storage) = &self.remote_storage {
|
||||
let pgdata_zstd = import_datadir::create_tar_zst(pgdata_path).await?;
|
||||
let pgdata_zstd = Bytes::from(pgdata_zstd);
|
||||
backoff::retry(
|
||||
|| async {
|
||||
self::remote_timeline_client::upload_initdb_dir(
|
||||
storage,
|
||||
&self.tenant_id,
|
||||
&timeline_id,
|
||||
pgdata_zstd.clone(),
|
||||
)
|
||||
.await
|
||||
},
|
||||
|_| false,
|
||||
3,
|
||||
u32::MAX,
|
||||
"persist_initdb_tar_zst",
|
||||
// TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066)
|
||||
backoff::Cancel::new(CancellationToken::new(), || unreachable!()),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Import the contents of the data directory at the initial checkpoint
|
||||
// LSN, and any WAL after that.
|
||||
// Initdb lsn will be equal to last_record_lsn which will be set after import.
|
||||
|
||||
@@ -190,6 +190,7 @@ use chrono::{NaiveDateTime, Utc};
|
||||
|
||||
use scopeguard::ScopeGuard;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
pub(crate) use upload::upload_initdb_dir;
|
||||
use utils::backoff::{
|
||||
self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
|
||||
};
|
||||
@@ -249,6 +250,8 @@ pub(crate) const FAILED_REMOTE_OP_RETRIES: u32 = 10;
|
||||
// retries. Uploads and deletions are retried forever, though.
|
||||
pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
|
||||
|
||||
pub(crate) const INITDB_PATH: &str = "initdb.tar.zst";
|
||||
|
||||
pub enum MaybeDeletedIndexPart {
|
||||
IndexPart(IndexPart),
|
||||
Deleted(IndexPart),
|
||||
@@ -1537,6 +1540,13 @@ pub fn remote_layer_path(
|
||||
RemotePath::from_string(&path).expect("Failed to construct path")
|
||||
}
|
||||
|
||||
pub fn remote_initdb_archive_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> RemotePath {
|
||||
RemotePath::from_string(&format!(
|
||||
"tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{INITDB_PATH}"
|
||||
))
|
||||
.expect("Failed to construct path")
|
||||
}
|
||||
|
||||
pub fn remote_index_path(
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
//! Helper functions to upload files to remote storage with a RemoteStorage
|
||||
|
||||
use anyhow::{bail, Context};
|
||||
use bytes::Bytes;
|
||||
use camino::Utf8Path;
|
||||
use fail::fail_point;
|
||||
use std::io::ErrorKind;
|
||||
@@ -9,7 +10,9 @@ use tokio::fs;
|
||||
use super::Generation;
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
tenant::remote_timeline_client::{index::IndexPart, remote_index_path, remote_path},
|
||||
tenant::remote_timeline_client::{
|
||||
index::IndexPart, remote_index_path, remote_initdb_archive_path, remote_path,
|
||||
},
|
||||
};
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
@@ -104,3 +107,22 @@ pub(super) async fn upload_timeline_layer<'a>(
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Uploads the given `initdb` data to the remote storage.
|
||||
pub(crate) async fn upload_initdb_dir(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
initdb_dir: Bytes,
|
||||
) -> anyhow::Result<()> {
|
||||
tracing::trace!("uploading initdb dir");
|
||||
|
||||
let size = initdb_dir.len();
|
||||
let bytes = tokio::io::BufReader::new(std::io::Cursor::new(initdb_dir));
|
||||
|
||||
let remote_path = remote_initdb_archive_path(tenant_id, timeline_id);
|
||||
storage
|
||||
.upload_storage_object(bytes, size, &remote_path)
|
||||
.await
|
||||
.with_context(|| format!("upload initdb dir for '{tenant_id} / {timeline_id}'"))
|
||||
}
|
||||
|
||||
@@ -1224,6 +1224,7 @@ class NeonCli(AbstractNeonCli):
|
||||
self,
|
||||
new_branch_name: str,
|
||||
tenant_id: Optional[TenantId] = None,
|
||||
timeline_id: Optional[TimelineId] = None,
|
||||
) -> TimelineId:
|
||||
cmd = [
|
||||
"timeline",
|
||||
@@ -1236,6 +1237,9 @@ class NeonCli(AbstractNeonCli):
|
||||
self.env.pg_version,
|
||||
]
|
||||
|
||||
if timeline_id is not None:
|
||||
cmd.extend(["--timeline-id", str(timeline_id)])
|
||||
|
||||
res = self.raw_cli(cmd)
|
||||
res.check_returncode()
|
||||
|
||||
|
||||
@@ -114,6 +114,7 @@ def test_timeline_init_break_before_checkpoint(neon_env_builder: NeonEnvBuilder)
|
||||
[
|
||||
".*Failed to process timeline dir contents.*Timeline has no ancestor and no layer files.*",
|
||||
".*Timeline got dropped without initializing, cleaning its files.*",
|
||||
".*Failed to load index_part from remote storage, failed creation?.*",
|
||||
]
|
||||
)
|
||||
|
||||
@@ -143,6 +144,58 @@ def test_timeline_init_break_before_checkpoint(neon_env_builder: NeonEnvBuilder)
|
||||
), "pageserver should clean its temp timeline files on timeline creation failure"
|
||||
|
||||
|
||||
def test_timeline_init_break_before_checkpoint_recreate(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
".*Failed to process timeline dir contents.*Timeline has no ancestor and no layer files.*",
|
||||
".*Timeline got dropped without initializing, cleaning its files.*",
|
||||
".*Failed to load index_part from remote storage, failed creation?.*",
|
||||
]
|
||||
)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
|
||||
timelines_dir = env.pageserver.timeline_dir(tenant_id)
|
||||
old_tenant_timelines = env.neon_cli.list_timelines(tenant_id)
|
||||
initial_timeline_dirs = [d for d in timelines_dir.iterdir()]
|
||||
|
||||
# Some fixed timeline ID (like control plane does)
|
||||
timeline_id = TimelineId("1080243c1f76fe3c5147266663c9860b")
|
||||
|
||||
# Introduce failpoint during timeline init (some intermediate files are on disk), before it's checkpointed.
|
||||
pageserver_http.configure_failpoints(("before-checkpoint-new-timeline", "return"))
|
||||
with pytest.raises(Exception, match="before-checkpoint-new-timeline"):
|
||||
_ = env.neon_cli.create_timeline(
|
||||
"test_timeline_init_break_before_checkpoint", tenant_id, timeline_id
|
||||
)
|
||||
|
||||
# Restart the page server
|
||||
env.pageserver.restart(immediate=True)
|
||||
|
||||
# Creating the timeline didn't finish. The other timelines on tenant should still be present and work normally.
|
||||
new_tenant_timelines = env.neon_cli.list_timelines(tenant_id)
|
||||
assert (
|
||||
new_tenant_timelines == old_tenant_timelines
|
||||
), f"Pageserver after restart should ignore non-initialized timelines for tenant {tenant_id}"
|
||||
|
||||
timeline_dirs = [d for d in timelines_dir.iterdir()]
|
||||
assert (
|
||||
timeline_dirs == initial_timeline_dirs
|
||||
), "pageserver should clean its temp timeline files on timeline creation failure"
|
||||
|
||||
# Disable the failpoint again
|
||||
pageserver_http.configure_failpoints(("before-checkpoint-new-timeline", "off"))
|
||||
# creating the branch should have worked now
|
||||
new_timeline_id = env.neon_cli.create_timeline(
|
||||
"test_timeline_init_break_before_checkpoint", tenant_id, timeline_id
|
||||
)
|
||||
|
||||
assert timeline_id == new_timeline_id
|
||||
|
||||
|
||||
def test_timeline_create_break_after_uninit_mark(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
@@ -1,14 +1,19 @@
|
||||
import sys
|
||||
import tarfile
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
import zstandard
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
VanillaPostgres,
|
||||
)
|
||||
from fixtures.port_distributor import PortDistributor
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
from fixtures.remote_storage import LocalFsStorage
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
@@ -53,3 +58,70 @@ def test_wal_restore(
|
||||
)
|
||||
restored.start()
|
||||
assert restored.safe_psql("select count(*) from t", user="cloud_admin") == [(300000,)]
|
||||
|
||||
|
||||
def decompress_zstd(
|
||||
input_file_name: Path,
|
||||
output_dir: Path,
|
||||
):
|
||||
log.info(f"decompressing zstd to: {output_dir}")
|
||||
output_dir.mkdir(mode=0o750, parents=True, exist_ok=True)
|
||||
with tempfile.TemporaryFile(suffix=".tar") as temp:
|
||||
decompressor = zstandard.ZstdDecompressor()
|
||||
with open(input_file_name, "rb") as input_file:
|
||||
decompressor.copy_stream(input_file, temp)
|
||||
temp.seek(0)
|
||||
with tarfile.open(fileobj=temp) as tfile:
|
||||
tfile.extractall(path=output_dir)
|
||||
|
||||
|
||||
def test_wal_restore_initdb(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
pg_bin: PgBin,
|
||||
test_output_dir: Path,
|
||||
port_distributor: PortDistributor,
|
||||
base_dir: Path,
|
||||
pg_distrib_dir: Path,
|
||||
):
|
||||
env = neon_env_builder.init_start()
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
endpoint.safe_psql("create table t as select generate_series(1,300000)")
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
original_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
|
||||
env.pageserver.stop()
|
||||
port = port_distributor.get_port()
|
||||
data_dir = test_output_dir / "pgsql.restored"
|
||||
|
||||
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
|
||||
|
||||
initdb_zst_path = (
|
||||
env.pageserver_remote_storage.timeline_path(tenant_id, timeline_id) / "initdb.tar.zst"
|
||||
)
|
||||
|
||||
decompress_zstd(initdb_zst_path, data_dir)
|
||||
with VanillaPostgres(
|
||||
data_dir, PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version), port, init=False
|
||||
) as restored:
|
||||
pg_bin.run_capture(
|
||||
[
|
||||
str(base_dir / "libs" / "utils" / "scripts" / "restore_from_wal_initdb.sh"),
|
||||
str(pg_distrib_dir / f"v{env.pg_version}/bin"),
|
||||
str(
|
||||
test_output_dir
|
||||
/ "repo"
|
||||
/ "safekeepers"
|
||||
/ "sk1"
|
||||
/ str(tenant_id)
|
||||
/ str(timeline_id)
|
||||
),
|
||||
str(data_dir),
|
||||
str(port),
|
||||
]
|
||||
)
|
||||
restored.start()
|
||||
restored_lsn = Lsn(
|
||||
restored.safe_psql("SELECT pg_current_wal_flush_lsn()", user="cloud_admin")[0][0]
|
||||
)
|
||||
log.info(f"original lsn: {original_lsn}, restored lsn: {restored_lsn}")
|
||||
assert restored.safe_psql("select count(*) from t", user="cloud_admin") == [(300000,)]
|
||||
|
||||
@@ -68,6 +68,9 @@ tracing-core = { version = "0.1" }
|
||||
tungstenite = { version = "0.20" }
|
||||
url = { version = "2", features = ["serde"] }
|
||||
uuid = { version = "1", features = ["serde", "v4"] }
|
||||
zstd = { version = "0.12" }
|
||||
zstd-safe = { version = "6", default-features = false, features = ["arrays", "legacy", "std", "zdict_builder"] }
|
||||
zstd-sys = { version = "2", default-features = false, features = ["legacy", "std", "zdict_builder"] }
|
||||
|
||||
[build-dependencies]
|
||||
anyhow = { version = "1", features = ["backtrace"] }
|
||||
|
||||
Reference in New Issue
Block a user