Compare commits

..

9 Commits

Author SHA1 Message Date
Arthur Petukhovsky
686d199acf Run yapf 2021-12-09 12:30:20 +03:00
Arthur Petukhovsky
60783b986d Add epoch switch 2021-12-09 12:16:07 +03:00
Arthur Petukhovsky
1de822f1b0 Create test_sync_safekeepers_old_term_ahead 2021-12-09 12:13:02 +03:00
Arseny Sher
37c85d5fd9 Switch safekeeper from log to tracing logging.
Add context to wal acceptor and wal sender threads showing timeline id and
unique id differentiating them.
2021-12-09 06:57:46 +03:00
nikitashamgunov
6094236171 Update README.md 2021-12-08 11:55:54 -08:00
anastasia
bb5aba42eb bump vendor/postgres to use correct backpressure commit 2021-12-08 18:57:18 +03:00
Arthur Petukhovsky
450fb9eafe Don't persist control file without sync (#966) 2021-12-07 15:02:44 +03:00
Dmitry Rodionov
557e3024cd Forward pageserver connection string from compute to safekeeper
This is needed for implementation of tenant rebalancing. With this
change safekeeper becomes aware of which pageserver is supposed to be
used for replication from this particular compute.
2021-12-06 21:28:49 +03:00
Arseny Sher
bd34d7ecfc Bump safekeeper control file version and allow reading the previous one.
Should have been a part of cba4da3f4d to provide upgrade for previously
existing clusters. Separates version independent header (magic + version) out of
SafeKeeperState to choose what to deserialize.
2021-12-06 19:47:55 +03:00
22 changed files with 421 additions and 272 deletions

2
Cargo.lock generated
View File

