mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-14 19:00:37 +00:00
Compare commits
9 Commits
parallel_w
...
sync-sk-te
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
686d199acf | ||
|
|
60783b986d | ||
|
|
1de822f1b0 | ||
|
|
37c85d5fd9 | ||
|
|
6094236171 | ||
|
|
bb5aba42eb | ||
|
|
450fb9eafe | ||
|
|
557e3024cd | ||
|
|
bd34d7ecfc |
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -2349,7 +2349,6 @@ dependencies = [
|
||||
"humantime",
|
||||
"hyper",
|
||||
"lazy_static",
|
||||
"log",
|
||||
"pageserver",
|
||||
"postgres",
|
||||
"postgres-protocol",
|
||||
@@ -2363,6 +2362,7 @@ dependencies = [
|
||||
"tempfile",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tracing",
|
||||
"walkdir",
|
||||
"workspace_hack",
|
||||
"zenith_metrics",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# Zenith
|
||||
|
||||
Zenith substitutes PostgreSQL storage layer and redistributes data across a cluster of nodes
|
||||
Zenith is a serverless open source alternative to AWS Aurora Postgres. It separates storage and compute and substitutes the PostgreSQL storage layer by redistributing data across a cluster of nodes.
|
||||
|
||||
## Architecture overview
|
||||
|
||||
|
||||
@@ -199,17 +199,24 @@ impl PostgresNode {
|
||||
})
|
||||
}
|
||||
|
||||
fn sync_safekeepers(&self) -> Result<Lsn> {
|
||||
fn sync_safekeepers(&self, auth_token: &Option<String>) -> Result<Lsn> {
|
||||
let pg_path = self.env.pg_bin_dir().join("postgres");
|
||||
let sync_handle = Command::new(pg_path)
|
||||
.arg("--sync-safekeepers")
|
||||
let mut cmd = Command::new(&pg_path);
|
||||
|
||||
cmd.arg("--sync-safekeepers")
|
||||
.env_clear()
|
||||
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
|
||||
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
|
||||
.env("PGDATA", self.pgdata().to_str().unwrap())
|
||||
.stdout(Stdio::piped())
|
||||
// Comment this to avoid capturing stderr (useful if command hangs)
|
||||
.stderr(Stdio::piped())
|
||||
.stderr(Stdio::piped());
|
||||
|
||||
if let Some(token) = auth_token {
|
||||
cmd.env("ZENITH_AUTH_TOKEN", token);
|
||||
}
|
||||
|
||||
let sync_handle = cmd
|
||||
.spawn()
|
||||
.expect("postgres --sync-safekeepers failed to start");
|
||||
|
||||
@@ -287,15 +294,14 @@ impl PostgresNode {
|
||||
conf.append("max_replication_slots", "10");
|
||||
conf.append("hot_standby", "on");
|
||||
conf.append("shared_buffers", "1MB");
|
||||
conf.append("max_wal_size", "100GB");
|
||||
conf.append("fsync", "off");
|
||||
conf.append("max_connections", "100");
|
||||
conf.append("wal_level", "replica");
|
||||
// wal_sender_timeout is the maximum time to wait for WAL replication.
|
||||
// It also defines how often the walreceiver will send a feedback message to the wal sender.
|
||||
//conf.append("wal_sender_timeout", "5s");
|
||||
//conf.append("max_replication_flush_lag", "160MB");
|
||||
//conf.append("max_replication_apply_lag", "1500MB");
|
||||
conf.append("wal_sender_timeout", "5s");
|
||||
conf.append("max_replication_flush_lag", "160MB");
|
||||
conf.append("max_replication_apply_lag", "1500MB");
|
||||
conf.append("listen_addresses", &self.address.ip().to_string());
|
||||
conf.append("port", &self.address.port().to_string());
|
||||
|
||||
@@ -320,8 +326,11 @@ impl PostgresNode {
|
||||
} else {
|
||||
""
|
||||
};
|
||||
|
||||
format!("host={} port={} password={}", host, port, password)
|
||||
// NOTE avoiding spaces in connection string, because it is less error prone if we forward it somewhere.
|
||||
// Also note that not all parameters are supported here, because in compute we substitute $ZENITH_AUTH_TOKEN:
|
||||
// we parse this string and build it back with the token from the env var, and for simplicity the rebuild
|
||||
// uses only the needed variables, namely host, port, user, password.
|
||||
format!("postgresql://no_user:{}@{}:{}", password, host, port)
|
||||
};
|
||||
conf.append("shared_preload_libraries", "zenith");
|
||||
conf.append_line("");
|
||||
@@ -359,7 +368,7 @@ impl PostgresNode {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn load_basebackup(&self) -> Result<()> {
|
||||
fn load_basebackup(&self, auth_token: &Option<String>) -> Result<()> {
|
||||
let backup_lsn = if let Some(lsn) = self.lsn {
|
||||
Some(lsn)
|
||||
} else if self.uses_wal_proposer {
|
||||
@@ -367,7 +376,7 @@ impl PostgresNode {
|
||||
// latest data from the pageserver. That is a bit clumsy but whole bootstrap
|
||||
// procedure evolves quite actively right now, so let's think about it again
|
||||
// when things would be more stable (TODO).
|
||||
let lsn = self.sync_safekeepers()?;
|
||||
let lsn = self.sync_safekeepers(auth_token)?;
|
||||
if lsn == Lsn(0) {
|
||||
None
|
||||
} else {
|
||||
@@ -418,7 +427,6 @@ impl PostgresNode {
|
||||
.env_clear()
|
||||
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
|
||||
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap());
|
||||
|
||||
if let Some(token) = auth_token {
|
||||
cmd.env("ZENITH_AUTH_TOKEN", token);
|
||||
}
|
||||
@@ -452,7 +460,7 @@ impl PostgresNode {
|
||||
fs::write(&postgresql_conf_path, postgresql_conf)?;
|
||||
|
||||
// 3. Load basebackup
|
||||
self.load_basebackup()?;
|
||||
self.load_basebackup(auth_token)?;
|
||||
|
||||
if self.lsn.is_some() {
|
||||
File::create(self.pgdata().join("standby.signal"))?;
|
||||
|
||||
@@ -15,13 +15,11 @@ use reqwest::blocking::{Client, RequestBuilder, Response};
|
||||
use reqwest::{IntoUrl, Method};
|
||||
use thiserror::Error;
|
||||
use zenith_utils::http::error::HttpErrorBody;
|
||||
use zenith_utils::postgres_backend::AuthType;
|
||||
|
||||
use crate::local_env::{LocalEnv, SafekeeperConf};
|
||||
use crate::read_pidfile;
|
||||
use crate::storage::PageServerNode;
|
||||
use zenith_utils::connstring::connection_address;
|
||||
use zenith_utils::connstring::connection_host_port;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum SafekeeperHttpError {
|
||||
@@ -116,17 +114,6 @@ impl SafekeeperNode {
|
||||
);
|
||||
io::stdout().flush().unwrap();
|
||||
|
||||
// Configure connection to page server
|
||||
//
|
||||
// FIXME: We extract the host and port from the connection string instead of using
|
||||
// the connection string directly, because the 'safekeeper' binary expects
|
||||
// host:port format. That's a bit silly when we already have a full libpq connection
|
||||
// string at hand.
|
||||
let pageserver_conn = {
|
||||
let (host, port) = connection_host_port(&self.pageserver.pg_connection_config);
|
||||
format!("{}:{}", host, port)
|
||||
};
|
||||
|
||||
let listen_pg = format!("localhost:{}", self.conf.pg_port);
|
||||
let listen_http = format!("localhost:{}", self.conf.http_port);
|
||||
|
||||
@@ -134,7 +121,6 @@ impl SafekeeperNode {
|
||||
cmd.args(&["-D", self.datadir_path().to_str().unwrap()])
|
||||
.args(&["--listen-pg", &listen_pg])
|
||||
.args(&["--listen-http", &listen_http])
|
||||
.args(&["--pageserver", &pageserver_conn])
|
||||
.args(&["--recall", "1 second"])
|
||||
.arg("--daemonize")
|
||||
.env_clear()
|
||||
@@ -143,10 +129,6 @@ impl SafekeeperNode {
|
||||
cmd.arg("--no-sync");
|
||||
}
|
||||
|
||||
if self.env.pageserver.auth_type == AuthType::ZenithJWT {
|
||||
cmd.env("PAGESERVER_AUTH_TOKEN", &self.env.pageserver.auth_token);
|
||||
}
|
||||
|
||||
let var = "LLVM_PROFILE_FILE";
|
||||
if let Some(val) = std::env::var_os(var) {
|
||||
cmd.env(var, val);
|
||||
|
||||
@@ -39,10 +39,10 @@ pub mod defaults {
|
||||
// would be more appropriate. But a low value forces the code to be exercised more,
|
||||
// which is good for now to trigger bugs.
|
||||
pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
|
||||
pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(10);
|
||||
pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(1);
|
||||
|
||||
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
|
||||
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(10);
|
||||
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100);
|
||||
|
||||
pub const DEFAULT_SUPERUSER: &str = "zenith_admin";
|
||||
pub const DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNC: usize = 100;
|
||||
|
||||
@@ -106,7 +106,7 @@ fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) {
|
||||
conf,
|
||||
Arc::new(walredo_mgr),
|
||||
tenant_id,
|
||||
false,
|
||||
true,
|
||||
));
|
||||
|
||||
let mut m = access_tenants();
|
||||
|
||||
@@ -32,7 +32,6 @@ use std::os::unix::io::AsRawFd;
|
||||
use std::path::PathBuf;
|
||||
use std::process::Stdio;
|
||||
use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
@@ -54,8 +53,6 @@ use postgres_ffi::nonrelfile_utils::transaction_id_set_status;
|
||||
use postgres_ffi::pg_constants;
|
||||
use postgres_ffi::XLogRecord;
|
||||
|
||||
const N_WAL_REDO_PROCS: usize = 1;
|
||||
|
||||
///
|
||||
/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
|
||||
///
|
||||
@@ -142,8 +139,7 @@ pub struct PostgresRedoManager {
|
||||
tenantid: ZTenantId,
|
||||
conf: &'static PageServerConf,
|
||||
|
||||
round_robin: AtomicUsize,
|
||||
processes: [Mutex<Option<PostgresRedoProcess>>; N_WAL_REDO_PROCS],
|
||||
process: Mutex<Option<PostgresRedoProcess>>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -213,13 +209,12 @@ impl WalRedoManager for PostgresRedoManager {
|
||||
end_time = Instant::now();
|
||||
WAL_REDO_TIME.observe(end_time.duration_since(start_time).as_secs_f64());
|
||||
} else {
|
||||
let rr = self.round_robin.fetch_add(1, Ordering::Relaxed) % N_WAL_REDO_PROCS;
|
||||
let mut process_guard = self.processes[rr].lock().unwrap();
|
||||
let mut process_guard = self.process.lock().unwrap();
|
||||
let lock_time = Instant::now();
|
||||
|
||||
// launch the WAL redo process on first use
|
||||
if process_guard.is_none() {
|
||||
let p = PostgresRedoProcess::launch(self.conf, &self.tenantid, rr)?;
|
||||
let p = PostgresRedoProcess::launch(self.conf, &self.tenantid)?;
|
||||
*process_guard = Some(p);
|
||||
}
|
||||
let process = process_guard.as_mut().unwrap();
|
||||
@@ -251,8 +246,7 @@ impl PostgresRedoManager {
|
||||
PostgresRedoManager {
|
||||
tenantid,
|
||||
conf,
|
||||
round_robin: AtomicUsize::new(0),
|
||||
processes: [(); N_WAL_REDO_PROCS].map(|_| Mutex::new(None)),
|
||||
process: Mutex::new(None),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -478,17 +472,11 @@ impl PostgresRedoProcess {
|
||||
//
|
||||
// Start postgres binary in special WAL redo mode.
|
||||
//
|
||||
fn launch(
|
||||
conf: &PageServerConf,
|
||||
tenantid: &ZTenantId,
|
||||
id: usize,
|
||||
) -> Result<PostgresRedoProcess, Error> {
|
||||
fn launch(conf: &PageServerConf, tenantid: &ZTenantId) -> Result<PostgresRedoProcess, Error> {
|
||||
// FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
|
||||
// just create one with constant name. That fails if you try to launch more than
|
||||
// one WAL redo manager concurrently.
|
||||
let datadir = conf
|
||||
.tenant_path(tenantid)
|
||||
.join(format! {"wal-redo-datadir-{}", id});
|
||||
let datadir = conf.tenant_path(tenantid).join("wal-redo-datadir");
|
||||
|
||||
// Create empty data directory for wal-redo postgres, deleting old one first.
|
||||
if datadir.exists() {
|
||||
|
||||
@@ -340,10 +340,11 @@ class ProposerPostgres:
|
||||
"synchronous_standby_names = 'walproposer'\n",
|
||||
f"zenith.zenith_timeline = '{self.timeline_id}'\n",
|
||||
f"zenith.zenith_tenant = '{self.tenant_id}'\n",
|
||||
f"zenith.page_server_connstring = ''\n",
|
||||
f"wal_acceptors = '{wal_acceptors}'\n",
|
||||
])
|
||||
|
||||
def sync_safekeepers(self) -> str:
|
||||
def sync_safekeepers(self, timeout=60) -> str:
|
||||
"""
|
||||
Run 'postgres --sync-safekeepers'.
|
||||
Returns execution result, which is commit_lsn after sync.
|
||||
@@ -354,7 +355,7 @@ class ProposerPostgres:
|
||||
"PGDATA": self.pg_data_dir_path(),
|
||||
}
|
||||
|
||||
basepath = self.pg_bin.run_capture(command, env)
|
||||
basepath = self.pg_bin.run_capture(command, env, timeout=timeout)
|
||||
stdout_filename = basepath + '.stdout'
|
||||
|
||||
with open(stdout_filename, 'r') as stdout_f:
|
||||
@@ -437,3 +438,90 @@ def test_timeline_status(zenith_env_builder: ZenithEnvBuilder):
|
||||
|
||||
epoch_after_reboot = wa_http_cli.timeline_status(tenant_id, timeline_id).acceptor_epoch
|
||||
assert epoch_after_reboot > epoch
|
||||
|
||||
|
||||
class WalAppender:
|
||||
"""Helper for appending WAL to safekeepers, keeps track of last inserted lsn."""
|
||||
|
||||
# 0/16B9188 is a good LSN to start with: it is valid and is neither at a segment start nor in the zero segment
|
||||
def __init__(self,
|
||||
acceptors,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
epoch_start_lsn=0x16B9188,
|
||||
begin_lsn=0x16B9188):
|
||||
self.acceptors = acceptors
|
||||
self.epoch_start_lsn = epoch_start_lsn
|
||||
self.begin_lsn = begin_lsn
|
||||
self.tenant_id = tenant_id
|
||||
self.timeline_id = timeline_id
|
||||
self.flush_lsns = dict()
|
||||
|
||||
def append(self, i, term, lm_message="message", lm_prefix="", set_commit_lsn=False):
|
||||
"""Append new logical message to i'th safekeeper."""
|
||||
lsn = self.flush_lsns.get(i, self.begin_lsn)
|
||||
req = {
|
||||
"lm_prefix": lm_prefix,
|
||||
"lm_message": lm_message,
|
||||
"set_commit_lsn": set_commit_lsn,
|
||||
"term": term,
|
||||
"begin_lsn": lsn,
|
||||
"epoch_start_lsn": self.epoch_start_lsn,
|
||||
"truncate_lsn": self.begin_lsn,
|
||||
}
|
||||
|
||||
res = self.acceptors[i].append_logical_message(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
req,
|
||||
)
|
||||
|
||||
end_lsn = res["inserted_wal"]["end_lsn"]
|
||||
self.flush_lsns[i] = end_lsn
|
||||
|
||||
return res
|
||||
|
||||
def debug_print(self):
|
||||
"""Print lsn for each acceptor."""
|
||||
for i, lsn in self.flush_lsns.items():
|
||||
print(f'end_lsn for acceptors[{i}] = {lsn_to_hex(lsn)}')
|
||||
|
||||
|
||||
# one safekeeper with an old term and a lot of non-committed WAL
|
||||
def test_sync_safekeepers_old_term_ahead(repo_dir: str,
|
||||
pg_bin: PgBin,
|
||||
wa_factory: WalAcceptorFactory):
|
||||
wa_factory.start_n_new(3)
|
||||
|
||||
timeline_id = uuid.uuid4().hex
|
||||
tenant_id = uuid.uuid4().hex
|
||||
|
||||
# write config for proposer
|
||||
pgdata_dir = os.path.join(repo_dir, "proposer_pgdata")
|
||||
pg = ProposerPostgres(pgdata_dir, pg_bin, timeline_id, tenant_id)
|
||||
pg.create_dir_config(wa_factory.get_connstrs())
|
||||
|
||||
# append wal to safekeepers
|
||||
appender = WalAppender(wa_factory.instances, tenant_id, timeline_id)
|
||||
|
||||
appender.append(0, term=1, lm_message="msg1")
|
||||
appender.append(1, term=1, lm_message="msg1")
|
||||
appender.append(2, term=1, lm_message="msg1")
|
||||
|
||||
appender.append(2, term=1, lm_message="msg1")
|
||||
appender.append(2, term=1, lm_message="msg1")
|
||||
appender.append(2, term=1, lm_message="msg1")
|
||||
appender.append(2, term=1, lm_message="msg1")
|
||||
|
||||
new_epoch = appender.flush_lsns[0]
|
||||
appender.epoch_start_lsn = new_epoch
|
||||
|
||||
appender.append(0, term=2, lm_message="msg2")
|
||||
appender.append(1, term=2, lm_message="msg2")
|
||||
|
||||
# run sync safekeepers
|
||||
# FIXME: fails with timeout
|
||||
lsn_after_sync = pg.sync_safekeepers()
|
||||
print(f"lsn after sync = {lsn_after_sync}")
|
||||
|
||||
appender.debug_print()
|
||||
|
||||
2
vendor/postgres
vendored
2
vendor/postgres
vendored
Submodule vendor/postgres updated: be8bdba074...a97cfe8ed7
@@ -16,7 +16,7 @@ routerify = "2"
|
||||
fs2 = "0.4.3"
|
||||
lazy_static = "1.4.0"
|
||||
serde_json = "1"
|
||||
log = "0.4.14"
|
||||
tracing = "0.1.27"
|
||||
clap = "2.33.0"
|
||||
daemonize = "0.4.1"
|
||||
rust-s3 = { version = "0.27.0-rc4", features = ["no-verify-ssl"] }
|
||||
|
||||
@@ -5,9 +5,9 @@ use anyhow::Result;
|
||||
use clap::{App, Arg};
|
||||
use const_format::formatcp;
|
||||
use daemonize::Daemonize;
|
||||
use log::*;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::thread;
|
||||
use tracing::*;
|
||||
use walkeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR};
|
||||
use walkeeper::http;
|
||||
use walkeeper::s3_offload;
|
||||
@@ -44,6 +44,9 @@ fn main() -> Result<()> {
|
||||
.takes_value(true)
|
||||
.help(formatcp!("http endpoint address for metrics on ip:port (default: {DEFAULT_HTTP_LISTEN_ADDR})")),
|
||||
)
|
||||
// FIXME this argument is no longer needed since pageserver address is forwarded from compute.
|
||||
// However, because this argument is in use by console's e2e tests, let's keep it for now and remove it separately.
|
||||
// So currently it is a noop.
|
||||
.arg(
|
||||
Arg::with_name("pageserver")
|
||||
.short("p")
|
||||
@@ -101,10 +104,6 @@ fn main() -> Result<()> {
|
||||
conf.listen_http_addr = addr.to_owned();
|
||||
}
|
||||
|
||||
if let Some(addr) = arg_matches.value_of("pageserver") {
|
||||
conf.pageserver_addr = Some(addr.to_owned());
|
||||
}
|
||||
|
||||
if let Some(ttl) = arg_matches.value_of("ttl") {
|
||||
conf.ttl = Some(humantime::parse_duration(ttl)?);
|
||||
}
|
||||
|
||||
@@ -9,8 +9,8 @@
|
||||
use anyhow::{anyhow, Result};
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use crc32c::crc32c_append;
|
||||
use log::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tracing::*;
|
||||
|
||||
use crate::safekeeper::{AcceptorProposerMessage, AppendResponse};
|
||||
use crate::safekeeper::{
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
use std::env;
|
||||
use zenith_utils::zid::ZTimelineId;
|
||||
|
||||
pub mod http;
|
||||
@@ -13,6 +12,7 @@ pub mod s3_offload;
|
||||
pub mod safekeeper;
|
||||
pub mod send_wal;
|
||||
pub mod timeline;
|
||||
pub mod upgrade;
|
||||
pub mod wal_service;
|
||||
|
||||
pub mod defaults {
|
||||
@@ -39,9 +39,6 @@ pub struct SafeKeeperConf {
|
||||
pub no_sync: bool,
|
||||
pub listen_pg_addr: String,
|
||||
pub listen_http_addr: String,
|
||||
pub pageserver_addr: Option<String>,
|
||||
// TODO (create issue) this is temporary, until protocol between PG<->SK<->PS rework
|
||||
pub pageserver_auth_token: Option<String>,
|
||||
pub ttl: Option<Duration>,
|
||||
pub recall_period: Option<Duration>,
|
||||
}
|
||||
@@ -61,12 +58,10 @@ impl Default for SafeKeeperConf {
|
||||
workdir: PathBuf::from("./"),
|
||||
daemonize: false,
|
||||
no_sync: false,
|
||||
pageserver_addr: None,
|
||||
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
|
||||
listen_http_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
|
||||
ttl: None,
|
||||
recall_period: None,
|
||||
pageserver_auth_token: env::var("PAGESERVER_AUTH_TOKEN").ok(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,8 +5,8 @@
|
||||
use anyhow::{bail, Context, Result};
|
||||
use bytes::Bytes;
|
||||
use bytes::BytesMut;
|
||||
use log::*;
|
||||
use postgres::{Client, Config, NoTls};
|
||||
use tracing::*;
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use std::thread;
|
||||
@@ -28,20 +28,22 @@ pub struct ReceiveWalConn<'pg> {
|
||||
pg_backend: &'pg mut PostgresBackend,
|
||||
/// The cached result of `pg_backend.socket().peer_addr()` (roughly)
|
||||
peer_addr: SocketAddr,
|
||||
/// Pageserver connection string forwarded from compute
|
||||
/// NOTE that it is allowed to operate without a pageserver.
|
||||
/// So if compute has no pageserver configured do not use it.
|
||||
pageserver_connstr: Option<String>,
|
||||
}
|
||||
|
||||
///
|
||||
/// Periodically request pageserver to call back.
|
||||
/// If pageserver already has replication channel, it will just ignore this request
|
||||
///
|
||||
fn request_callback(conf: SafeKeeperConf, timelineid: ZTimelineId, tenantid: ZTenantId) {
|
||||
let ps_addr = conf.pageserver_addr.unwrap();
|
||||
let ps_connstr = format!(
|
||||
"postgresql://no_user:{}@{}/no_db",
|
||||
&conf.pageserver_auth_token.unwrap_or_default(),
|
||||
ps_addr
|
||||
);
|
||||
|
||||
fn request_callback(
|
||||
conf: SafeKeeperConf,
|
||||
pageserver_connstr: String,
|
||||
timelineid: ZTimelineId,
|
||||
tenantid: ZTenantId,
|
||||
) {
|
||||
// use Config parsing because SocketAddr parsing doesn't allow using host names instead of IP addresses
|
||||
let me_connstr = format!("postgresql://no_user@{}/no_db", conf.listen_pg_addr);
|
||||
let me_conf: Config = me_connstr.parse().unwrap();
|
||||
@@ -54,15 +56,18 @@ fn request_callback(conf: SafeKeeperConf, timelineid: ZTimelineId, tenantid: ZTe
|
||||
loop {
|
||||
info!(
|
||||
"requesting page server to connect to us: start {} {}",
|
||||
ps_connstr, callme
|
||||
pageserver_connstr, callme
|
||||
);
|
||||
match Client::connect(&ps_connstr, NoTls) {
|
||||
match Client::connect(&pageserver_connstr, NoTls) {
|
||||
Ok(mut client) => {
|
||||
if let Err(e) = client.simple_query(&callme) {
|
||||
error!("Failed to send callme request to pageserver: {}", e);
|
||||
}
|
||||
}
|
||||
Err(e) => error!("Failed to connect to pageserver {}: {}", &ps_connstr, e),
|
||||
Err(e) => error!(
|
||||
"Failed to connect to pageserver {}: {}",
|
||||
&pageserver_connstr, e
|
||||
),
|
||||
}
|
||||
|
||||
if let Some(period) = conf.recall_period {
|
||||
@@ -74,11 +79,15 @@ fn request_callback(conf: SafeKeeperConf, timelineid: ZTimelineId, tenantid: ZTe
|
||||
}
|
||||
|
||||
impl<'pg> ReceiveWalConn<'pg> {
|
||||
pub fn new(pg: &'pg mut PostgresBackend) -> ReceiveWalConn<'pg> {
|
||||
pub fn new(
|
||||
pg: &'pg mut PostgresBackend,
|
||||
pageserver_connstr: Option<String>,
|
||||
) -> ReceiveWalConn<'pg> {
|
||||
let peer_addr = *pg.get_peer_addr();
|
||||
ReceiveWalConn {
|
||||
pg_backend: pg,
|
||||
peer_addr,
|
||||
pageserver_connstr,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -107,6 +116,8 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
|
||||
/// Receive WAL from wal_proposer
|
||||
pub fn run(&mut self, swh: &mut SendWalHandler) -> Result<()> {
|
||||
let _enter = info_span!("WAL acceptor", timeline = %swh.timelineid.unwrap()).entered();
|
||||
|
||||
// Notify the libpq client that it's allowed to send `CopyData` messages
|
||||
self.pg_backend
|
||||
.write_message(&BeMessage::CopyBothResponse)?;
|
||||
@@ -127,17 +138,17 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
_ => bail!("unexpected message {:?} instead of greeting", msg),
|
||||
}
|
||||
|
||||
// if requested, ask pageserver to fetch wal from us
|
||||
// xxx: this place seems not really fitting
|
||||
if swh.conf.pageserver_addr.is_some() {
|
||||
// Need to establish replication channel with page server.
|
||||
// Since replication in postgres is initiated by the receiver, we should use the callme mechanism
|
||||
// Need to establish replication channel with page server.
|
||||
// Since replication in postgres is initiated by the receiver, we should use the callme mechanism
|
||||
if let Some(ref pageserver_connstr) = self.pageserver_connstr {
|
||||
let conf = swh.conf.clone();
|
||||
let timelineid = swh.timeline.get().timelineid;
|
||||
// copy to safely move to a thread
|
||||
let pageserver_connstr = pageserver_connstr.to_owned();
|
||||
let _ = thread::Builder::new()
|
||||
.name("request_callback thread".into())
|
||||
.spawn(move || {
|
||||
request_callback(conf, timelineid, tenant_id);
|
||||
request_callback(conf, pageserver_connstr, timelineid, tenant_id);
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
@@ -5,7 +5,6 @@ use crate::send_wal::SendWalHandler;
|
||||
use crate::timeline::{ReplicaState, Timeline, TimelineTools};
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use bytes::Bytes;
|
||||
use log::*;
|
||||
use postgres_ffi::xlog_utils::{
|
||||
get_current_timestamp, TimestampTz, XLogFileName, MAX_SEND_SIZE, PG_TLI,
|
||||
};
|
||||
@@ -20,6 +19,7 @@ use std::sync::Arc;
|
||||
use std::thread::sleep;
|
||||
use std::time::Duration;
|
||||
use std::{str, thread};
|
||||
use tracing::*;
|
||||
use zenith_utils::bin_ser::BeSer;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
use zenith_utils::postgres_backend::PostgresBackend;
|
||||
@@ -177,6 +177,8 @@ impl ReplicationConn {
|
||||
pgb: &mut PostgresBackend,
|
||||
cmd: &Bytes,
|
||||
) -> Result<()> {
|
||||
let _enter = info_span!("WAL sender", timeline = %swh.timelineid.unwrap()).entered();
|
||||
|
||||
// spawn the background thread which receives HotStandbyFeedback messages.
|
||||
let bg_timeline = Arc::clone(swh.timeline.get());
|
||||
let bg_stream_in = self.stream_in.take().unwrap();
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
//
|
||||
|
||||
use anyhow::Result;
|
||||
use log::*;
|
||||
use postgres_ffi::xlog_utils::*;
|
||||
use s3::bucket::Bucket;
|
||||
use s3::creds::Credentials;
|
||||
@@ -16,6 +15,7 @@ use std::path::Path;
|
||||
use std::time::SystemTime;
|
||||
use tokio::runtime;
|
||||
use tokio::time::sleep;
|
||||
use tracing::*;
|
||||
use walkdir::WalkDir;
|
||||
|
||||
use crate::SafeKeeperConf;
|
||||
|
||||
@@ -8,13 +8,13 @@ use bytes::Buf;
|
||||
use bytes::BufMut;
|
||||
use bytes::Bytes;
|
||||
use bytes::BytesMut;
|
||||
use log::*;
|
||||
use pageserver::waldecoder::WalStreamDecoder;
|
||||
use postgres_ffi::xlog_utils::TimeLineID;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::cmp::min;
|
||||
use std::fmt;
|
||||
use std::io::Read;
|
||||
use tracing::*;
|
||||
|
||||
use lazy_static::lazy_static;
|
||||
|
||||
@@ -30,7 +30,7 @@ use zenith_utils::pq_proto::SystemId;
|
||||
use zenith_utils::zid::{ZTenantId, ZTimelineId};
|
||||
|
||||
pub const SK_MAGIC: u32 = 0xcafeceefu32;
|
||||
pub const SK_FORMAT_VERSION: u32 = 1;
|
||||
pub const SK_FORMAT_VERSION: u32 = 2;
|
||||
const SK_PROTOCOL_VERSION: u32 = 1;
|
||||
const UNKNOWN_SERVER_VERSION: u32 = 0;
|
||||
|
||||
@@ -102,7 +102,7 @@ impl fmt::Debug for TermHistory {
|
||||
}
|
||||
|
||||
/// Unique id of proposer. Not needed for correctness, used for monitoring.
|
||||
type PgUuid = [u8; 16];
|
||||
pub type PgUuid = [u8; 16];
|
||||
|
||||
/// Persistent consensus state of the acceptor.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
@@ -140,12 +140,9 @@ pub struct ServerInfo {
|
||||
}
|
||||
|
||||
/// Persistent information stored on safekeeper node
|
||||
/// On disk data is prefixed by magic and format version and followed by checksum.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct SafeKeeperState {
|
||||
/// magic for verifying the content of the control file
|
||||
pub magic: u32,
|
||||
/// safekeeper format version
|
||||
pub format_version: u32,
|
||||
/// persistent acceptor state
|
||||
pub acceptor_state: AcceptorState,
|
||||
/// information about server
|
||||
@@ -166,8 +163,6 @@ pub struct SafeKeeperState {
|
||||
impl SafeKeeperState {
|
||||
pub fn new() -> SafeKeeperState {
|
||||
SafeKeeperState {
|
||||
magic: SK_MAGIC,
|
||||
format_version: SK_FORMAT_VERSION,
|
||||
acceptor_state: AcceptorState {
|
||||
term: 0,
|
||||
term_history: TermHistory::empty(),
|
||||
@@ -414,8 +409,8 @@ impl AcceptorProposerMessage {
|
||||
}
|
||||
|
||||
pub trait Storage {
|
||||
/// Persist safekeeper state on disk, optionally syncing it.
|
||||
fn persist(&mut self, s: &SafeKeeperState, sync: bool) -> Result<()>;
|
||||
/// Persist safekeeper state on disk.
|
||||
fn persist(&mut self, s: &SafeKeeperState) -> Result<()>;
|
||||
/// Write piece of wal in buf to disk and sync it.
|
||||
fn write_wal(&mut self, server: &ServerInfo, startpos: Lsn, buf: &[u8]) -> Result<()>;
|
||||
// Truncate WAL at specified LSN
|
||||
@@ -568,7 +563,7 @@ where
|
||||
self.s.server.ztli = msg.ztli;
|
||||
self.s.server.wal_seg_size = msg.wal_seg_size;
|
||||
self.storage
|
||||
.persist(&self.s, true)
|
||||
.persist(&self.s)
|
||||
.with_context(|| "failed to persist shared state")?;
|
||||
|
||||
self.metrics = SafeKeeperMetrics::new(self.s.server.ztli);
|
||||
@@ -598,7 +593,7 @@ where
|
||||
if self.s.acceptor_state.term < msg.term {
|
||||
self.s.acceptor_state.term = msg.term;
|
||||
// persist vote before sending it out
|
||||
self.storage.persist(&self.s, true)?;
|
||||
self.storage.persist(&self.s)?;
|
||||
resp.term = self.s.acceptor_state.term;
|
||||
resp.vote_given = true as u64;
|
||||
}
|
||||
@@ -610,7 +605,7 @@ where
|
||||
fn bump_if_higher(&mut self, term: Term) -> Result<()> {
|
||||
if self.s.acceptor_state.term < term {
|
||||
self.s.acceptor_state.term = term;
|
||||
self.storage.persist(&self.s, true)?;
|
||||
self.storage.persist(&self.s)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -649,7 +644,7 @@ where
|
||||
self.flush_lsn = msg.start_streaming_at;
|
||||
// and now adopt term history from proposer
|
||||
self.s.acceptor_state.term_history = msg.term_history.clone();
|
||||
self.storage.persist(&self.s, true)?;
|
||||
self.storage.persist(&self.s)?;
|
||||
|
||||
info!("start receiving WAL since {:?}", msg.start_streaming_at);
|
||||
|
||||
@@ -753,7 +748,10 @@ where
|
||||
self.s.commit_lsn = self.commit_lsn;
|
||||
self.s.truncate_lsn = self.truncate_lsn;
|
||||
}
|
||||
self.storage.persist(&self.s, sync_control_file)?;
|
||||
|
||||
if sync_control_file {
|
||||
self.storage.persist(&self.s)?;
|
||||
}
|
||||
|
||||
let resp = self.append_response();
|
||||
info!(
|
||||
@@ -778,7 +776,7 @@ mod tests {
|
||||
}
|
||||
|
||||
impl Storage for InMemoryStorage {
|
||||
fn persist(&mut self, s: &SafeKeeperState, _sync: bool) -> Result<()> {
|
||||
fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
|
||||
self.persisted_state = s.clone();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -46,6 +46,7 @@ impl postgres_backend::Handler for SendWalHandler {
|
||||
if let Some(app_name) = sm.params.get("application_name") {
|
||||
self.appname = Some(app_name.clone());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -75,7 +76,17 @@ impl postgres_backend::Handler for SendWalHandler {
|
||||
} else if query_string.starts_with(b"START_REPLICATION") {
|
||||
ReplicationConn::new(pgb).run(self, pgb, &query_string)?;
|
||||
} else if query_string.starts_with(b"START_WAL_PUSH") {
|
||||
ReceiveWalConn::new(pgb)
|
||||
// TODO: this repeats query decoding logic from page_service so it is probably
|
||||
// a good idea to refactor it in pgbackend and pass string to process query instead of bytes
|
||||
let decoded_query_string = match query_string.last() {
|
||||
Some(0) => std::str::from_utf8(&query_string[..query_string.len() - 1])?,
|
||||
_ => std::str::from_utf8(&query_string)?,
|
||||
};
|
||||
let pageserver_connstr = decoded_query_string
|
||||
.split_whitespace()
|
||||
.nth(1)
|
||||
.map(|s| s.to_owned());
|
||||
ReceiveWalConn::new(pgb, pageserver_connstr)
|
||||
.run(self)
|
||||
.with_context(|| "failed to run ReceiveWalConn")?;
|
||||
} else if query_string.starts_with(b"JSON_CTRL") {
|
||||
|
||||
@@ -2,9 +2,9 @@
|
||||
//! persistence and support for interaction between sending and receiving wal.
|
||||
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
|
||||
use fs2::FileExt;
|
||||
use lazy_static::lazy_static;
|
||||
use log::*;
|
||||
use postgres_ffi::xlog_utils::{find_end_of_wal, PG_TLI};
|
||||
use std::cmp::{max, min};
|
||||
use std::collections::HashMap;
|
||||
@@ -13,6 +13,7 @@ use std::io::{Read, Seek, SeekFrom, Write};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::{Arc, Condvar, Mutex};
|
||||
use std::time::Duration;
|
||||
use tracing::*;
|
||||
use zenith_metrics::{register_histogram_vec, Histogram, HistogramVec, DISK_WRITE_SECONDS_BUCKETS};
|
||||
use zenith_utils::bin_ser::LeSer;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
@@ -23,6 +24,7 @@ use crate::safekeeper::{
|
||||
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, ServerInfo,
|
||||
Storage, SK_FORMAT_VERSION, SK_MAGIC,
|
||||
};
|
||||
use crate::upgrade::upgrade_control_file;
|
||||
use crate::SafeKeeperConf;
|
||||
use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ};
|
||||
use std::convert::TryInto;
|
||||
@@ -34,7 +36,7 @@ const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial";
|
||||
// dedicated lockfile to prevent running several safekeepers on the same data
|
||||
const LOCK_FILE_NAME: &str = "safekeeper.lock";
|
||||
const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
|
||||
const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
|
||||
pub const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
|
||||
|
||||
/// Replica status: host standby feedback + disk consistent lsn
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
@@ -83,20 +85,13 @@ pub enum CreateControlFile {
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
static ref PERSIST_SYNC_CONTROL_FILE_SECONDS: HistogramVec = register_histogram_vec!(
|
||||
"safekeeper_persist_sync_control_file_seconds",
|
||||
static ref PERSIST_CONTROL_FILE_SECONDS: HistogramVec = register_histogram_vec!(
|
||||
"safekeeper_persist_control_file_seconds",
|
||||
"Seconds to persist and sync control file, grouped by timeline",
|
||||
&["timeline_id"],
|
||||
DISK_WRITE_SECONDS_BUCKETS.to_vec()
|
||||
)
|
||||
.expect("Failed to register safekeeper_persist_sync_control_file_seconds histogram vec");
|
||||
static ref PERSIST_NOSYNC_CONTROL_FILE_SECONDS: HistogramVec = register_histogram_vec!(
|
||||
"safekeeper_persist_nosync_control_file_seconds",
|
||||
"Seconds to persist and sync control file, grouped by timeline",
|
||||
&["timeline_id"],
|
||||
DISK_WRITE_SECONDS_BUCKETS.to_vec()
|
||||
)
|
||||
.expect("Failed to register safekeeper_persist_nosync_control_file_seconds histogram vec");
|
||||
.expect("Failed to register safekeeper_persist_control_file_seconds histogram vec");
|
||||
}
|
||||
|
||||
impl SharedState {
|
||||
@@ -134,7 +129,7 @@ impl SharedState {
|
||||
timelineid: ZTimelineId,
|
||||
create: CreateControlFile,
|
||||
) -> Result<Self> {
|
||||
let (file_storage, state) = SharedState::load_from_control_file(conf, timelineid, create)
|
||||
let (file_storage, state) = FileStorage::load_from_control_file(conf, timelineid, create)
|
||||
.with_context(|| "failed to load from control file")?;
|
||||
let flush_lsn = if state.server.wal_seg_size != 0 {
|
||||
let wal_dir = conf.timeline_dir(&timelineid);
|
||||
@@ -155,111 +150,6 @@ impl SharedState {
|
||||
replicas: Vec::new(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Fetch and lock control file (prevent running more than one instance of safekeeper)
|
||||
/// If create=false and file doesn't exist, bails out.
|
||||
fn load_from_control_file(
|
||||
conf: &SafeKeeperConf,
|
||||
timelineid: ZTimelineId,
|
||||
create: CreateControlFile,
|
||||
) -> Result<(FileStorage, SafeKeeperState)> {
|
||||
let timeline_dir = conf.timeline_dir(&timelineid);
|
||||
|
||||
let control_file_path = timeline_dir.join(CONTROL_FILE_NAME);
|
||||
let lock_file_path = timeline_dir.join(LOCK_FILE_NAME);
|
||||
|
||||
info!(
|
||||
"loading control file {}, create={:?} lock file {:?}",
|
||||
control_file_path.display(),
|
||||
create,
|
||||
lock_file_path.display(),
|
||||
);
|
||||
|
||||
let lock_file = File::create(&lock_file_path).with_context(|| "failed to open lockfile")?;
|
||||
|
||||
// Lock file to prevent two or more active safekeepers
|
||||
lock_file.try_lock_exclusive().map_err(|e| {
|
||||
anyhow!(
|
||||
"control file {:?} is locked by some other process: {}",
|
||||
&control_file_path,
|
||||
e
|
||||
)
|
||||
})?;
|
||||
|
||||
let mut control_file = OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(matches!(create, CreateControlFile::True))
|
||||
.open(&control_file_path)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"failed to open control file at {}",
|
||||
control_file_path.display(),
|
||||
)
|
||||
})?;
|
||||
|
||||
// Empty file is legit on 'create', don't try to deser from it.
|
||||
let state = if control_file.metadata().unwrap().len() == 0 {
|
||||
if let CreateControlFile::False = create {
|
||||
bail!("control file is empty");
|
||||
}
|
||||
SafeKeeperState::new()
|
||||
} else {
|
||||
let mut buf = Vec::new();
|
||||
control_file
|
||||
.read_to_end(&mut buf)
|
||||
.with_context(|| "failed to read control file")?;
|
||||
|
||||
let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]);
|
||||
|
||||
let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] =
|
||||
buf[buf.len() - CHECKSUM_SIZE..].try_into()?;
|
||||
let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes);
|
||||
|
||||
ensure!(
|
||||
calculated_checksum == expected_checksum,
|
||||
format!(
|
||||
"safe keeper state checksum mismatch expected {} got {}",
|
||||
expected_checksum, calculated_checksum
|
||||
)
|
||||
);
|
||||
|
||||
let state =
|
||||
SafeKeeperState::des(&buf[..buf.len() - CHECKSUM_SIZE]).with_context(|| {
|
||||
format!(
|
||||
"failed to deserialize safe keeper state from control file at {}",
|
||||
control_file_path.display(),
|
||||
)
|
||||
})?;
|
||||
|
||||
if state.magic != SK_MAGIC {
|
||||
bail!("bad control file magic: {}", state.magic);
|
||||
}
|
||||
if state.format_version != SK_FORMAT_VERSION {
|
||||
bail!(
|
||||
"Got incompatible format version, expected {}, got {}",
|
||||
SK_FORMAT_VERSION,
|
||||
state.format_version,
|
||||
);
|
||||
}
|
||||
state
|
||||
};
|
||||
|
||||
let timelineid_str = format!("{}", timelineid);
|
||||
|
||||
Ok((
|
||||
FileStorage {
|
||||
lock_file,
|
||||
timeline_dir,
|
||||
conf: conf.clone(),
|
||||
persist_sync_control_file_seconds: PERSIST_SYNC_CONTROL_FILE_SECONDS
|
||||
.with_label_values(&[&timelineid_str]),
|
||||
persist_nosync_control_file_seconds: PERSIST_NOSYNC_CONTROL_FILE_SECONDS
|
||||
.with_label_values(&[&timelineid_str]),
|
||||
},
|
||||
state,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
/// Database instance (tenant)
|
||||
@@ -441,20 +331,123 @@ struct FileStorage {
|
||||
// save timeline dir to avoid reconstructing it every time
|
||||
timeline_dir: PathBuf,
|
||||
conf: SafeKeeperConf,
|
||||
persist_sync_control_file_seconds: Histogram,
|
||||
persist_nosync_control_file_seconds: Histogram,
|
||||
persist_control_file_seconds: Histogram,
|
||||
}
|
||||
|
||||
impl FileStorage {
|
||||
// Check the magic/version in the on-disk data and deserialize it, if possible.
|
||||
fn deser_sk_state(buf: &mut &[u8]) -> Result<SafeKeeperState> {
|
||||
// Read the version independent part
|
||||
let magic = buf.read_u32::<LittleEndian>()?;
|
||||
if magic != SK_MAGIC {
|
||||
bail!(
|
||||
"bad control file magic: {:X}, expected {:X}",
|
||||
magic,
|
||||
SK_MAGIC
|
||||
);
|
||||
}
|
||||
let version = buf.read_u32::<LittleEndian>()?;
|
||||
if version == SK_FORMAT_VERSION {
|
||||
let res = SafeKeeperState::des(buf)?;
|
||||
return Ok(res);
|
||||
}
|
||||
// try to upgrade
|
||||
upgrade_control_file(buf, version)
|
||||
}
|
||||
|
||||
/// Fetch and lock control file (prevent running more than one instance of safekeeper)
|
||||
/// If create=false and file doesn't exist, bails out.
|
||||
fn load_from_control_file(
|
||||
conf: &SafeKeeperConf,
|
||||
timelineid: ZTimelineId,
|
||||
create: CreateControlFile,
|
||||
) -> Result<(FileStorage, SafeKeeperState)> {
|
||||
let timeline_dir = conf.timeline_dir(&timelineid);
|
||||
|
||||
let control_file_path = timeline_dir.join(CONTROL_FILE_NAME);
|
||||
let lock_file_path = timeline_dir.join(LOCK_FILE_NAME);
|
||||
|
||||
info!(
|
||||
"loading control file {}, create={:?} lock file {:?}",
|
||||
control_file_path.display(),
|
||||
create,
|
||||
lock_file_path.display(),
|
||||
);
|
||||
|
||||
let lock_file = File::create(&lock_file_path).with_context(|| "failed to open lockfile")?;
|
||||
|
||||
// Lock file to prevent two or more active safekeepers
|
||||
lock_file.try_lock_exclusive().map_err(|e| {
|
||||
anyhow!(
|
||||
"control file {:?} is locked by some other process: {}",
|
||||
&control_file_path,
|
||||
e
|
||||
)
|
||||
})?;
|
||||
|
||||
let mut control_file = OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.create(matches!(create, CreateControlFile::True))
|
||||
.open(&control_file_path)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"failed to open control file at {}",
|
||||
control_file_path.display(),
|
||||
)
|
||||
})?;
|
||||
|
||||
// Empty file is legit on 'create', don't try to deser from it.
|
||||
let state = if control_file.metadata().unwrap().len() == 0 {
|
||||
if let CreateControlFile::False = create {
|
||||
bail!("control file is empty");
|
||||
}
|
||||
SafeKeeperState::new()
|
||||
} else {
|
||||
let mut buf = Vec::new();
|
||||
control_file
|
||||
.read_to_end(&mut buf)
|
||||
.with_context(|| "failed to read control file")?;
|
||||
|
||||
let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]);
|
||||
|
||||
let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] =
|
||||
buf[buf.len() - CHECKSUM_SIZE..].try_into()?;
|
||||
let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes);
|
||||
|
||||
ensure!(
|
||||
calculated_checksum == expected_checksum,
|
||||
format!(
|
||||
"safekeeper control file checksum mismatch: expected {} got {}",
|
||||
expected_checksum, calculated_checksum
|
||||
)
|
||||
);
|
||||
|
||||
FileStorage::deser_sk_state(&mut &buf[..buf.len() - CHECKSUM_SIZE]).with_context(
|
||||
|| format!("while reading control file {}", control_file_path.display(),),
|
||||
)?
|
||||
};
|
||||
|
||||
let timelineid_str = format!("{}", timelineid);
|
||||
|
||||
Ok((
|
||||
FileStorage {
|
||||
lock_file,
|
||||
timeline_dir,
|
||||
conf: conf.clone(),
|
||||
persist_control_file_seconds: PERSIST_CONTROL_FILE_SECONDS
|
||||
.with_label_values(&[&timelineid_str]),
|
||||
},
|
||||
state,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
impl Storage for FileStorage {
|
||||
// persists state durably to underlying storage
|
||||
// for description see https://lwn.net/Articles/457667/
|
||||
fn persist(&mut self, s: &SafeKeeperState, sync: bool) -> Result<()> {
|
||||
let _timer = if sync {
|
||||
&self.persist_sync_control_file_seconds
|
||||
} else {
|
||||
&self.persist_nosync_control_file_seconds
|
||||
}
|
||||
.start_timer();
|
||||
fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
|
||||
let _timer = &self.persist_control_file_seconds.start_timer();
|
||||
|
||||
// write data to safekeeper.control.partial
|
||||
let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL);
|
||||
@@ -464,7 +457,11 @@ impl Storage for FileStorage {
|
||||
&control_partial_path.display()
|
||||
)
|
||||
})?;
|
||||
let mut buf = s.ser().with_context(|| "failed to serialize state")?;
|
||||
let mut buf: Vec<u8> = Vec::new();
|
||||
buf.write_u32::<LittleEndian>(SK_MAGIC)?;
|
||||
buf.write_u32::<LittleEndian>(SK_FORMAT_VERSION)?;
|
||||
s.ser_into(&mut buf)?;
|
||||
|
||||
// calculate checksum before resize
|
||||
let checksum = crc32c::crc32c(&buf);
|
||||
buf.extend_from_slice(&checksum.to_le_bytes());
|
||||
@@ -476,36 +473,32 @@ impl Storage for FileStorage {
|
||||
)
|
||||
})?;
|
||||
|
||||
if sync {
|
||||
// fsync the file
|
||||
control_partial.sync_all().with_context(|| {
|
||||
format!(
|
||||
"failed to sync partial control file at {}",
|
||||
control_partial_path.display()
|
||||
)
|
||||
})?;
|
||||
}
|
||||
// fsync the file
|
||||
control_partial.sync_all().with_context(|| {
|
||||
format!(
|
||||
"failed to sync partial control file at {}",
|
||||
control_partial_path.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);
|
||||
|
||||
// rename should be atomic
|
||||
fs::rename(&control_partial_path, &control_path)?;
|
||||
if sync {
|
||||
// this sync is not required by any standard but postgres does this (see durable_rename)
|
||||
File::open(&control_path)
|
||||
.and_then(|f| f.sync_all())
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"failed to sync control file at: {}",
|
||||
&control_path.display()
|
||||
)
|
||||
})?;
|
||||
// this sync is not required by any standard but postgres does this (see durable_rename)
|
||||
File::open(&control_path)
|
||||
.and_then(|f| f.sync_all())
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"failed to sync control file at: {}",
|
||||
&control_path.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
// fsync the directory (linux specific)
|
||||
File::open(&self.timeline_dir)
|
||||
.and_then(|f| f.sync_all())
|
||||
.with_context(|| "failed to sync control file directory")?;
|
||||
}
|
||||
// fsync the directory (linux specific)
|
||||
File::open(&self.timeline_dir)
|
||||
.and_then(|f| f.sync_all())
|
||||
.with_context(|| "failed to sync control file directory")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -682,7 +675,7 @@ mod test {
|
||||
use super::FileStorage;
|
||||
use crate::{
|
||||
safekeeper::{SafeKeeperState, Storage},
|
||||
timeline::{CreateControlFile, SharedState, CONTROL_FILE_NAME},
|
||||
timeline::{CreateControlFile, CONTROL_FILE_NAME},
|
||||
SafeKeeperConf,
|
||||
};
|
||||
use anyhow::Result;
|
||||
@@ -704,7 +697,7 @@ mod test {
|
||||
) -> Result<(FileStorage, SafeKeeperState)> {
|
||||
fs::create_dir_all(&conf.timeline_dir(&timeline_id))
|
||||
.expect("failed to create timeline dir");
|
||||
SharedState::load_from_control_file(conf, timeline_id, create)
|
||||
FileStorage::load_from_control_file(conf, timeline_id, create)
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -717,9 +710,7 @@ mod test {
|
||||
.expect("failed to read state");
|
||||
// change something
|
||||
state.wal_start_lsn = Lsn(42);
|
||||
storage
|
||||
.persist(&state, true)
|
||||
.expect("failed to persist state");
|
||||
storage.persist(&state).expect("failed to persist state");
|
||||
}
|
||||
|
||||
let (_, state) = load_from_control_file(&conf, timeline_id, CreateControlFile::False)
|
||||
@@ -737,9 +728,7 @@ mod test {
|
||||
.expect("failed to read state");
|
||||
// change something
|
||||
state.wal_start_lsn = Lsn(42);
|
||||
storage
|
||||
.persist(&state, true)
|
||||
.expect("failed to persist state");
|
||||
storage.persist(&state).expect("failed to persist state");
|
||||
}
|
||||
let control_path = conf.timeline_dir(&timeline_id).join(CONTROL_FILE_NAME);
|
||||
let mut data = fs::read(&control_path).unwrap();
|
||||
@@ -749,7 +738,7 @@ mod test {
|
||||
match load_from_control_file(&conf, timeline_id, CreateControlFile::False) {
|
||||
Err(err) => assert!(err
|
||||
.to_string()
|
||||
.contains("safe keeper state checksum mismatch")),
|
||||
.contains("safekeeper control file checksum mismatch")),
|
||||
Ok(_) => panic!("expected error"),
|
||||
}
|
||||
}
|
||||
|
||||
60
walkeeper/src/upgrade.rs
Normal file
60
walkeeper/src/upgrade.rs
Normal file
@@ -0,0 +1,60 @@
|
||||
//! Code to deal with safekeeper control file upgrades
|
||||
use crate::safekeeper::{
|
||||
AcceptorState, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory, TermSwitchEntry,
|
||||
};
|
||||
use anyhow::{bail, Result};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tracing::*;
|
||||
use zenith_utils::{bin_ser::LeSer, lsn::Lsn};
|
||||
|
||||
/// Persistent consensus state of the acceptor.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
struct AcceptorStateV1 {
|
||||
/// acceptor's last term it voted for (advanced in 1 phase)
|
||||
term: Term,
|
||||
/// acceptor's epoch (advanced, i.e. bumped to 'term' when VCL is reached).
|
||||
epoch: Term,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
struct SafeKeeperStateV1 {
|
||||
/// persistent acceptor state
|
||||
acceptor_state: AcceptorStateV1,
|
||||
/// information about server
|
||||
server: ServerInfo,
|
||||
/// Unique id of the last *elected* proposer we dealed with. Not needed
|
||||
/// for correctness, exists for monitoring purposes.
|
||||
proposer_uuid: PgUuid,
|
||||
/// part of WAL acknowledged by quorum and available locally
|
||||
commit_lsn: Lsn,
|
||||
/// minimal LSN which may be needed for recovery of some safekeeper (end_lsn
|
||||
/// of last record streamed to everyone)
|
||||
truncate_lsn: Lsn,
|
||||
// Safekeeper starts receiving WAL from this LSN, zeros before it ought to
|
||||
// be skipped during decoding.
|
||||
wal_start_lsn: Lsn,
|
||||
}
|
||||
|
||||
pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState> {
|
||||
// migrate to storing full term history
|
||||
if version == 1 {
|
||||
info!("reading safekeeper control file version {}", version);
|
||||
let oldstate = SafeKeeperStateV1::des(&buf[..buf.len()])?;
|
||||
let ac = AcceptorState {
|
||||
term: oldstate.acceptor_state.term,
|
||||
term_history: TermHistory(vec![TermSwitchEntry {
|
||||
term: oldstate.acceptor_state.epoch,
|
||||
lsn: Lsn(0),
|
||||
}]),
|
||||
};
|
||||
return Ok(SafeKeeperState {
|
||||
acceptor_state: ac,
|
||||
server: oldstate.server.clone(),
|
||||
proposer_uuid: oldstate.proposer_uuid,
|
||||
commit_lsn: oldstate.commit_lsn,
|
||||
truncate_lsn: oldstate.truncate_lsn,
|
||||
wal_start_lsn: oldstate.wal_start_lsn,
|
||||
});
|
||||
}
|
||||
bail!("unsupported safekeeper control file version {}", version)
|
||||
}
|
||||
@@ -3,9 +3,10 @@
|
||||
//! receive WAL from wal_proposer and send it to WAL receivers
|
||||
//!
|
||||
use anyhow::Result;
|
||||
use log::*;
|
||||
use regex::Regex;
|
||||
use std::net::{TcpListener, TcpStream};
|
||||
use std::thread;
|
||||
use tracing::*;
|
||||
|
||||
use crate::send_wal::SendWalHandler;
|
||||
use crate::SafeKeeperConf;
|
||||
@@ -33,9 +34,19 @@ pub fn thread_main(conf: SafeKeeperConf, listener: TcpListener) -> Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the unique id of the current thread (Rust internal) as a bare number,
/// i.e. with the `ThreadId(..)` wrapper removed for shorter printing.
///
/// The number is recovered from the `Debug` rendering of `ThreadId`, which
/// has the form `ThreadId(<digits>)`. Plain prefix/suffix stripping is used,
/// so nothing has to be compiled on each call.
///
/// # Panics
/// Panics if the `Debug` rendering does not have the expected form.
fn get_tid() -> u64 {
    let tids = format!("{:?}", thread::current().id());
    tids.strip_prefix("ThreadId(")
        .and_then(|rest| rest.strip_suffix(')'))
        .and_then(|digits| digits.parse().ok())
        .expect("ThreadId debug output should look like `ThreadId(<number>)`")
}
|
||||
|
||||
/// This is run by `thread_main` above, inside a background thread.
|
||||
///
|
||||
fn handle_socket(socket: TcpStream, conf: SafeKeeperConf) -> Result<()> {
|
||||
let _enter = info_span!("", tid = ?get_tid()).entered();
|
||||
|
||||
socket.set_nodelay(true)?;
|
||||
|
||||
let mut conn_handler = SendWalHandler::new(conf);
|
||||
|
||||
@@ -36,10 +36,17 @@ pg_port = {pageserver_pg_port}
|
||||
http_port = {pageserver_http_port}
|
||||
auth_type = '{pageserver_auth_type}'
|
||||
|
||||
[[safekeepers]]
|
||||
name = '{safekeeper_name}'
|
||||
pg_port = {safekeeper_pg_port}
|
||||
http_port = {safekeeper_http_port}
|
||||
"#,
|
||||
pageserver_pg_port = DEFAULT_PAGESERVER_PG_PORT,
|
||||
pageserver_http_port = DEFAULT_PAGESERVER_HTTP_PORT,
|
||||
pageserver_auth_type = AuthType::Trust,
|
||||
safekeeper_name = DEFAULT_SAFEKEEPER_NAME,
|
||||
safekeeper_pg_port = DEFAULT_SAFEKEEPER_PG_PORT,
|
||||
safekeeper_http_port = DEFAULT_SAFEKEEPER_HTTP_PORT,
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user