@@ -2349,7 +2349,6 @@ dependencies = [
"humantime",
"hyper",
"lazy_static",
"log",
"pageserver",
"postgres",
"postgres-protocol",
@@ -2363,6 +2362,7 @@ dependencies = [
"tempfile",
"tokio",
"tokio-stream",
"tracing",
"walkdir",
"workspace_hack",
"zenith_metrics",

View File

@@ -1,6 +1,6 @@
# Zenith
Zenith substitutes PostgreSQL storage layer and redistributes data across a cluster of nodes
Zenith is a serverless open source alternative to AWS Aurora Postgres. It separates storage and compute and substitutes the PostgreSQL storage layer by redistributing data across a cluster of nodes.
## Architecture overview

View File

@@ -199,17 +199,24 @@ impl PostgresNode {
})
}
fn sync_safekeepers(&self) -> Result<Lsn> {
fn sync_safekeepers(&self, auth_token: &Option<String>) -> Result<Lsn> {
let pg_path = self.env.pg_bin_dir().join("postgres");
let sync_handle = Command::new(pg_path)
.arg("--sync-safekeepers")
let mut cmd = Command::new(&pg_path);
cmd.arg("--sync-safekeepers")
.env_clear()
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("PGDATA", self.pgdata().to_str().unwrap())
.stdout(Stdio::piped())
// Comment this to avoid capturing stderr (useful if command hangs)
.stderr(Stdio::piped())
.stderr(Stdio::piped());
if let Some(token) = auth_token {
cmd.env("ZENITH_AUTH_TOKEN", token);
}
let sync_handle = cmd
.spawn()
.expect("postgres --sync-safekeepers failed to start");
@@ -287,15 +294,14 @@ impl PostgresNode {
conf.append("max_replication_slots", "10");
conf.append("hot_standby", "on");
conf.append("shared_buffers", "1MB");
conf.append("max_wal_size", "100GB");
conf.append("fsync", "off");
conf.append("max_connections", "100");
conf.append("wal_level", "replica");
// wal_sender_timeout is the maximum time to wait for WAL replication.
// It also defines how often the walreceiver will send a feedback message to the wal sender.
//conf.append("wal_sender_timeout", "5s");
//conf.append("max_replication_flush_lag", "160MB");
//conf.append("max_replication_apply_lag", "1500MB");
conf.append("wal_sender_timeout", "5s");
conf.append("max_replication_flush_lag", "160MB");
conf.append("max_replication_apply_lag", "1500MB");
conf.append("listen_addresses", &self.address.ip().to_string());
conf.append("port", &self.address.port().to_string());
@@ -320,8 +326,11 @@ impl PostgresNode {
} else {
""
};
format!("host={} port={} password={}", host, port, password)
// NOTE: avoid spaces in the connection string; it is less error prone if we forward it somewhere.
// Also note that not all parameters are supported here: in compute we substitute $ZENITH_AUTH_TOKEN,
// i.e. we parse this string and build it back with the token from the env var, and for simplicity
// the rebuild uses only the needed variables, namely host, port, user and password.
format!("postgresql://no_user:{}@{}:{}", password, host, port)
};
conf.append("shared_preload_libraries", "zenith");
conf.append_line("");
@@ -359,7 +368,7 @@ impl PostgresNode {
Ok(())
}
fn load_basebackup(&self) -> Result<()> {
fn load_basebackup(&self, auth_token: &Option<String>) -> Result<()> {
let backup_lsn = if let Some(lsn) = self.lsn {
Some(lsn)
} else if self.uses_wal_proposer {
@@ -367,7 +376,7 @@ impl PostgresNode {
// latest data from the pageserver. That is a bit clumsy but whole bootstrap
// procedure evolves quite actively right now, so let's think about it again
// when things would be more stable (TODO).
let lsn = self.sync_safekeepers()?;
let lsn = self.sync_safekeepers(auth_token)?;
if lsn == Lsn(0) {
None
} else {
@@ -418,7 +427,6 @@ impl PostgresNode {
.env_clear()
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap());
if let Some(token) = auth_token {
cmd.env("ZENITH_AUTH_TOKEN", token);
}
@@ -452,7 +460,7 @@ impl PostgresNode {
fs::write(&postgresql_conf_path, postgresql_conf)?;
// 3. Load basebackup
self.load_basebackup()?;
self.load_basebackup(auth_token)?;
if self.lsn.is_some() {
File::create(self.pgdata().join("standby.signal"))?;

View File

@@ -15,13 +15,11 @@ use reqwest::blocking::{Client, RequestBuilder, Response};
use reqwest::{IntoUrl, Method};
use thiserror::Error;
use zenith_utils::http::error::HttpErrorBody;
use zenith_utils::postgres_backend::AuthType;
use crate::local_env::{LocalEnv, SafekeeperConf};
use crate::read_pidfile;
use crate::storage::PageServerNode;
use zenith_utils::connstring::connection_address;
use zenith_utils::connstring::connection_host_port;
#[derive(Error, Debug)]
pub enum SafekeeperHttpError {
@@ -116,17 +114,6 @@ impl SafekeeperNode {
);
io::stdout().flush().unwrap();
// Configure connection to page server
//
// FIXME: We extract the host and port from the connection string instead of using
// the connection string directly, because the 'safekeeper' binary expects
// host:port format. That's a bit silly when we already have a full libpq connection
// string at hand.
let pageserver_conn = {
let (host, port) = connection_host_port(&self.pageserver.pg_connection_config);
format!("{}:{}", host, port)
};
let listen_pg = format!("localhost:{}", self.conf.pg_port);
let listen_http = format!("localhost:{}", self.conf.http_port);
@@ -134,7 +121,6 @@ impl SafekeeperNode {
cmd.args(&["-D", self.datadir_path().to_str().unwrap()])
.args(&["--listen-pg", &listen_pg])
.args(&["--listen-http", &listen_http])
.args(&["--pageserver", &pageserver_conn])
.args(&["--recall", "1 second"])
.arg("--daemonize")
.env_clear()
@@ -143,10 +129,6 @@ impl SafekeeperNode {
cmd.arg("--no-sync");
}
if self.env.pageserver.auth_type == AuthType::ZenithJWT {
cmd.env("PAGESERVER_AUTH_TOKEN", &self.env.pageserver.auth_token);
}
let var = "LLVM_PROFILE_FILE";
if let Some(val) = std::env::var_os(var) {
cmd.env(var, val);

View File

@@ -39,10 +39,10 @@ pub mod defaults {
// would be more appropriate. But a low value forces the code to be exercised more,
// which is good for now to trigger bugs.
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(10);
pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(1);
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(10);
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100);
pub const DEFAULT_SUPERUSER: &str = "zenith_admin";
pub const DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNC: usize = 100;

View File

@@ -106,7 +106,7 @@ fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) {
conf,
Arc::new(walredo_mgr),
tenant_id,
false,
true,
));
let mut m = access_tenants();

View File

@@ -32,7 +32,6 @@ use std::os::unix::io::AsRawFd;
use std::path::PathBuf;
use std::process::Stdio;
use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::time::Duration;
use std::time::Instant;
@@ -54,8 +53,6 @@ use postgres_ffi::nonrelfile_utils::transaction_id_set_status;
use postgres_ffi::pg_constants;
use postgres_ffi::XLogRecord;
const N_WAL_REDO_PROCS: usize = 1;
///
/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
///
@@ -142,8 +139,7 @@ pub struct PostgresRedoManager {
tenantid: ZTenantId,
conf: &'static PageServerConf,
round_robin: AtomicUsize,
processes: [Mutex<Option<PostgresRedoProcess>>; N_WAL_REDO_PROCS],
process: Mutex<Option<PostgresRedoProcess>>,
}
#[derive(Debug)]
@@ -213,13 +209,12 @@ impl WalRedoManager for PostgresRedoManager {
end_time = Instant::now();
WAL_REDO_TIME.observe(end_time.duration_since(start_time).as_secs_f64());
} else {
let rr = self.round_robin.fetch_add(1, Ordering::Relaxed) % N_WAL_REDO_PROCS;
let mut process_guard = self.processes[rr].lock().unwrap();
let mut process_guard = self.process.lock().unwrap();
let lock_time = Instant::now();
// launch the WAL redo process on first use
if process_guard.is_none() {
let p = PostgresRedoProcess::launch(self.conf, &self.tenantid, rr)?;
let p = PostgresRedoProcess::launch(self.conf, &self.tenantid)?;
*process_guard = Some(p);
}
let process = process_guard.as_mut().unwrap();
@@ -251,8 +246,7 @@ impl PostgresRedoManager {
PostgresRedoManager {
tenantid,
conf,
round_robin: AtomicUsize::new(0),
processes: [(); N_WAL_REDO_PROCS].map(|_| Mutex::new(None)),
process: Mutex::new(None),
}
}
@@ -478,17 +472,11 @@ impl PostgresRedoProcess {
//
// Start postgres binary in special WAL redo mode.
//
fn launch(
conf: &PageServerConf,
tenantid: &ZTenantId,
id: usize,
) -> Result<PostgresRedoProcess, Error> {
fn launch(conf: &PageServerConf, tenantid: &ZTenantId) -> Result<PostgresRedoProcess, Error> {
// FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
// just create one with constant name. That fails if you try to launch more than
// one WAL redo manager concurrently.
let datadir = conf
.tenant_path(tenantid)
.join(format! {"wal-redo-datadir-{}", id});
let datadir = conf.tenant_path(tenantid).join("wal-redo-datadir");
// Create empty data directory for wal-redo postgres, deleting old one first.
if datadir.exists() {

View File

@@ -340,10 +340,11 @@ class ProposerPostgres:
"synchronous_standby_names = 'walproposer'\n",
f"zenith.zenith_timeline = '{self.timeline_id}'\n",
f"zenith.zenith_tenant = '{self.tenant_id}'\n",
f"zenith.page_server_connstring = ''\n",
f"wal_acceptors = '{wal_acceptors}'\n",
])
def sync_safekeepers(self) -> str:
def sync_safekeepers(self, timeout=60) -> str:
"""
Run 'postgres --sync-safekeepers'.
Returns execution result, which is commit_lsn after sync.
@@ -354,7 +355,7 @@ class ProposerPostgres:
"PGDATA": self.pg_data_dir_path(),
}
basepath = self.pg_bin.run_capture(command, env)
basepath = self.pg_bin.run_capture(command, env, timeout=timeout)
stdout_filename = basepath + '.stdout'
with open(stdout_filename, 'r') as stdout_f:
@@ -437,3 +438,90 @@ def test_timeline_status(zenith_env_builder: ZenithEnvBuilder):
epoch_after_reboot = wa_http_cli.timeline_status(tenant_id, timeline_id).acceptor_epoch
assert epoch_after_reboot > epoch
class WalAppender:
"""Helper for appending WAL to safekeepers, keeps track of last inserted lsn."""
# 0/16B9188 is a good lsn to start with: it is valid, not at a segment start, and not in the zero segment.
# acceptors is indexed by safekeeper number; each entry must provide append_logical_message().
# flush_lsns maps safekeeper index -> end_lsn returned by the last append to that safekeeper.
def __init__(self,
acceptors,
tenant_id,
timeline_id,
epoch_start_lsn=0x16B9188,
begin_lsn=0x16B9188):
self.acceptors = acceptors
self.epoch_start_lsn = epoch_start_lsn
self.begin_lsn = begin_lsn
self.tenant_id = tenant_id
self.timeline_id = timeline_id
self.flush_lsns = dict()
# Sends one request to acceptors[i] and returns its response unchanged.
# Callers may reassign self.epoch_start_lsn between calls to start a new epoch.
def append(self, i, term, lm_message="message", lm_prefix="", set_commit_lsn=False):
"""Append new logical message to i'th safekeeper."""
# Continue from the end_lsn of the previous append to this safekeeper,
# or from begin_lsn if nothing has been appended to it yet.
lsn = self.flush_lsns.get(i, self.begin_lsn)
req = {
"lm_prefix": lm_prefix,
"lm_message": lm_message,
"set_commit_lsn": set_commit_lsn,
"term": term,
"begin_lsn": lsn,
"epoch_start_lsn": self.epoch_start_lsn,
# truncate_lsn is always the initial begin_lsn, regardless of how far this safekeeper has advanced
"truncate_lsn": self.begin_lsn,
}
res = self.acceptors[i].append_logical_message(
self.tenant_id,
self.timeline_id,
req,
)
# Remember where the inserted WAL ended so the next append to i starts there.
end_lsn = res["inserted_wal"]["end_lsn"]
self.flush_lsns[i] = end_lsn
return res
# Only safekeepers that have received at least one append are listed.
def debug_print(self):
"""Print lsn for each acceptor."""
for i, lsn in self.flush_lsns.items():
print(f'end_lsn for acceptors[{i}] = {lsn_to_hex(lsn)}')
# One safekeeper with an old term and a lot of non-committed WAL.
# Scenario: safekeeper 2 gets five term-1 messages, while safekeepers 0 and 1 get one
# term-1 message each and then one term-2 message, so safekeeper 2 has received the
# most appends but stays on the old term.
def test_sync_safekeepers_old_term_ahead(repo_dir: str,
pg_bin: PgBin,
wa_factory: WalAcceptorFactory):
wa_factory.start_n_new(3)
timeline_id = uuid.uuid4().hex
tenant_id = uuid.uuid4().hex
# write config for proposer
pgdata_dir = os.path.join(repo_dir, "proposer_pgdata")
pg = ProposerPostgres(pgdata_dir, pg_bin, timeline_id, tenant_id)
pg.create_dir_config(wa_factory.get_connstrs())
# append wal to safekeepers
appender = WalAppender(wa_factory.instances, tenant_id, timeline_id)
appender.append(0, term=1, lm_message="msg1")
appender.append(1, term=1, lm_message="msg1")
appender.append(2, term=1, lm_message="msg1")
appender.append(2, term=1, lm_message="msg1")
appender.append(2, term=1, lm_message="msg1")
appender.append(2, term=1, lm_message="msg1")
appender.append(2, term=1, lm_message="msg1")
# the new epoch starts where safekeeper 0's single term-1 message ended
new_epoch = appender.flush_lsns[0]
appender.epoch_start_lsn = new_epoch
# only safekeepers 0 and 1 receive WAL in term 2
appender.append(0, term=2, lm_message="msg2")
appender.append(1, term=2, lm_message="msg2")
# run sync safekeepers
# FIXME: fails with timeout
lsn_after_sync = pg.sync_safekeepers()
print(f"lsn after sync = {lsn_after_sync}")
appender.debug_print()

View File

@@ -16,7 +16,7 @@ routerify = "2"
fs2 = "0.4.3"
lazy_static = "1.4.0"
serde_json = "1"
log = "0.4.14"
tracing = "0.1.27"
clap = "2.33.0"
daemonize = "0.4.1"
rust-s3 = { version = "0.27.0-rc4", features = ["no-verify-ssl"] }

View File

@@ -5,9 +5,9 @@ use anyhow::Result;
use clap::{App, Arg};
use const_format::formatcp;
use daemonize::Daemonize;
use log::*;
use std::path::{Path, PathBuf};
use std::thread;
use tracing::*;
use walkeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR};
use walkeeper::http;
use walkeeper::s3_offload;
@@ -44,6 +44,9 @@ fn main() -> Result<()> {
.takes_value(true)
.help(formatcp!("http endpoint address for metrics on ip:port (default: {DEFAULT_HTTP_LISTEN_ADDR})")),
)
// FIXME: this argument is no longer needed since the pageserver address is forwarded from compute.
// However, because this argument is in use by console's e2e tests, let's keep it for now and remove it separately.
// So currently it is a no-op.
.arg(
Arg::with_name("pageserver")
.short("p")
@@ -101,10 +104,6 @@ fn main() -> Result<()> {
conf.listen_http_addr = addr.to_owned();
}
if let Some(addr) = arg_matches.value_of("pageserver") {
conf.pageserver_addr = Some(addr.to_owned());
}
if let Some(ttl) = arg_matches.value_of("ttl") {
conf.ttl = Some(humantime::parse_duration(ttl)?);
}

View File

@@ -9,8 +9,8 @@
use anyhow::{anyhow, Result};
use bytes::{BufMut, Bytes, BytesMut};
use crc32c::crc32c_append;
use log::*;
use serde::{Deserialize, Serialize};
use tracing::*;
use crate::safekeeper::{AcceptorProposerMessage, AppendResponse};
use crate::safekeeper::{

View File

@@ -2,7 +2,6 @@
use std::path::PathBuf;
use std::time::Duration;
use std::env;
use zenith_utils::zid::ZTimelineId;
pub mod http;
@@ -13,6 +12,7 @@ pub mod s3_offload;
pub mod safekeeper;
pub mod send_wal;
pub mod timeline;
pub mod upgrade;
pub mod wal_service;
pub mod defaults {
@@ -39,9 +39,6 @@ pub struct SafeKeeperConf {
pub no_sync: bool,
pub listen_pg_addr: String,
pub listen_http_addr: String,
pub pageserver_addr: Option<String>,
// TODO (create issue) this is temporary, until protocol between PG<->SK<->PS rework
pub pageserver_auth_token: Option<String>,
pub ttl: Option<Duration>,
pub recall_period: Option<Duration>,
}
@@ -61,12 +58,10 @@ impl Default for SafeKeeperConf {
workdir: PathBuf::from("./"),
daemonize: false,
no_sync: false,
pageserver_addr: None,
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_http_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
ttl: None,
recall_period: None,
pageserver_auth_token: env::var("PAGESERVER_AUTH_TOKEN").ok(),
}
}
}

View File

@@ -5,8 +5,8 @@
use anyhow::{bail, Context, Result};
use bytes::Bytes;
use bytes::BytesMut;
use log::*;
use postgres::{Client, Config, NoTls};
use tracing::*;
use std::net::SocketAddr;
use std::thread;
@@ -28,20 +28,22 @@ pub struct ReceiveWalConn<'pg> {
pg_backend: &'pg mut PostgresBackend,
/// The cached result of `pg_backend.socket().peer_addr()` (roughly)
peer_addr: SocketAddr,
/// Pageserver connection string forwarded from compute
/// NOTE that it is allowed to operate without a pageserver.
/// So if compute has no pageserver configured do not use it.
pageserver_connstr: Option<String>,
}
///
/// Periodically request pageserver to call back.
/// If pageserver already has replication channel, it will just ignore this request
///
fn request_callback(conf: SafeKeeperConf, timelineid: ZTimelineId, tenantid: ZTenantId) {
let ps_addr = conf.pageserver_addr.unwrap();
let ps_connstr = format!(
"postgresql://no_user:{}@{}/no_db",
&conf.pageserver_auth_token.unwrap_or_default(),
ps_addr
);
fn request_callback(
conf: SafeKeeperConf,
pageserver_connstr: String,
timelineid: ZTimelineId,
tenantid: ZTenantId,
) {
// Use Config parsing because SockAddr parsing doesn't allow host names instead of IP addresses
let me_connstr = format!("postgresql://no_user@{}/no_db", conf.listen_pg_addr);
let me_conf: Config = me_connstr.parse().unwrap();
@@ -54,15 +56,18 @@ fn request_callback(conf: SafeKeeperConf, timelineid: ZTimelineId, tenantid: ZTe
loop {
info!(
"requesting page server to connect to us: start {} {}",
ps_connstr, callme
pageserver_connstr, callme
);
match Client::connect(&ps_connstr, NoTls) {
match Client::connect(&pageserver_connstr, NoTls) {
Ok(mut client) => {
if let Err(e) = client.simple_query(&callme) {
error!("Failed to send callme request to pageserver: {}", e);
}
}
Err(e) => error!("Failed to connect to pageserver {}: {}", &ps_connstr, e),
Err(e) => error!(
"Failed to connect to pageserver {}: {}",
&pageserver_connstr, e
),
}
if let Some(period) = conf.recall_period {
@@ -74,11 +79,15 @@ fn request_callback(conf: SafeKeeperConf, timelineid: ZTimelineId, tenantid: ZTe
}
impl<'pg> ReceiveWalConn<'pg> {
pub fn new(pg: &'pg mut PostgresBackend) -> ReceiveWalConn<'pg> {
pub fn new(
pg: &'pg mut PostgresBackend,
pageserver_connstr: Option<String>,
) -> ReceiveWalConn<'pg> {
let peer_addr = *pg.get_peer_addr();
ReceiveWalConn {
pg_backend: pg,
peer_addr,
pageserver_connstr,
}
}
@@ -107,6 +116,8 @@ impl<'pg> ReceiveWalConn<'pg> {
/// Receive WAL from wal_proposer
pub fn run(&mut self, swh: &mut SendWalHandler) -> Result<()> {
let _enter = info_span!("WAL acceptor", timeline = %swh.timelineid.unwrap()).entered();
// Notify the libpq client that it's allowed to send `CopyData` messages
self.pg_backend
.write_message(&BeMessage::CopyBothResponse)?;
@@ -127,17 +138,17 @@ impl<'pg> ReceiveWalConn<'pg> {
_ => bail!("unexpected message {:?} instead of greeting", msg),
}
// if requested, ask pageserver to fetch wal from us
// xxx: this place seems not really fitting
if swh.conf.pageserver_addr.is_some() {
// Need to establish replication channel with page server.
// Add far as replication in postgres is initiated by receiver, we should use callme mechanism
// Need to establish replication channel with page server.
// As far as replication in postgres is initiated by receiver, we should use callme mechanism
if let Some(ref pageserver_connstr) = self.pageserver_connstr {
let conf = swh.conf.clone();
let timelineid = swh.timeline.get().timelineid;
// copy to safely move to a thread
let pageserver_connstr = pageserver_connstr.to_owned();
let _ = thread::Builder::new()
.name("request_callback thread".into())
.spawn(move || {
request_callback(conf, timelineid, tenant_id);
request_callback(conf, pageserver_connstr, timelineid, tenant_id);
})
.unwrap();
}

View File

@@ -5,7 +5,6 @@ use crate::send_wal::SendWalHandler;
use crate::timeline::{ReplicaState, Timeline, TimelineTools};
use anyhow::{anyhow, Context, Result};
use bytes::Bytes;
use log::*;
use postgres_ffi::xlog_utils::{
get_current_timestamp, TimestampTz, XLogFileName, MAX_SEND_SIZE, PG_TLI,
};
@@ -20,6 +19,7 @@ use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
use std::{str, thread};
use tracing::*;
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::Lsn;
use zenith_utils::postgres_backend::PostgresBackend;
@@ -177,6 +177,8 @@ impl ReplicationConn {
pgb: &mut PostgresBackend,
cmd: &Bytes,
) -> Result<()> {
let _enter = info_span!("WAL sender", timeline = %swh.timelineid.unwrap()).entered();
// spawn the background thread which receives HotStandbyFeedback messages.
let bg_timeline = Arc::clone(swh.timeline.get());
let bg_stream_in = self.stream_in.take().unwrap();

View File

@@ -3,7 +3,6 @@
//
use anyhow::Result;
use log::*;
use postgres_ffi::xlog_utils::*;
use s3::bucket::Bucket;
use s3::creds::Credentials;
@@ -16,6 +15,7 @@ use std::path::Path;
use std::time::SystemTime;
use tokio::runtime;
use tokio::time::sleep;
use tracing::*;
use walkdir::WalkDir;
use crate::SafeKeeperConf;

View File

@@ -8,13 +8,13 @@ use bytes::Buf;
use bytes::BufMut;
use bytes::Bytes;
use bytes::BytesMut;
use log::*;
use pageserver::waldecoder::WalStreamDecoder;
use postgres_ffi::xlog_utils::TimeLineID;
use serde::{Deserialize, Serialize};
use std::cmp::min;
use std::fmt;
use std::io::Read;
use tracing::*;
use lazy_static::lazy_static;
@@ -30,7 +30,7 @@ use zenith_utils::pq_proto::SystemId;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
pub const SK_MAGIC: u32 = 0xcafeceefu32;
pub const SK_FORMAT_VERSION: u32 = 1;
pub const SK_FORMAT_VERSION: u32 = 2;
const SK_PROTOCOL_VERSION: u32 = 1;
const UNKNOWN_SERVER_VERSION: u32 = 0;
@@ -102,7 +102,7 @@ impl fmt::Debug for TermHistory {
}
/// Unique id of proposer. Not needed for correctness, used for monitoring.
type PgUuid = [u8; 16];
pub type PgUuid = [u8; 16];
/// Persistent consensus state of the acceptor.
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -140,12 +140,9 @@ pub struct ServerInfo {
}
/// Persistent information stored on safekeeper node
/// On disk data is prefixed by magic and format version and followed by checksum.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SafeKeeperState {
/// magic for verifying content the control file
pub magic: u32,
/// safekeeper format version
pub format_version: u32,
/// persistent acceptor state
pub acceptor_state: AcceptorState,
/// information about server
@@ -166,8 +163,6 @@ pub struct SafeKeeperState {
impl SafeKeeperState {
pub fn new() -> SafeKeeperState {
SafeKeeperState {
magic: SK_MAGIC,
format_version: SK_FORMAT_VERSION,
acceptor_state: AcceptorState {
term: 0,
term_history: TermHistory::empty(),
@@ -414,8 +409,8 @@ impl AcceptorProposerMessage {
}
pub trait Storage {
/// Persist safekeeper state on disk, optionally syncing it.
fn persist(&mut self, s: &SafeKeeperState, sync: bool) -> Result<()>;
/// Persist safekeeper state on disk.
fn persist(&mut self, s: &SafeKeeperState) -> Result<()>;
/// Write piece of wal in buf to disk and sync it.
fn write_wal(&mut self, server: &ServerInfo, startpos: Lsn, buf: &[u8]) -> Result<()>;
// Truncate WAL at specified LSN
@@ -568,7 +563,7 @@ where
self.s.server.ztli = msg.ztli;
self.s.server.wal_seg_size = msg.wal_seg_size;
self.storage
.persist(&self.s, true)
.persist(&self.s)
.with_context(|| "failed to persist shared state")?;
self.metrics = SafeKeeperMetrics::new(self.s.server.ztli);
@@ -598,7 +593,7 @@ where
if self.s.acceptor_state.term < msg.term {
self.s.acceptor_state.term = msg.term;
// persist vote before sending it out
self.storage.persist(&self.s, true)?;
self.storage.persist(&self.s)?;
resp.term = self.s.acceptor_state.term;
resp.vote_given = true as u64;
}
@@ -610,7 +605,7 @@ where
fn bump_if_higher(&mut self, term: Term) -> Result<()> {
if self.s.acceptor_state.term < term {
self.s.acceptor_state.term = term;
self.storage.persist(&self.s, true)?;
self.storage.persist(&self.s)?;
}
Ok(())
}
@@ -649,7 +644,7 @@ where
self.flush_lsn = msg.start_streaming_at;
// and now adopt term history from proposer
self.s.acceptor_state.term_history = msg.term_history.clone();
self.storage.persist(&self.s, true)?;
self.storage.persist(&self.s)?;
info!("start receiving WAL since {:?}", msg.start_streaming_at);
@@ -753,7 +748,10 @@ where
self.s.commit_lsn = self.commit_lsn;
self.s.truncate_lsn = self.truncate_lsn;
}
self.storage.persist(&self.s, sync_control_file)?;
if sync_control_file {
self.storage.persist(&self.s)?;
}
let resp = self.append_response();
info!(
@@ -778,7 +776,7 @@ mod tests {
}
impl Storage for InMemoryStorage {
fn persist(&mut self, s: &SafeKeeperState, _sync: bool) -> Result<()> {
fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
self.persisted_state = s.clone();
Ok(())
}

View File

@@ -46,6 +46,7 @@ impl postgres_backend::Handler for SendWalHandler {
if let Some(app_name) = sm.params.get("application_name") {
self.appname = Some(app_name.clone());
}
Ok(())
}
@@ -75,7 +76,17 @@ impl postgres_backend::Handler for SendWalHandler {
} else if query_string.starts_with(b"START_REPLICATION") {
ReplicationConn::new(pgb).run(self, pgb, &query_string)?;
} else if query_string.starts_with(b"START_WAL_PUSH") {
ReceiveWalConn::new(pgb)
// TODO: this repeats query decoding logic from page_service so it is probably
// a good idea to refactor it in pgbackend and pass string to process query instead of bytes
let decoded_query_string = match query_string.last() {
Some(0) => std::str::from_utf8(&query_string[..query_string.len() - 1])?,
_ => std::str::from_utf8(&query_string)?,
};
let pageserver_connstr = decoded_query_string
.split_whitespace()
.nth(1)
.map(|s| s.to_owned());
ReceiveWalConn::new(pgb, pageserver_connstr)
.run(self)
.with_context(|| "failed to run ReceiveWalConn")?;
} else if query_string.starts_with(b"JSON_CTRL") {

View File

@@ -2,9 +2,9 @@
//! persistence and support for interaction between sending and receiving wal.
use anyhow::{anyhow, bail, ensure, Context, Result};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use fs2::FileExt;
use lazy_static::lazy_static;
use log::*;
use postgres_ffi::xlog_utils::{find_end_of_wal, PG_TLI};
use std::cmp::{max, min};
use std::collections::HashMap;
@@ -13,6 +13,7 @@ use std::io::{Read, Seek, SeekFrom, Write};
use std::path::PathBuf;
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;
use tracing::*;
use zenith_metrics::{register_histogram_vec, Histogram, HistogramVec, DISK_WRITE_SECONDS_BUCKETS};
use zenith_utils::bin_ser::LeSer;
use zenith_utils::lsn::Lsn;
@@ -23,6 +24,7 @@ use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, ServerInfo,
Storage, SK_FORMAT_VERSION, SK_MAGIC,
};
use crate::upgrade::upgrade_control_file;
use crate::SafeKeeperConf;
use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ};
use std::convert::TryInto;
@@ -34,7 +36,7 @@ const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial";
// dedicated lockfile to prevent running several safekeepers on the same data
const LOCK_FILE_NAME: &str = "safekeeper.lock";
const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
pub const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
/// Replica status: hot standby feedback + disk consistent lsn
#[derive(Debug, Clone, Copy)]
@@ -83,20 +85,13 @@ pub enum CreateControlFile {
}
lazy_static! {
static ref PERSIST_SYNC_CONTROL_FILE_SECONDS: HistogramVec = register_histogram_vec!(
"safekeeper_persist_sync_control_file_seconds",
static ref PERSIST_CONTROL_FILE_SECONDS: HistogramVec = register_histogram_vec!(
"safekeeper_persist_control_file_seconds",
"Seconds to persist and sync control file, grouped by timeline",
&["timeline_id"],
DISK_WRITE_SECONDS_BUCKETS.to_vec()
)
.expect("Failed to register safekeeper_persist_sync_control_file_seconds histogram vec");
static ref PERSIST_NOSYNC_CONTROL_FILE_SECONDS: HistogramVec = register_histogram_vec!(
"safekeeper_persist_nosync_control_file_seconds",
"Seconds to persist and sync control file, grouped by timeline",
&["timeline_id"],
DISK_WRITE_SECONDS_BUCKETS.to_vec()
)
.expect("Failed to register safekeeper_persist_nosync_control_file_seconds histogram vec");
.expect("Failed to register safekeeper_persist_control_file_seconds histogram vec");
}
impl SharedState {
@@ -134,7 +129,7 @@ impl SharedState {
timelineid: ZTimelineId,
create: CreateControlFile,
) -> Result<Self> {
let (file_storage, state) = SharedState::load_from_control_file(conf, timelineid, create)
let (file_storage, state) = FileStorage::load_from_control_file(conf, timelineid, create)
.with_context(|| "failed to load from control file")?;
let flush_lsn = if state.server.wal_seg_size != 0 {
let wal_dir = conf.timeline_dir(&timelineid);
@@ -155,111 +150,6 @@ impl SharedState {
replicas: Vec::new(),
})
}
/// Fetch and lock control file (prevent running more than one instance of safekeeper)
/// If create=false and file doesn't exist, bails out.
fn load_from_control_file(
conf: &SafeKeeperConf,
timelineid: ZTimelineId,
create: CreateControlFile,
) -> Result<(FileStorage, SafeKeeperState)> {
let timeline_dir = conf.timeline_dir(&timelineid);
let control_file_path = timeline_dir.join(CONTROL_FILE_NAME);
let lock_file_path = timeline_dir.join(LOCK_FILE_NAME);
info!(
"loading control file {}, create={:?} lock file {:?}",
control_file_path.display(),
create,
lock_file_path.display(),
);
let lock_file = File::create(&lock_file_path).with_context(|| "failed to open lockfile")?;
// Lock file to prevent two or more active safekeepers
lock_file.try_lock_exclusive().map_err(|e| {
anyhow!(
"control file {:?} is locked by some other process: {}",
&control_file_path,
e
)
})?;
let mut control_file = OpenOptions::new()
.read(true)
.write(true)
.create(matches!(create, CreateControlFile::True))
.open(&control_file_path)
.with_context(|| {
format!(
"failed to open control file at {}",
control_file_path.display(),
)
})?;
// Empty file is legit on 'create', don't try to deser from it.
let state = if control_file.metadata().unwrap().len() == 0 {
if let CreateControlFile::False = create {
bail!("control file is empty");
}
SafeKeeperState::new()
} else {
let mut buf = Vec::new();
control_file
.read_to_end(&mut buf)
.with_context(|| "failed to read control file")?;
let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]);
let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] =
buf[buf.len() - CHECKSUM_SIZE..].try_into()?;
let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes);
ensure!(
calculated_checksum == expected_checksum,
format!(
"safe keeper state checksum mismatch expected {} got {}",
expected_checksum, calculated_checksum
)
);
let state =
SafeKeeperState::des(&buf[..buf.len() - CHECKSUM_SIZE]).with_context(|| {
format!(
"failed to deserialize safe keeper state from control file at {}",
control_file_path.display(),
)
})?;
if state.magic != SK_MAGIC {
bail!("bad control file magic: {}", state.magic);
}
if state.format_version != SK_FORMAT_VERSION {
bail!(
"Got incompatible format version, expected {}, got {}",
SK_FORMAT_VERSION,
state.format_version,
);
}
state
};
let timelineid_str = format!("{}", timelineid);
Ok((
FileStorage {
lock_file,
timeline_dir,
conf: conf.clone(),
persist_sync_control_file_seconds: PERSIST_SYNC_CONTROL_FILE_SECONDS
.with_label_values(&[&timelineid_str]),
persist_nosync_control_file_seconds: PERSIST_NOSYNC_CONTROL_FILE_SECONDS
.with_label_values(&[&timelineid_str]),
},
state,
))
}
}
/// Database instance (tenant)
@@ -441,20 +331,123 @@ struct FileStorage {
// save timeline dir to avoid reconstructing it every time
timeline_dir: PathBuf,
conf: SafeKeeperConf,
persist_sync_control_file_seconds: Histogram,
persist_nosync_control_file_seconds: Histogram,
persist_control_file_seconds: Histogram,
}
impl FileStorage {
// Check the magic/version in the on-disk data and deserialize it, if possible.
fn deser_sk_state(buf: &mut &[u8]) -> Result<SafeKeeperState> {
// Read the version independent part
let magic = buf.read_u32::<LittleEndian>()?;
if magic != SK_MAGIC {
bail!(
"bad control file magic: {:X}, expected {:X}",
magic,
SK_MAGIC
);
}
let version = buf.read_u32::<LittleEndian>()?;
if version == SK_FORMAT_VERSION {
let res = SafeKeeperState::des(buf)?;
return Ok(res);
}
// try to upgrade
upgrade_control_file(buf, version)
}
/// Fetch and lock control file (prevent running more than one instance of safekeeper)
/// If create=false and file doesn't exist, bails out.
///
/// Steps:
/// 1. Create the lock file in the timeline directory and take an exclusive
///    lock on it; the open handle is stored in the returned `FileStorage`.
/// 2. Open the control file (creating it only for `CreateControlFile::True`).
/// 3. An empty control file yields `SafeKeeperState::new()` when creating and
///    is an error otherwise. A non-empty file must end with a little-endian
///    crc32c checksum of everything before it; the checked payload is then
///    passed to `FileStorage::deser_sk_state`.
///
/// # Errors
/// Returns an error if the lock cannot be taken, the control file cannot be
/// opened, inspected or read, it is too short to contain a checksum, the
/// checksum does not match, or the state cannot be deserialized.
fn load_from_control_file(
    conf: &SafeKeeperConf,
    timelineid: ZTimelineId,
    create: CreateControlFile,
) -> Result<(FileStorage, SafeKeeperState)> {
    let timeline_dir = conf.timeline_dir(&timelineid);

    let control_file_path = timeline_dir.join(CONTROL_FILE_NAME);
    let lock_file_path = timeline_dir.join(LOCK_FILE_NAME);

    info!(
        "loading control file {}, create={:?} lock file {:?}",
        control_file_path.display(),
        create,
        lock_file_path.display(),
    );

    let lock_file = File::create(&lock_file_path).with_context(|| "failed to open lockfile")?;

    // Lock file to prevent two or more active safekeepers
    lock_file.try_lock_exclusive().map_err(|e| {
        anyhow!(
            "control file {:?} is locked by some other process: {}",
            &control_file_path,
            e
        )
    })?;

    let mut control_file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(matches!(create, CreateControlFile::True))
        .open(&control_file_path)
        .with_context(|| {
            format!(
                "failed to open control file at {}",
                control_file_path.display(),
            )
        })?;

    // Report a failing stat as an error instead of panicking.
    let control_file_len = control_file
        .metadata()
        .with_context(|| {
            format!(
                "failed to read metadata of control file at {}",
                control_file_path.display(),
            )
        })?
        .len();

    // Empty file is legit on 'create', don't try to deser from it.
    let state = if control_file_len == 0 {
        if let CreateControlFile::False = create {
            bail!("control file is empty");
        }
        SafeKeeperState::new()
    } else {
        let mut buf = Vec::new();
        control_file
            .read_to_end(&mut buf)
            .with_context(|| "failed to read control file")?;

        // A truncated file may be shorter than the trailing checksum; without
        // this check the length subtraction below would underflow and panic.
        ensure!(
            buf.len() >= CHECKSUM_SIZE,
            "control file {} is too short to contain a checksum: {} bytes",
            control_file_path.display(),
            buf.len()
        );
        let (mut payload, checksum_bytes) = buf.split_at(buf.len() - CHECKSUM_SIZE);

        let calculated_checksum = crc32c::crc32c(payload);
        let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] = checksum_bytes.try_into()?;
        let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes);

        ensure!(
            calculated_checksum == expected_checksum,
            "safekeeper control file checksum mismatch: expected {} got {}",
            expected_checksum,
            calculated_checksum
        );

        FileStorage::deser_sk_state(&mut payload).with_context(|| {
            format!("while reading control file {}", control_file_path.display(),)
        })?
    };

    // Metrics for this storage are labelled with the timeline id.
    let timelineid_str = timelineid.to_string();

    Ok((
        FileStorage {
            lock_file,
            timeline_dir,
            conf: conf.clone(),
            persist_control_file_seconds: PERSIST_CONTROL_FILE_SECONDS
                .with_label_values(&[&timelineid_str]),
        },
        state,
    ))
}
}
impl Storage for FileStorage {
// persists state durably to underlying storage
// for description see https://lwn.net/Articles/457667/
fn persist(&mut self, s: &SafeKeeperState, sync: bool) -> Result<()> {
let _timer = if sync {
&self.persist_sync_control_file_seconds
} else {
&self.persist_nosync_control_file_seconds
}
.start_timer();
fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
let _timer = &self.persist_control_file_seconds.start_timer();
// write data to safekeeper.control.partial
let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL);
@@ -464,7 +457,11 @@ impl Storage for FileStorage {
&control_partial_path.display()
)
})?;
let mut buf = s.ser().with_context(|| "failed to serialize state")?;
let mut buf: Vec<u8> = Vec::new();
buf.write_u32::<LittleEndian>(SK_MAGIC)?;
buf.write_u32::<LittleEndian>(SK_FORMAT_VERSION)?;
s.ser_into(&mut buf)?;
// calculate checksum before resize
let checksum = crc32c::crc32c(&buf);
buf.extend_from_slice(&checksum.to_le_bytes());
@@ -476,36 +473,32 @@ impl Storage for FileStorage {
)
})?;
if sync {
// fsync the file
control_partial.sync_all().with_context(|| {
format!(
"failed to sync partial control file at {}",
control_partial_path.display()
)
})?;
}
// fsync the file
control_partial.sync_all().with_context(|| {
format!(
"failed to sync partial control file at {}",
control_partial_path.display()
)
})?;
let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);
// rename should be atomic
fs::rename(&control_partial_path, &control_path)?;
if sync {
// this sync is not required by any standard but postgres does this (see durable_rename)
File::open(&control_path)
.and_then(|f| f.sync_all())
.with_context(|| {
format!(
"failed to sync control file at: {}",
&control_path.display()
)
})?;
// this sync is not required by any standard but postgres does this (see durable_rename)
File::open(&control_path)
.and_then(|f| f.sync_all())
.with_context(|| {
format!(
"failed to sync control file at: {}",
&control_path.display()
)
})?;
// fsync the directory (linux specific)
File::open(&self.timeline_dir)
.and_then(|f| f.sync_all())
.with_context(|| "failed to sync control file directory")?;
}
// fsync the directory (linux specific)
File::open(&self.timeline_dir)
.and_then(|f| f.sync_all())
.with_context(|| "failed to sync control file directory")?;
Ok(())
}
@@ -682,7 +675,7 @@ mod test {
use super::FileStorage;
use crate::{
safekeeper::{SafeKeeperState, Storage},
timeline::{CreateControlFile, SharedState, CONTROL_FILE_NAME},
timeline::{CreateControlFile, CONTROL_FILE_NAME},
SafeKeeperConf,
};
use anyhow::Result;
@@ -704,7 +697,7 @@ mod test {
) -> Result<(FileStorage, SafeKeeperState)> {
fs::create_dir_all(&conf.timeline_dir(&timeline_id))
.expect("failed to create timeline dir");
SharedState::load_from_control_file(conf, timeline_id, create)
FileStorage::load_from_control_file(conf, timeline_id, create)
}
#[test]
@@ -717,9 +710,7 @@ mod test {
.expect("failed to read state");
// change something
state.wal_start_lsn = Lsn(42);
storage
.persist(&state, true)
.expect("failed to persist state");
storage.persist(&state).expect("failed to persist state");
}
let (_, state) = load_from_control_file(&conf, timeline_id, CreateControlFile::False)
@@ -737,9 +728,7 @@ mod test {
.expect("failed to read state");
// change something
state.wal_start_lsn = Lsn(42);
storage
.persist(&state, true)
.expect("failed to persist state");
storage.persist(&state).expect("failed to persist state");
}
let control_path = conf.timeline_dir(&timeline_id).join(CONTROL_FILE_NAME);
let mut data = fs::read(&control_path).unwrap();
@@ -749,7 +738,7 @@ mod test {
match load_from_control_file(&conf, timeline_id, CreateControlFile::False) {
Err(err) => assert!(err
.to_string()
.contains("safe keeper state checksum mismatch")),
.contains("safekeeper control file checksum mismatch")),
Ok(_) => panic!("expected error"),
}
}

60
walkeeper/src/upgrade.rs Normal file
View File

@@ -0,0 +1,60 @@
//! Code to deal with safekeeper control file upgrades
use crate::safekeeper::{
AcceptorState, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory, TermSwitchEntry,
};
use anyhow::{bail, Result};
use serde::{Deserialize, Serialize};
use tracing::*;
use zenith_utils::{bin_ser::LeSer, lsn::Lsn};
/// Persistent consensus state of the acceptor, in the shape stored by
/// version 1 of the safekeeper control file. It is only a part of
/// `SafeKeeperStateV1`, which `upgrade_control_file` deserializes.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct AcceptorStateV1 {
/// acceptor's last term it voted for (advanced in 1 phase)
term: Term,
/// acceptor's epoch (advanced, i.e. bumped to 'term' when VCL is reached).
epoch: Term,
}
/// Safekeeper state in the shape stored by version 1 of the control file.
/// `upgrade_control_file` deserializes this and converts it into the current
/// `SafeKeeperState`.
// NOTE(review): presumably the field order and types here must keep matching
// the bytes written by version 1 — confirm before editing this struct.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SafeKeeperStateV1 {
/// persistent acceptor state
acceptor_state: AcceptorStateV1,
/// information about server
server: ServerInfo,
/// Unique id of the last *elected* proposer we dealt with. Not needed
/// for correctness, exists for monitoring purposes.
proposer_uuid: PgUuid,
/// part of WAL acknowledged by quorum and available locally
commit_lsn: Lsn,
/// minimal LSN which may be needed for recovery of some safekeeper (end_lsn
/// of last record streamed to everyone)
truncate_lsn: Lsn,
/// Safekeeper starts receiving WAL from this LSN, zeros before it ought to
/// be skipped during decoding.
wal_start_lsn: Lsn,
}
pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState> {
// migrate to storing full term history
if version == 1 {
info!("reading safekeeper control file version {}", version);
let oldstate = SafeKeeperStateV1::des(&buf[..buf.len()])?;
let ac = AcceptorState {
term: oldstate.acceptor_state.term,
term_history: TermHistory(vec![TermSwitchEntry {
term: oldstate.acceptor_state.epoch,
lsn: Lsn(0),
}]),
};
return Ok(SafeKeeperState {
acceptor_state: ac,
server: oldstate.server.clone(),
proposer_uuid: oldstate.proposer_uuid,
commit_lsn: oldstate.commit_lsn,
truncate_lsn: oldstate.truncate_lsn,
wal_start_lsn: oldstate.wal_start_lsn,
});
}
bail!("unsupported safekeeper control file version {}", version)
}

View File

@@ -3,9 +3,10 @@
//! receive WAL from wal_proposer and send it to WAL receivers
//!
use anyhow::Result;
use log::*;
use regex::Regex;
use std::net::{TcpListener, TcpStream};
use std::thread;
use tracing::*;
use crate::send_wal::SendWalHandler;
use crate::SafeKeeperConf;
@@ -33,9 +34,19 @@ pub fn thread_main(conf: SafeKeeperConf, listener: TcpListener) -> Result<()> {
}
}
/// Numeric id of the current thread, for compact log output.
///
/// The number is extracted from the `Debug` rendering of the Rust-internal
/// thread id, which is expected to look like `ThreadId(N)`.
///
/// # Panics
/// Panics if the rendering does not match that pattern or the captured
/// digits do not fit into a `u64`.
fn get_tid() -> u64 {
    let debug_repr = format!("{:?}", thread::current().id());
    let pattern = Regex::new(r"ThreadId\((\d+)\)").unwrap();
    pattern
        .captures(&debug_repr)
        .and_then(|caps| caps.get(1))
        .unwrap()
        .as_str()
        .parse()
        .unwrap()
}
/// This is run by `thread_main` above, inside a background thread.
///
fn handle_socket(socket: TcpStream, conf: SafeKeeperConf) -> Result<()> {
let _enter = info_span!("", tid = ?get_tid()).entered();
socket.set_nodelay(true)?;
let mut conn_handler = SendWalHandler::new(conf);

View File

@@ -36,10 +36,17 @@ pg_port = {pageserver_pg_port}
http_port = {pageserver_http_port}
auth_type = '{pageserver_auth_type}'
[[safekeepers]]
name = '{safekeeper_name}'
pg_port = {safekeeper_pg_port}
http_port = {safekeeper_http_port}
"#,
pageserver_pg_port = DEFAULT_PAGESERVER_PG_PORT,
pageserver_http_port = DEFAULT_PAGESERVER_HTTP_PORT,
pageserver_auth_type = AuthType::Trust,
safekeeper_name = DEFAULT_SAFEKEEPER_NAME,
safekeeper_pg_port = DEFAULT_SAFEKEEPER_PG_PORT,
safekeeper_http_port = DEFAULT_SAFEKEEPER_HTTP_PORT,
)
}