Add WAL offloading to s3 on safekeepers.

A separate task is launched for each timeline and stopped when the timeline no longer
needs offloading. The decision of which safekeeper offloads is made through etcd leader
election; currently there is no precondition for participating, which is a TODO.

neon_local and test infrastructure for remote storage on safekeepers are added,
along with the test itself.

ref #1009

Co-authored-by: Anton Shyrabokau <ahtoxa@Antons-MacBook-Pro.local>
Author: Arseny Sher
Date: 2022-04-27 00:24:59 -07:00
Parent: 1d71949c51
Commit: 0e1bd57c53
28 changed files with 1146 additions and 429 deletions
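
For orientation before the per-file diff: a minimal sketch of the task layout the commit
message describes, assuming one launcher task plus one offload task per timeline (the diff
wires this up via wal_backup_launcher_tx and wal_backup::wal_backup_launcher_thread_main).
All names below (wal_backup_launcher, offload_task, am_i_leader, needs_offloading) are
illustrative stand-ins, not the functions added in this commit; in particular, am_i_leader
is a placeholder for the etcd campaign the real code performs.

use std::collections::HashMap;

use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::time::{sleep, Duration};

/// Placeholder timeline id; the real code uses ZTenantTimelineId.
type TimelineId = u64;

/// Placeholder: does this timeline currently need offloading?
/// The real check compares commit_lsn and backup_lsn segment numbers.
fn needs_offloading(_ttid: TimelineId) -> bool {
    true
}

/// Placeholder for the per-timeline etcd leader election: only the
/// elected safekeeper actually uploads segments.
async fn am_i_leader(_ttid: TimelineId) -> bool {
    true
}

/// Per-timeline offload task: while we hold the election, upload newly
/// completed WAL segments; otherwise just re-check periodically.
async fn offload_task(ttid: TimelineId) {
    loop {
        if am_i_leader(ttid).await {
            // upload newly completed WAL segments here (elided in this sketch)
        }
        sleep(Duration::from_secs(1)).await;
    }
}

/// Launcher: timelines ping it by id whenever their status may have changed;
/// it starts an offload task when one is needed and aborts it otherwise.
async fn wal_backup_launcher(mut rx: mpsc::Receiver<TimelineId>) {
    let mut tasks: HashMap<TimelineId, JoinHandle<()>> = HashMap::new();
    while let Some(ttid) = rx.recv().await {
        let should_run = needs_offloading(ttid);
        if should_run && !tasks.contains_key(&ttid) {
            tasks.insert(ttid, tokio::spawn(offload_task(ttid)));
        } else if !should_run {
            if let Some(handle) = tasks.remove(&ttid) {
                handle.abort();
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<TimelineId>(100);
    tokio::spawn(wal_backup_launcher(rx));
    tx.send(1).await.unwrap();
    sleep(Duration::from_millis(50)).await;
}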


@@ -16,4 +16,3 @@ console_mgmt_base_url = http://console-release.local
bucket_name = zenith-storage-oregon
bucket_region = us-west-2
etcd_endpoints = etcd-release.local:2379
safekeeper_enable_s3_offload = false


@@ -17,4 +17,3 @@ console_mgmt_base_url = http://console-staging.local
bucket_name = zenith-staging-storage-us-east-1
bucket_region = us-east-1
etcd_endpoints = etcd-staging.local:2379
safekeeper_enable_s3_offload = false


@@ -6,7 +6,7 @@ After=network.target auditd.service
Type=simple
User=safekeeper
Environment=RUST_BACKTRACE=1 ZENITH_REPO_DIR=/storage/safekeeper/data LD_LIBRARY_PATH=/usr/local/lib
ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}.local:6500 --listen-http {{ inventory_hostname }}.local:7676 -p {{ first_pageserver }}:6400 -D /storage/safekeeper/data --broker-endpoints={{ etcd_endpoints }} --enable-s3-offload={{ safekeeper_enable_s3_offload }}
ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}.local:6500 --listen-http {{ inventory_hostname }}.local:7676 -p {{ first_pageserver }}:6400 -D /storage/safekeeper/data --broker-endpoints={{ etcd_endpoints }} --remote_storage='{bucket_name={{bucket_name}}, bucket_region={{bucket_region}}, prefix_in_bucket=wal}'
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed
KillSignal=SIGINT

Cargo.lock

@@ -1722,9 +1722,9 @@ dependencies = [
[[package]]
name = "once_cell"
version = "1.9.0"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da32515d9f6e6e489d7bc9d84c71b060db7247dc035bbe44eac88cf87486d8d5"
checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9"
[[package]]
name = "oorandom"
@@ -2403,6 +2403,7 @@ dependencies = [
"tempfile",
"tokio",
"tokio-util 0.7.0",
"toml_edit",
"tracing",
"workspace_hack",
]
@@ -2654,6 +2655,7 @@ name = "safekeeper"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"byteorder",
"bytes",
"clap 3.0.14",
@@ -2662,12 +2664,14 @@ dependencies = [
"daemonize",
"etcd_broker",
"fs2",
"futures",
"git-version",
"hex",
"humantime",
"hyper",
"lazy_static",
"metrics",
"once_cell",
"postgres",
"postgres-protocol",
"postgres_ffi",
@@ -2681,6 +2685,7 @@ dependencies = [
"tokio",
"tokio-postgres",
"tokio-util 0.7.0",
"toml_edit",
"tracing",
"url",
"utils",


@@ -49,3 +49,12 @@ fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command {
cmd
}
}
fn fill_aws_secrets_vars(mut cmd: &mut Command) -> &mut Command {
for env_key in ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"] {
if let Ok(value) = std::env::var(env_key) {
cmd = cmd.env(env_key, value);
}
}
cmd
}


@@ -167,6 +167,8 @@ pub struct SafekeeperConf {
pub pg_port: u16,
pub http_port: u16,
pub sync: bool,
pub remote_storage: Option<String>,
pub backup_threads: Option<u32>,
}
impl Default for SafekeeperConf {
@@ -176,6 +178,8 @@ impl Default for SafekeeperConf {
pg_port: 0,
http_port: 0,
sync: true,
remote_storage: None,
backup_threads: None,
}
}
}
@@ -377,6 +381,7 @@ impl LocalEnv {
base_path != Path::new(""),
"repository base path is missing"
);
ensure!(
!base_path.exists(),
"directory '{}' already exists. Perhaps already initialized?",


@@ -23,7 +23,7 @@ use utils::{
use crate::local_env::{LocalEnv, SafekeeperConf};
use crate::storage::PageServerNode;
use crate::{fill_rust_env_vars, read_pidfile};
use crate::{fill_aws_secrets_vars, fill_rust_env_vars, read_pidfile};
#[derive(Error, Debug)]
pub enum SafekeeperHttpError {
@@ -143,6 +143,14 @@ impl SafekeeperNode {
if let Some(prefix) = self.env.etcd_broker.broker_etcd_prefix.as_deref() {
cmd.args(&["--broker-etcd-prefix", prefix]);
}
if let Some(threads) = self.conf.backup_threads {
cmd.args(&["--backup-threads", threads.to_string().as_ref()]);
}
if let Some(ref remote_storage) = self.conf.remote_storage {
cmd.args(&["--remote-storage", remote_storage]);
}
fill_aws_secrets_vars(&mut cmd);
if !cmd.status()?.success() {
bail!(


@@ -25,7 +25,7 @@ use utils::{
};
use crate::local_env::LocalEnv;
use crate::{fill_rust_env_vars, read_pidfile};
use crate::{fill_aws_secrets_vars, fill_rust_env_vars, read_pidfile};
use pageserver::tenant_mgr::TenantInfo;
#[derive(Error, Debug)]
@@ -493,12 +493,3 @@ impl PageServerNode {
Ok(timeline_info_response)
}
}
fn fill_aws_secrets_vars(mut cmd: &mut Command) -> &mut Command {
for env_key in ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"] {
if let Ok(value) = std::env::var(env_key) {
cmd = cmd.env(env_key, value);
}
}
cmd
}


@@ -43,10 +43,10 @@ pub struct SkTimelineInfo {
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub commit_lsn: Option<Lsn>,
/// LSN up to which safekeeper offloaded WAL to s3.
/// LSN up to which safekeeper has backed WAL.
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub s3_wal_lsn: Option<Lsn>,
pub backup_lsn: Option<Lsn>,
/// LSN of last checkpoint uploaded by pageserver.
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]


@@ -6,7 +6,6 @@ edition = "2021"
[dependencies]
anyhow = { version = "1.0", features = ["backtrace"] }
async-trait = "0.1"
metrics = { version = "0.1", path = "../metrics" }
once_cell = "1.8.0"
rusoto_core = "0.48"
@@ -15,6 +14,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
tokio = { version = "1.17", features = ["sync", "macros", "fs", "io-util"] }
tokio-util = { version = "0.7", features = ["io"] }
toml_edit = { version = "0.13", features = ["easy"] }
tracing = "0.1.27"
workspace_hack = { version = "0.1", path = "../../workspace_hack" }


@@ -16,8 +16,10 @@ use std::{
path::{Path, PathBuf},
};
use anyhow::Context;
use anyhow::{bail, Context};
use tokio::io;
use toml_edit::Item;
use tracing::info;
pub use self::{
@@ -203,6 +205,90 @@ pub fn path_with_suffix_extension(original_path: impl AsRef<Path>, suffix: &str)
.with_extension(new_extension.as_ref())
}
impl RemoteStorageConfig {
pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result<RemoteStorageConfig> {
let local_path = toml.get("local_path");
let bucket_name = toml.get("bucket_name");
let bucket_region = toml.get("bucket_region");
let max_concurrent_syncs = NonZeroUsize::new(
parse_optional_integer("max_concurrent_syncs", toml)?
.unwrap_or(DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS),
)
.context("Failed to parse 'max_concurrent_syncs' as a positive integer")?;
let max_sync_errors = NonZeroU32::new(
parse_optional_integer("max_sync_errors", toml)?
.unwrap_or(DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS),
)
.context("Failed to parse 'max_sync_errors' as a positive integer")?;
let concurrency_limit = NonZeroUsize::new(
parse_optional_integer("concurrency_limit", toml)?
.unwrap_or(DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT),
)
.context("Failed to parse 'concurrency_limit' as a positive integer")?;
let storage = match (local_path, bucket_name, bucket_region) {
(None, None, None) => bail!("no 'local_path' nor 'bucket_name' option"),
(_, Some(_), None) => {
bail!("'bucket_region' option is mandatory if 'bucket_name' is given ")
}
(_, None, Some(_)) => {
bail!("'bucket_name' option is mandatory if 'bucket_region' is given ")
}
(None, Some(bucket_name), Some(bucket_region)) => RemoteStorageKind::AwsS3(S3Config {
bucket_name: parse_toml_string("bucket_name", bucket_name)?,
bucket_region: parse_toml_string("bucket_region", bucket_region)?,
prefix_in_bucket: toml
.get("prefix_in_bucket")
.map(|prefix_in_bucket| parse_toml_string("prefix_in_bucket", prefix_in_bucket))
.transpose()?,
endpoint: toml
.get("endpoint")
.map(|endpoint| parse_toml_string("endpoint", endpoint))
.transpose()?,
concurrency_limit,
}),
(Some(local_path), None, None) => RemoteStorageKind::LocalFs(PathBuf::from(
parse_toml_string("local_path", local_path)?,
)),
(Some(_), Some(_), _) => bail!("local_path and bucket_name are mutually exclusive"),
};
Ok(RemoteStorageConfig {
max_concurrent_syncs,
max_sync_errors,
storage,
})
}
}
// Helper functions to parse a toml Item
fn parse_optional_integer<I, E>(name: &str, item: &toml_edit::Item) -> anyhow::Result<Option<I>>
where
I: TryFrom<i64, Error = E>,
E: std::error::Error + Send + Sync + 'static,
{
let toml_integer = match item.get(name) {
Some(item) => item
.as_integer()
.with_context(|| format!("configure option {name} is not an integer"))?,
None => return Ok(None),
};
I::try_from(toml_integer)
.map(Some)
.with_context(|| format!("configure option {name} is too large"))
}
fn parse_toml_string(name: &str, item: &Item) -> anyhow::Result<String> {
let s = item
.as_str()
.with_context(|| format!("configure option {name} is not a string"))?;
Ok(s.to_string())
}
#[cfg(test)]
mod tests {
use super::*;


@@ -26,6 +26,9 @@ impl Lsn {
/// Maximum possible value for an LSN
pub const MAX: Lsn = Lsn(u64::MAX);
/// Invalid value for InvalidXLogRecPtr, as defined in xlogdefs.h
pub const INVALID: Lsn = Lsn(0);
/// Subtract a number, returning None on overflow.
pub fn checked_sub<T: Into<u64>>(self, other: T) -> Option<Lsn> {
let other: u64 = other.into();
@@ -103,6 +106,12 @@ impl Lsn {
pub fn is_aligned(&self) -> bool {
*self == self.align()
}
/// Return if the LSN is valid
/// mimics postgres XLogRecPtrIsInvalid macro
pub fn is_valid(self) -> bool {
self != Lsn::INVALID
}
}
impl From<u64> for Lsn {


@@ -5,9 +5,9 @@
//! See also `settings.md` for better description on every parameter.
use anyhow::{anyhow, bail, ensure, Context, Result};
use remote_storage::{RemoteStorageConfig, RemoteStorageKind, S3Config};
use remote_storage::RemoteStorageConfig;
use std::env;
use std::num::{NonZeroU32, NonZeroUsize};
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::time::Duration;
@@ -394,7 +394,7 @@ impl PageServerConf {
)),
"auth_type" => builder.auth_type(parse_toml_from_str(key, item)?),
"remote_storage" => {
builder.remote_storage_config(Some(Self::parse_remote_storage_config(item)?))
builder.remote_storage_config(Some(RemoteStorageConfig::from_toml(item)?))
}
"tenant_config" => {
t_conf = Self::parse_toml_tenant_conf(item)?;
@@ -484,64 +484,6 @@ impl PageServerConf {
Ok(t_conf)
}
/// subroutine of parse_config(), to parse the `[remote_storage]` table.
fn parse_remote_storage_config(toml: &toml_edit::Item) -> anyhow::Result<RemoteStorageConfig> {
let local_path = toml.get("local_path");
let bucket_name = toml.get("bucket_name");
let bucket_region = toml.get("bucket_region");
let max_concurrent_syncs = NonZeroUsize::new(
parse_optional_integer("max_concurrent_syncs", toml)?
.unwrap_or(remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS),
)
.context("Failed to parse 'max_concurrent_syncs' as a positive integer")?;
let max_sync_errors = NonZeroU32::new(
parse_optional_integer("max_sync_errors", toml)?
.unwrap_or(remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS),
)
.context("Failed to parse 'max_sync_errors' as a positive integer")?;
let concurrency_limit = NonZeroUsize::new(
parse_optional_integer("concurrency_limit", toml)?
.unwrap_or(remote_storage::DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT),
)
.context("Failed to parse 'concurrency_limit' as a positive integer")?;
let storage = match (local_path, bucket_name, bucket_region) {
(None, None, None) => bail!("no 'local_path' nor 'bucket_name' option"),
(_, Some(_), None) => {
bail!("'bucket_region' option is mandatory if 'bucket_name' is given ")
}
(_, None, Some(_)) => {
bail!("'bucket_name' option is mandatory if 'bucket_region' is given ")
}
(None, Some(bucket_name), Some(bucket_region)) => RemoteStorageKind::AwsS3(S3Config {
bucket_name: parse_toml_string("bucket_name", bucket_name)?,
bucket_region: parse_toml_string("bucket_region", bucket_region)?,
prefix_in_bucket: toml
.get("prefix_in_bucket")
.map(|prefix_in_bucket| parse_toml_string("prefix_in_bucket", prefix_in_bucket))
.transpose()?,
endpoint: toml
.get("endpoint")
.map(|endpoint| parse_toml_string("endpoint", endpoint))
.transpose()?,
concurrency_limit,
}),
(Some(local_path), None, None) => RemoteStorageKind::LocalFs(PathBuf::from(
parse_toml_string("local_path", local_path)?,
)),
(Some(_), Some(_), _) => bail!("local_path and bucket_name are mutually exclusive"),
};
Ok(RemoteStorageConfig {
max_concurrent_syncs,
max_sync_errors,
storage,
})
}
#[cfg(test)]
pub fn test_repo_dir(test_name: &str) -> PathBuf {
PathBuf::from(format!("../tmp_check/test_{test_name}"))
@@ -592,23 +534,6 @@ fn parse_toml_u64(name: &str, item: &Item) -> Result<u64> {
Ok(i as u64)
}
fn parse_optional_integer<I, E>(name: &str, item: &toml_edit::Item) -> anyhow::Result<Option<I>>
where
I: TryFrom<i64, Error = E>,
E: std::error::Error + Send + Sync + 'static,
{
let toml_integer = match item.get(name) {
Some(item) => item
.as_integer()
.with_context(|| format!("configure option {name} is not an integer"))?,
None => return Ok(None),
};
I::try_from(toml_integer)
.map(Some)
.with_context(|| format!("configure option {name} is too large"))
}
fn parse_toml_duration(name: &str, item: &Item) -> Result<Duration> {
let s = item
.as_str()
@@ -651,8 +576,12 @@ fn parse_toml_array(name: &str, item: &Item) -> anyhow::Result<Vec<String>> {
#[cfg(test)]
mod tests {
use std::fs;
use std::{
fs,
num::{NonZeroU32, NonZeroUsize},
};
use remote_storage::{RemoteStorageKind, S3Config};
use tempfile::{tempdir, TempDir};
use super::*;


@@ -30,6 +30,10 @@ const_format = "0.2.21"
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
tokio-util = { version = "0.7", features = ["io"] }
git-version = "0.3.5"
async-trait = "0.1"
once_cell = "1.10.0"
futures = "0.3.13"
toml_edit = { version = "0.13", features = ["easy"] }
postgres_ffi = { path = "../libs/postgres_ffi" }
metrics = { path = "../libs/metrics" }


@@ -6,22 +6,27 @@ use clap::{App, Arg};
use const_format::formatcp;
use daemonize::Daemonize;
use fs2::FileExt;
use remote_storage::RemoteStorageConfig;
use std::fs::{self, File};
use std::io::{ErrorKind, Write};
use std::path::{Path, PathBuf};
use std::thread;
use tokio::sync::mpsc;
use toml_edit::Document;
use tracing::*;
use url::{ParseError, Url};
use safekeeper::control_file::{self};
use safekeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR};
use safekeeper::defaults::{
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR, DEFAULT_WAL_BACKUP_RUNTIME_THREADS,
};
use safekeeper::http;
use safekeeper::remove_wal;
use safekeeper::timeline::GlobalTimelines;
use safekeeper::wal_backup;
use safekeeper::wal_service;
use safekeeper::SafeKeeperConf;
use safekeeper::{broker, callmemaybe};
use safekeeper::{http, s3_offload};
use utils::{
http::endpoint, logging, project_git_version, shutdown::exit_now, signals, tcp_listener,
zid::NodeId,
@@ -71,12 +76,6 @@ fn main() -> anyhow::Result<()> {
.long("pageserver")
.takes_value(true),
)
.arg(
Arg::new("ttl")
.long("ttl")
.takes_value(true)
.help("interval for keeping WAL at safekeeper node, after which them will be uploaded to S3 and removed locally"),
)
.arg(
Arg::new("recall")
.long("recall")
@@ -118,12 +117,20 @@ fn main() -> anyhow::Result<()> {
.help("a prefix to always use when polling/pusing data in etcd from this safekeeper"),
)
.arg(
Arg::new("enable-s3-offload")
.long("enable-s3-offload")
Arg::new("wal-backup-threads").long("backup-threads").takes_value(true).help(formatcp!("number of threads for wal backup (default {DEFAULT_WAL_BACKUP_RUNTIME_THREADS}")),
).arg(
Arg::new("remote-storage")
.long("remote-storage")
.takes_value(true)
.help("Remote storage configuration for WAL backup (offloading to s3) as TOML inline table, e.g. {\"max_concurrent_syncs\" = 17, \"max_sync_errors\": 13, \"bucket_name\": \"<BUCKETNAME>\", \"bucket_region\":\"<REGION>\", \"concurrency_limit\": 119}.\nSafekeeper offloads WAL to [prefix_in_bucket/]<tenant_id>/<timeline_id>/<segment_file>, mirroring structure on the file system.")
)
.arg(
Arg::new("enable-wal-backup")
.long("enable-wal-backup")
.takes_value(true)
.default_value("true")
.default_missing_value("true")
.help("Enable/disable s3 offloading. When disabled, safekeeper removes WAL ignoring s3 WAL horizon."),
.help("Enable/disable WAL backup to s3. When disabled, safekeeper removes WAL ignoring WAL backup horizon."),
)
.get_matches();
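
The --remote-storage help text above describes the remote object layout as
[prefix_in_bucket/]<tenant_id>/<timeline_id>/<segment_file>. A hedged sketch of how such a
key could be composed (remote_segment_key and its arguments are illustrative stand-ins; the
actual helper lives elsewhere in this commit):

/// Illustrative only: compose a remote key following the layout from the
/// help text: [prefix_in_bucket/]<tenant_id>/<timeline_id>/<segment_file>.
fn remote_segment_key(
    prefix_in_bucket: Option<&str>,
    tenant_id: &str,
    timeline_id: &str,
    segment_file: &str,
) -> String {
    // e.g. segment_file = "000000010000000000000001" for the first segment
    let tail = format!("{tenant_id}/{timeline_id}/{segment_file}");
    match prefix_in_bucket {
        Some(prefix) => format!("{}/{}", prefix.trim_end_matches('/'), tail),
        None => tail,
    }
}

fn main() {
    // With prefix_in_bucket = "wal", as configured in the systemd unit above.
    println!(
        "{}",
        remote_segment_key(Some("wal"), "<tenant_id>", "<timeline_id>", "000000010000000000000001")
    );
}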
@@ -157,10 +164,6 @@ fn main() -> anyhow::Result<()> {
conf.listen_http_addr = addr.to_owned();
}
if let Some(ttl) = arg_matches.value_of("ttl") {
conf.ttl = Some(humantime::parse_duration(ttl)?);
}
if let Some(recall) = arg_matches.value_of("recall") {
conf.recall_period = humantime::parse_duration(recall)?;
}
@@ -182,9 +185,21 @@ fn main() -> anyhow::Result<()> {
conf.broker_etcd_prefix = prefix.to_string();
}
if let Some(backup_threads) = arg_matches.value_of("wal-backup-threads") {
conf.backup_runtime_threads = backup_threads
.parse()
.with_context(|| format!("Failed to parse backup threads {}", backup_threads))?;
}
if let Some(storage_conf) = arg_matches.value_of("remote-storage") {
// funny toml doesn't consider plain inline table as valid document, so wrap in a key to parse
let storage_conf_toml = format!("remote_storage = {}", storage_conf);
let parsed_toml = storage_conf_toml.parse::<Document>()?; // parse
let (_, storage_conf_parsed_toml) = parsed_toml.iter().next().unwrap(); // and strip key off again
conf.remote_storage = Some(RemoteStorageConfig::from_toml(storage_conf_parsed_toml)?);
}
// Seems like there is no better way to accept bool values explicitly in clap.
conf.s3_offload_enabled = arg_matches
.value_of("enable-s3-offload")
conf.wal_backup_enabled = arg_matches
.value_of("enable-wal-backup")
.unwrap()
.parse()
.context("failed to parse bool enable-s3-offload bool")?;
@@ -252,7 +267,8 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
let signals = signals::install_shutdown_handlers()?;
let mut threads = vec![];
let (callmemaybe_tx, callmemaybe_rx) = mpsc::unbounded_channel();
GlobalTimelines::set_callmemaybe_tx(callmemaybe_tx);
let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100);
GlobalTimelines::init(callmemaybe_tx, wal_backup_launcher_tx);
let conf_ = conf.clone();
threads.push(
@@ -270,17 +286,6 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
})?,
);
if conf.ttl.is_some() {
let conf_ = conf.clone();
threads.push(
thread::Builder::new()
.name("S3 offload thread".into())
.spawn(|| {
s3_offload::thread_main(conf_);
})?,
);
}
let conf_cloned = conf.clone();
let safekeeper_thread = thread::Builder::new()
.name("Safekeeper thread".into())
@@ -330,6 +335,15 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
})?,
);
let conf_ = conf.clone();
threads.push(
thread::Builder::new()
.name("wal backup launcher thread".into())
.spawn(move || {
wal_backup::wal_backup_launcher_thread_main(conf_, wal_backup_launcher_rx);
})?,
);
// TODO: put more thoughts into handling of failed threads
// We probably should restart them.


@@ -1,5 +1,6 @@
//! Communication with etcd, providing safekeeper peers and pageserver coordination.
use anyhow::anyhow;
use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
@@ -7,9 +8,11 @@ use etcd_broker::Client;
use etcd_broker::PutOptions;
use etcd_broker::SkTimelineSubscriptionKind;
use std::time::Duration;
use tokio::spawn;
use tokio::task::JoinHandle;
use tokio::{runtime, time::sleep};
use tracing::*;
use url::Url;
use crate::{timeline::GlobalTimelines, SafeKeeperConf};
use utils::zid::{NodeId, ZTenantTimelineId};
@@ -44,6 +47,118 @@ fn timeline_safekeeper_path(
)
}
pub struct Election {
pub election_name: String,
pub candidate_name: String,
pub broker_endpoints: Vec<Url>,
}
impl Election {
pub fn new(election_name: String, candidate_name: String, broker_endpoints: Vec<Url>) -> Self {
Self {
election_name,
candidate_name,
broker_endpoints,
}
}
}
pub struct ElectionLeader {
client: Client,
keep_alive: JoinHandle<Result<()>>,
}
impl ElectionLeader {
pub async fn check_am_i(
&mut self,
election_name: String,
candidate_name: String,
) -> Result<bool> {
let resp = self.client.leader(election_name).await?;
let kv = resp.kv().ok_or(anyhow!("failed to get leader response"))?;
let leader = kv.value_str()?;
Ok(leader == candidate_name)
}
pub async fn give_up(self) {
// self.keep_alive.abort();
// TODO: it'll be wise to resign here but it'll happen after lease expiration anyway
// should we await for keep alive termination?
let _ = self.keep_alive.await;
}
}
pub async fn get_leader(req: &Election) -> Result<ElectionLeader> {
let mut client = Client::connect(req.broker_endpoints.clone(), None)
.await
.context("Could not connect to etcd")?;
let lease = client
.lease_grant(LEASE_TTL_SEC, None)
.await
.context("Could not acquire a lease");
let lease_id = lease.map(|l| l.id()).unwrap();
let keep_alive = spawn::<_>(lease_keep_alive(client.clone(), lease_id));
if let Err(e) = client
.campaign(
req.election_name.clone(),
req.candidate_name.clone(),
lease_id,
)
.await
{
keep_alive.abort();
let _ = keep_alive.await;
return Err(e.into());
}
Ok(ElectionLeader { client, keep_alive })
}
async fn lease_keep_alive(mut client: Client, lease_id: i64) -> Result<()> {
let (mut keeper, mut ka_stream) = client
.lease_keep_alive(lease_id)
.await
.context("failed to create keepalive stream")?;
loop {
let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
keeper
.keep_alive()
.await
.context("failed to send LeaseKeepAliveRequest")?;
ka_stream
.message()
.await
.context("failed to receive LeaseKeepAliveResponse")?;
sleep(push_interval).await;
}
}
pub fn get_campaign_name(
election_name: String,
broker_prefix: String,
timeline_id: &ZTenantTimelineId,
) -> String {
return format!(
"{}/{}",
SkTimelineSubscriptionKind::timeline(broker_prefix, *timeline_id).watch_key(),
election_name
);
}
pub fn get_candiate_name(system_id: NodeId) -> String {
format!("id_{}", system_id)
}
/// Push once in a while data about all active timelines to the broker.
async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
let mut client = Client::connect(&conf.broker_endpoints, None).await?;
@@ -59,7 +174,7 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
// sensitive and there is no risk of deadlock as we don't await while
// lock is held.
for zttid in GlobalTimelines::get_active_timelines() {
if let Ok(tli) = GlobalTimelines::get(&conf, zttid, false) {
if let Some(tli) = GlobalTimelines::get_loaded(zttid) {
let sk_info = tli.get_public_info(&conf)?;
let put_opts = PutOptions::new().with_lease(lease.id());
client
@@ -106,12 +221,13 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
// note: there are blocking operations below, but it's considered fine for now
if let Ok(tli) = GlobalTimelines::get(&conf, zttid, false) {
for (safekeeper_id, info) in sk_info {
tli.record_safekeeper_info(&info, safekeeper_id)?
tli.record_safekeeper_info(&info, safekeeper_id).await?
}
}
}
}
None => {
// XXX it means we lost connection with etcd, error is consumed inside sub object
debug!("timeline updates sender closed, aborting the pull loop");
return Ok(());
}
@@ -142,11 +258,12 @@ async fn main_loop(conf: SafeKeeperConf) {
},
res = async { pull_handle.as_mut().unwrap().await }, if pull_handle.is_some() => {
// was it panic or normal error?
let err = match res {
Ok(res_internal) => res_internal.unwrap_err(),
Err(err_outer) => err_outer.into(),
match res {
Ok(res_internal) => if let Err(err_inner) = res_internal {
warn!("pull task failed: {:?}", err_inner);
}
Err(err_outer) => { warn!("pull task panicked: {:?}", err_outer) }
};
warn!("pull task failed: {:?}", err);
pull_handle = None;
},
_ = ticker.tick() => {


@@ -165,7 +165,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
timeline_start_lsn: Lsn(0),
local_start_lsn: Lsn(0),
commit_lsn: oldstate.commit_lsn,
s3_wal_lsn: Lsn(0),
backup_lsn: Lsn(0),
peer_horizon_lsn: oldstate.truncate_lsn,
remote_consistent_lsn: Lsn(0),
peers: Peers(vec![]),
@@ -188,7 +188,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
timeline_start_lsn: Lsn(0),
local_start_lsn: Lsn(0),
commit_lsn: oldstate.commit_lsn,
s3_wal_lsn: Lsn(0),
backup_lsn: Lsn(0),
peer_horizon_lsn: oldstate.truncate_lsn,
remote_consistent_lsn: Lsn(0),
peers: Peers(vec![]),
@@ -211,7 +211,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
timeline_start_lsn: Lsn(0),
local_start_lsn: Lsn(0),
commit_lsn: oldstate.commit_lsn,
s3_wal_lsn: Lsn(0),
backup_lsn: Lsn(0),
peer_horizon_lsn: oldstate.truncate_lsn,
remote_consistent_lsn: Lsn(0),
peers: Peers(vec![]),
@@ -234,7 +234,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
timeline_start_lsn: Lsn(0),
local_start_lsn: Lsn(0),
commit_lsn: oldstate.commit_lsn,
s3_wal_lsn: Lsn(0),
backup_lsn: Lsn::INVALID,
peer_horizon_lsn: oldstate.peer_horizon_lsn,
remote_consistent_lsn: Lsn(0),
peers: Peers(vec![]),


@@ -70,19 +70,19 @@ struct TimelineStatus {
timeline_id: ZTimelineId,
acceptor_state: AcceptorStateStatus,
#[serde(serialize_with = "display_serialize")]
flush_lsn: Lsn,
#[serde(serialize_with = "display_serialize")]
timeline_start_lsn: Lsn,
#[serde(serialize_with = "display_serialize")]
local_start_lsn: Lsn,
#[serde(serialize_with = "display_serialize")]
commit_lsn: Lsn,
#[serde(serialize_with = "display_serialize")]
s3_wal_lsn: Lsn,
backup_lsn: Lsn,
#[serde(serialize_with = "display_serialize")]
peer_horizon_lsn: Lsn,
#[serde(serialize_with = "display_serialize")]
remote_consistent_lsn: Lsn,
#[serde(serialize_with = "display_serialize")]
flush_lsn: Lsn,
}
/// Report info about timeline.
@@ -107,13 +107,13 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
tenant_id: zttid.tenant_id,
timeline_id: zttid.timeline_id,
acceptor_state: acc_state,
flush_lsn,
timeline_start_lsn: state.timeline_start_lsn,
local_start_lsn: state.local_start_lsn,
commit_lsn: inmem.commit_lsn,
s3_wal_lsn: inmem.s3_wal_lsn,
backup_lsn: inmem.backup_lsn,
peer_horizon_lsn: inmem.peer_horizon_lsn,
remote_consistent_lsn: inmem.remote_consistent_lsn,
flush_lsn,
};
json_response(StatusCode::OK, status)
}
@@ -148,7 +148,9 @@ async fn timeline_delete_force_handler(
ensure_no_body(&mut request).await?;
json_response(
StatusCode::OK,
GlobalTimelines::delete_force(get_conf(&request), &zttid).map_err(ApiError::from_err)?,
GlobalTimelines::delete_force(get_conf(&request), &zttid)
.await
.map_err(ApiError::from_err)?,
)
}
@@ -162,6 +164,7 @@ async fn tenant_delete_force_handler(
json_response(
StatusCode::OK,
GlobalTimelines::delete_force_all_for_tenant(get_conf(&request), &tenant_id)
.await
.map_err(ApiError::from_err)?
.iter()
.map(|(zttid, resp)| (format!("{}", zttid.timeline_id), *resp))
@@ -178,7 +181,8 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
let safekeeper_info: SkTimelineInfo = json_request(&mut request).await?;
let tli = GlobalTimelines::get(get_conf(&request), zttid, false).map_err(ApiError::from_err)?;
tli.record_safekeeper_info(&safekeeper_info, NodeId(1))?;
tli.record_safekeeper_info(&safekeeper_info, NodeId(1))
.await?;
json_response(StatusCode::OK, ())
}


@@ -1,4 +1,6 @@
use defaults::DEFAULT_WAL_BACKUP_RUNTIME_THREADS;
//
use remote_storage::RemoteStorageConfig;
use std::path::PathBuf;
use std::time::Duration;
use url::Url;
@@ -14,10 +16,10 @@ pub mod http;
pub mod json_ctrl;
pub mod receive_wal;
pub mod remove_wal;
pub mod s3_offload;
pub mod safekeeper;
pub mod send_wal;
pub mod timeline;
pub mod wal_backup;
pub mod wal_service;
pub mod wal_storage;
@@ -31,6 +33,7 @@ pub mod defaults {
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 7676;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
pub const DEFAULT_RECALL_PERIOD: Duration = Duration::from_secs(10);
pub const DEFAULT_WAL_BACKUP_RUNTIME_THREADS: usize = 8;
}
#[derive(Debug, Clone)]
@@ -47,12 +50,13 @@ pub struct SafeKeeperConf {
pub no_sync: bool,
pub listen_pg_addr: String,
pub listen_http_addr: String,
pub ttl: Option<Duration>,
pub recall_period: Duration,
pub remote_storage: Option<RemoteStorageConfig>,
pub backup_runtime_threads: usize,
pub wal_backup_enabled: bool,
pub my_id: NodeId,
pub broker_endpoints: Vec<Url>,
pub broker_etcd_prefix: String,
pub s3_offload_enabled: bool,
}
impl SafeKeeperConf {
@@ -77,12 +81,13 @@ impl Default for SafeKeeperConf {
no_sync: false,
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
ttl: None,
remote_storage: None,
recall_period: defaults::DEFAULT_RECALL_PERIOD,
my_id: NodeId(0),
broker_endpoints: Vec::new(),
broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(),
s3_offload_enabled: true,
backup_runtime_threads: DEFAULT_WAL_BACKUP_RUNTIME_THREADS,
wal_backup_enabled: true,
}
}
}


@@ -85,16 +85,10 @@ impl<'pg> ReceiveWalConn<'pg> {
_ => bail!("unexpected message {:?} instead of greeting", next_msg),
}
// Register the connection and defer unregister.
spg.timeline
.get()
.on_compute_connect(self.pageserver_connstr.as_ref())?;
let _guard = ComputeConnectionGuard {
timeline: Arc::clone(spg.timeline.get()),
};
let mut next_msg = Some(next_msg);
let mut first_time_through = true;
let mut _guard: Option<ComputeConnectionGuard> = None;
loop {
if matches!(next_msg, Some(ProposerAcceptorMessage::AppendRequest(_))) {
// poll AppendRequest's without blocking and write WAL to disk without flushing,
@@ -122,6 +116,18 @@ impl<'pg> ReceiveWalConn<'pg> {
self.write_msg(&reply)?;
}
}
if first_time_through {
// Register the connection and defer unregister. Do that only
// after processing first message, as it sets wal_seg_size,
// wanted by many.
spg.timeline
.get()
.on_compute_connect(self.pageserver_connstr.as_ref())?;
_guard = Some(ComputeConnectionGuard {
timeline: Arc::clone(spg.timeline.get()),
});
first_time_through = false;
}
// blocking wait for the next message
if next_msg.is_none() {


@@ -12,7 +12,7 @@ pub fn thread_main(conf: SafeKeeperConf) {
let active_tlis = GlobalTimelines::get_active_timelines();
for zttid in &active_tlis {
if let Ok(tli) = GlobalTimelines::get(&conf, *zttid, false) {
if let Err(e) = tli.remove_old_wal(conf.s3_offload_enabled) {
if let Err(e) = tli.remove_old_wal(conf.wal_backup_enabled) {
warn!(
"failed to remove WAL for tenant {} timeline {}: {}",
tli.zttid.tenant_id, tli.zttid.timeline_id, e


@@ -1,107 +0,0 @@
//
// Offload old WAL segments to S3 and remove them locally
// Needs `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables to be set
// if no IAM bucket access is used.
//
use anyhow::{bail, Context};
use postgres_ffi::xlog_utils::*;
use remote_storage::{
GenericRemoteStorage, RemoteStorage, RemoteStorageConfig, S3Bucket, S3Config, S3ObjectKey,
};
use std::collections::HashSet;
use std::env;
use std::num::{NonZeroU32, NonZeroUsize};
use std::path::Path;
use std::time::SystemTime;
use tokio::fs::{self, File};
use tokio::io::BufReader;
use tokio::runtime;
use tokio::time::sleep;
use tracing::*;
use walkdir::WalkDir;
use crate::SafeKeeperConf;
pub fn thread_main(conf: SafeKeeperConf) {
// Create a new thread pool
//
// FIXME: keep it single-threaded for now, make it easier to debug with gdb,
// and we're not concerned with performance yet.
//let runtime = runtime::Runtime::new().unwrap();
let runtime = runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
info!("Starting S3 offload task");
runtime.block_on(async {
main_loop(&conf).await.unwrap();
});
}
async fn offload_files(
remote_storage: &S3Bucket,
listing: &HashSet<S3ObjectKey>,
dir_path: &Path,
conf: &SafeKeeperConf,
) -> anyhow::Result<u64> {
let horizon = SystemTime::now() - conf.ttl.unwrap();
let mut n: u64 = 0;
for entry in WalkDir::new(dir_path) {
let entry = entry?;
let path = entry.path();
if path.is_file()
&& IsXLogFileName(entry.file_name().to_str().unwrap())
&& entry.metadata().unwrap().created().unwrap() <= horizon
{
let remote_path = remote_storage.remote_object_id(path)?;
if !listing.contains(&remote_path) {
let file = File::open(&path).await?;
let file_length = file.metadata().await?.len() as usize;
remote_storage
.upload(BufReader::new(file), file_length, &remote_path, None)
.await?;
fs::remove_file(&path).await?;
n += 1;
}
}
}
Ok(n)
}
async fn main_loop(conf: &SafeKeeperConf) -> anyhow::Result<()> {
let remote_storage = match GenericRemoteStorage::new(
conf.workdir.clone(),
&RemoteStorageConfig {
max_concurrent_syncs: NonZeroUsize::new(10).unwrap(),
max_sync_errors: NonZeroU32::new(1).unwrap(),
storage: remote_storage::RemoteStorageKind::AwsS3(S3Config {
bucket_name: "zenith-testbucket".to_string(),
bucket_region: env::var("S3_REGION").context("S3_REGION env var is not set")?,
prefix_in_bucket: Some("walarchive/".to_string()),
endpoint: Some(env::var("S3_ENDPOINT").context("S3_ENDPOINT env var is not set")?),
concurrency_limit: NonZeroUsize::new(20).unwrap(),
}),
},
)? {
GenericRemoteStorage::Local(_) => {
bail!("Unexpected: got local storage for the remote config")
}
GenericRemoteStorage::S3(remote_storage) => remote_storage,
};
loop {
let listing = remote_storage
.list()
.await?
.into_iter()
.collect::<HashSet<_>>();
let n = offload_files(&remote_storage, &listing, &conf.workdir, conf).await?;
info!("Offload {n} files to S3");
sleep(conf.ttl.unwrap()).await;
}
}


@@ -19,6 +19,7 @@ use lazy_static::lazy_static;
use crate::control_file;
use crate::send_wal::HotStandbyFeedback;
use crate::wal_storage;
use metrics::{register_gauge_vec, Gauge, GaugeVec};
use postgres_ffi::xlog_utils::MAX_SEND_SIZE;
@@ -141,7 +142,7 @@ pub struct ServerInfo {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerInfo {
/// LSN up to which safekeeper offloaded WAL to s3.
s3_wal_lsn: Lsn,
backup_lsn: Lsn,
/// Term of the last entry.
term: Term,
/// LSN of the last record.
@@ -153,7 +154,7 @@ pub struct PeerInfo {
impl PeerInfo {
fn new() -> Self {
Self {
s3_wal_lsn: Lsn(0),
backup_lsn: Lsn::INVALID,
term: INVALID_TERM,
flush_lsn: Lsn(0),
commit_lsn: Lsn(0),
@@ -193,9 +194,9 @@ pub struct SafeKeeperState {
/// Part of WAL acknowledged by quorum and available locally. Always points
/// to record boundary.
pub commit_lsn: Lsn,
/// First LSN not yet offloaded to s3. Useful to persist to avoid finding
/// out offloading progress on boot.
pub s3_wal_lsn: Lsn,
/// LSN that points to the end of the last backed up segment. Useful to
/// persist to avoid finding out offloading progress on boot.
pub backup_lsn: Lsn,
/// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
/// of last record streamed to everyone). Persisting it helps skipping
/// recovery in walproposer, generally we compute it from peers. In
@@ -217,7 +218,7 @@ pub struct SafeKeeperState {
// are not flushed yet.
pub struct SafekeeperMemState {
pub commit_lsn: Lsn,
pub s3_wal_lsn: Lsn, // TODO: keep only persistent version
pub backup_lsn: Lsn,
pub peer_horizon_lsn: Lsn,
pub remote_consistent_lsn: Lsn,
pub proposer_uuid: PgUuid,
@@ -241,7 +242,7 @@ impl SafeKeeperState {
timeline_start_lsn: Lsn(0),
local_start_lsn: Lsn(0),
commit_lsn: Lsn(0),
s3_wal_lsn: Lsn(0),
backup_lsn: Lsn::INVALID,
peer_horizon_lsn: Lsn(0),
remote_consistent_lsn: Lsn(0),
peers: Peers(peers.iter().map(|p| (*p, PeerInfo::new())).collect()),
@@ -559,7 +560,7 @@ where
epoch_start_lsn: Lsn(0),
inmem: SafekeeperMemState {
commit_lsn: state.commit_lsn,
s3_wal_lsn: state.s3_wal_lsn,
backup_lsn: state.backup_lsn,
peer_horizon_lsn: state.peer_horizon_lsn,
remote_consistent_lsn: state.remote_consistent_lsn,
proposer_uuid: state.proposer_uuid,
@@ -649,7 +650,6 @@ where
self.state.persist(&state)?;
}
// pass wal_seg_size to read WAL and find flush_lsn
self.wal_store.init_storage(&self.state)?;
info!(
@@ -764,6 +764,14 @@ where
self.inmem.commit_lsn = commit_lsn;
self.metrics.commit_lsn.set(self.inmem.commit_lsn.0 as f64);
// We got our first commit_lsn, which means we should sync
// everything to disk, to initialize the state.
if self.state.commit_lsn == Lsn::INVALID && commit_lsn != Lsn::INVALID {
self.inmem.backup_lsn = self.inmem.commit_lsn; // initialize backup_lsn
self.wal_store.flush_wal()?;
self.persist_control_file()?;
}
// If new commit_lsn reached epoch switch, force sync of control
// file: walproposer in sync mode is very interested when this
// happens. Note: this is for sync-safekeepers mode only, as
@@ -775,22 +783,14 @@ where
self.persist_control_file()?;
}
// We got our first commit_lsn, which means we should sync
// everything to disk, to initialize the state.
if self.state.commit_lsn == Lsn(0) && commit_lsn > Lsn(0) {
self.wal_store.flush_wal()?;
self.persist_control_file()?;
}
Ok(())
}
/// Persist in-memory state to the disk.
fn persist_control_file(&mut self) -> Result<()> {
let mut state = self.state.clone();
state.commit_lsn = self.inmem.commit_lsn;
state.s3_wal_lsn = self.inmem.s3_wal_lsn;
state.backup_lsn = self.inmem.backup_lsn;
state.peer_horizon_lsn = self.inmem.peer_horizon_lsn;
state.remote_consistent_lsn = self.inmem.remote_consistent_lsn;
state.proposer_uuid = self.inmem.proposer_uuid;
@@ -898,11 +898,11 @@ where
self.update_commit_lsn()?;
}
}
if let Some(s3_wal_lsn) = sk_info.s3_wal_lsn {
let new_s3_wal_lsn = max(s3_wal_lsn, self.inmem.s3_wal_lsn);
if let Some(backup_lsn) = sk_info.backup_lsn {
let new_backup_lsn = max(backup_lsn, self.inmem.backup_lsn);
sync_control_file |=
self.state.s3_wal_lsn + (self.state.server.wal_seg_size as u64) < new_s3_wal_lsn;
self.inmem.s3_wal_lsn = new_s3_wal_lsn;
self.state.backup_lsn + (self.state.server.wal_seg_size as u64) < new_backup_lsn;
self.inmem.backup_lsn = new_backup_lsn;
}
if let Some(remote_consistent_lsn) = sk_info.remote_consistent_lsn {
let new_remote_consistent_lsn =
@@ -930,29 +930,23 @@ where
/// offloading.
/// While it is safe to use inmem values for determining horizon,
/// we use persistent to make possible normal states less surprising.
pub fn get_horizon_segno(&self, s3_offload_enabled: bool) -> XLogSegNo {
let s3_offload_horizon = if s3_offload_enabled {
self.state.s3_wal_lsn
} else {
Lsn(u64::MAX)
};
let horizon_lsn = min(
min(
self.state.remote_consistent_lsn,
self.state.peer_horizon_lsn,
),
s3_offload_horizon,
pub fn get_horizon_segno(&self, wal_backup_enabled: bool) -> XLogSegNo {
let mut horizon_lsn = min(
self.state.remote_consistent_lsn,
self.state.peer_horizon_lsn,
);
if wal_backup_enabled {
horizon_lsn = min(horizon_lsn, self.state.backup_lsn);
}
horizon_lsn.segment_number(self.state.server.wal_seg_size as usize)
}
}
#[cfg(test)]
mod tests {
use std::ops::Deref;
use super::*;
use crate::wal_storage::Storage;
use std::ops::Deref;
// fake storage for tests
struct InMemoryState {
@@ -1013,6 +1007,7 @@ mod tests {
};
let wal_store = DummyWalStore { lsn: Lsn(0) };
let ztli = ZTimelineId::from([0u8; 16]);
let mut sk = SafeKeeper::new(ztli, storage, wal_store, NodeId(0)).unwrap();
// check voting for 1 is ok
@@ -1028,6 +1023,7 @@ mod tests {
let storage = InMemoryState {
persisted_state: state,
};
sk = SafeKeeper::new(ztli, storage, sk.wal_store, NodeId(0)).unwrap();
// and ensure voting second time for 1 is not ok
@@ -1045,6 +1041,7 @@ mod tests {
};
let wal_store = DummyWalStore { lsn: Lsn(0) };
let ztli = ZTimelineId::from([0u8; 16]);
let mut sk = SafeKeeper::new(ztli, storage, wal_store, NodeId(0)).unwrap();
let mut ar_hdr = AppendRequestHeader {


@@ -315,7 +315,7 @@ impl ReplicationConn {
} else {
// TODO: also check once in a while whether we are walsender
// to right pageserver.
if spg.timeline.get().check_deactivate(replica_id)? {
if spg.timeline.get().stop_walsender(replica_id)? {
// Shut down, timeline is suspended.
// TODO create proper error type for this
bail!("end streaming to {:?}", spg.appname);


@@ -8,6 +8,7 @@ use lazy_static::lazy_static;
use postgres_ffi::xlog_utils::XLogSegNo;
use serde::Serialize;
use tokio::sync::watch;
use std::cmp::{max, min};
use std::collections::HashMap;
@@ -15,7 +16,7 @@ use std::fs::{self};
use std::sync::{Arc, Condvar, Mutex, MutexGuard};
use std::time::Duration;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc::{Sender, UnboundedSender};
use tracing::*;
use utils::{
@@ -25,13 +26,13 @@ use utils::{
};
use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey};
use crate::control_file;
use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
SafekeeperMemState,
};
use crate::send_wal::HotStandbyFeedback;
use crate::wal_storage;
use crate::wal_storage::Storage as wal_storage_iface;
use crate::SafeKeeperConf;
@@ -81,10 +82,14 @@ struct SharedState {
notified_commit_lsn: Lsn,
/// State of replicas
replicas: Vec<Option<ReplicaState>>,
/// Inactive clusters shouldn't occupy any resources, so timeline is
/// activated whenever there is a compute connection or pageserver is not
/// caughtup (it must have latest WAL for new compute start) and suspended
/// otherwise.
/// True when WAL backup launcher oversees the timeline, making sure WAL is
/// offloaded, allows to bother launcher less.
wal_backup_active: bool,
/// True whenever there is at least some pending activity on timeline: live
/// compute connection, pageserver is not caughtup (it must have latest WAL
/// for new compute start) or WAL backuping is not finished. Practically it
/// means safekeepers broadcast info to peers about the timeline, old WAL is
/// trimmed.
///
/// TODO: it might be better to remove tli completely from GlobalTimelines
/// when tli is inactive instead of having this flag.
@@ -103,6 +108,7 @@ impl SharedState {
) -> Result<Self> {
let state = SafeKeeperState::new(zttid, peer_ids);
let control_store = control_file::FileStorage::create_new(zttid, conf, state)?;
let wal_store = wal_storage::PhysicalStorage::new(zttid, conf);
let sk = SafeKeeper::new(zttid.timeline_id, control_store, wal_store, conf.my_id)?;
@@ -110,6 +116,7 @@ impl SharedState {
notified_commit_lsn: Lsn(0),
sk,
replicas: Vec::new(),
wal_backup_active: false,
active: false,
num_computes: 0,
pageserver_connstr: None,
@@ -129,15 +136,62 @@ impl SharedState {
notified_commit_lsn: Lsn(0),
sk: SafeKeeper::new(zttid.timeline_id, control_store, wal_store, conf.my_id)?,
replicas: Vec::new(),
wal_backup_active: false,
active: false,
num_computes: 0,
pageserver_connstr: None,
last_removed_segno: 0,
})
}
fn is_active(&self) -> bool {
self.is_wal_backup_required()
// FIXME: add tracking of relevant pageservers and check them here individually,
// otherwise migration won't work (we suspend too early).
|| self.sk.inmem.remote_consistent_lsn <= self.sk.inmem.commit_lsn
}
/// Activate the timeline: start/change walsender (via callmemaybe).
fn activate(
/// Mark timeline active/inactive and return whether s3 offloading requires
/// start/stop action.
fn update_status(&mut self) -> bool {
self.active = self.is_active();
self.is_wal_backup_action_pending()
}
/// Should we run s3 offloading in current state?
fn is_wal_backup_required(&self) -> bool {
let seg_size = self.get_wal_seg_size();
self.num_computes > 0 ||
// Currently only the whole segment is offloaded, so compare segment numbers.
(self.sk.inmem.commit_lsn.segment_number(seg_size) >
self.sk.inmem.backup_lsn.segment_number(seg_size))
}
/// Is current state of s3 offloading is not what it ought to be?
fn is_wal_backup_action_pending(&self) -> bool {
let res = self.wal_backup_active != self.is_wal_backup_required();
if res {
let action_pending = if self.is_wal_backup_required() {
"start"
} else {
"stop"
};
trace!(
"timeline {} s3 offloading action {} pending: num_computes={}, commit_lsn={}, backup_lsn={}",
self.sk.state.timeline_id, action_pending, self.num_computes, self.sk.inmem.commit_lsn, self.sk.inmem.backup_lsn
);
}
res
}
/// Returns whether s3 offloading is required and sets current status as
/// matching.
fn wal_backup_attend(&mut self) -> bool {
self.wal_backup_active = self.is_wal_backup_required();
self.wal_backup_active
}
/// start/change walsender (via callmemaybe).
fn callmemaybe_sub(
&mut self,
zttid: &ZTenantTimelineId,
pageserver_connstr: Option<&String>,
@@ -179,42 +233,42 @@ impl SharedState {
);
}
self.pageserver_connstr = pageserver_connstr.map(|c| c.to_owned());
self.active = true;
Ok(())
}
/// Deactivate the timeline: stop callmemaybe.
fn deactivate(
fn callmemaybe_unsub(
&mut self,
zttid: &ZTenantTimelineId,
callmemaybe_tx: &UnboundedSender<CallmeEvent>,
) -> Result<()> {
if self.active {
if let Some(ref pageserver_connstr) = self.pageserver_connstr {
let subscription_key = SubscriptionStateKey::new(
zttid.tenant_id,
zttid.timeline_id,
pageserver_connstr.to_owned(),
);
callmemaybe_tx
.send(CallmeEvent::Unsubscribe(subscription_key))
.unwrap_or_else(|e| {
error!(
"failed to send Unsubscribe request to callmemaybe thread {}",
e
);
});
info!(
"timeline {} is unsubscribed from callmemaybe to {}",
zttid.timeline_id,
self.pageserver_connstr.as_ref().unwrap()
);
}
self.active = false;
if let Some(ref pageserver_connstr) = self.pageserver_connstr {
let subscription_key = SubscriptionStateKey::new(
zttid.tenant_id,
zttid.timeline_id,
pageserver_connstr.to_owned(),
);
callmemaybe_tx
.send(CallmeEvent::Unsubscribe(subscription_key))
.unwrap_or_else(|e| {
error!(
"failed to send Unsubscribe request to callmemaybe thread {}",
e
);
});
info!(
"timeline {} is unsubscribed from callmemaybe to {}",
zttid.timeline_id,
self.pageserver_connstr.as_ref().unwrap()
);
}
Ok(())
}
fn get_wal_seg_size(&self) -> usize {
self.sk.state.server.wal_seg_size as usize
}
/// Get combined state of all alive replicas
pub fn get_replicas_state(&self) -> ReplicaState {
let mut acc = ReplicaState::new();
@@ -278,6 +332,13 @@ impl SharedState {
pub struct Timeline {
pub zttid: ZTenantTimelineId,
pub callmemaybe_tx: UnboundedSender<CallmeEvent>,
/// Sending here asks for wal backup launcher attention (start/stop
/// offloading). Sending zttid instead of concrete command allows to do
/// sending without timeline lock.
wal_backup_launcher_tx: Sender<ZTenantTimelineId>,
commit_lsn_watch_tx: watch::Sender<Lsn>,
/// For breeding receivers.
commit_lsn_watch_rx: watch::Receiver<Lsn>,
mutex: Mutex<SharedState>,
/// conditional variable used to notify wal senders
cond: Condvar,
@@ -287,11 +348,17 @@ impl Timeline {
fn new(
zttid: ZTenantTimelineId,
callmemaybe_tx: UnboundedSender<CallmeEvent>,
wal_backup_launcher_tx: Sender<ZTenantTimelineId>,
shared_state: SharedState,
) -> Timeline {
let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
watch::channel(shared_state.sk.inmem.commit_lsn);
Timeline {
zttid,
callmemaybe_tx,
wal_backup_launcher_tx,
commit_lsn_watch_tx,
commit_lsn_watch_rx,
mutex: Mutex::new(shared_state),
cond: Condvar::new(),
}
@@ -301,13 +368,21 @@ impl Timeline {
/// not running yet.
/// Can fail only if channel to a static thread got closed, which is not normal at all.
pub fn on_compute_connect(&self, pageserver_connstr: Option<&String>) -> Result<()> {
let mut shared_state = self.mutex.lock().unwrap();
shared_state.num_computes += 1;
// FIXME: currently we always adopt latest pageserver connstr, but we
// should have kind of generations assigned by compute to distinguish
// the latest one or even pass it through consensus to reliably deliver
// to all safekeepers.
shared_state.activate(&self.zttid, pageserver_connstr, &self.callmemaybe_tx)?;
let is_wal_backup_action_pending: bool;
{
let mut shared_state = self.mutex.lock().unwrap();
shared_state.num_computes += 1;
is_wal_backup_action_pending = shared_state.update_status();
// FIXME: currently we always adopt latest pageserver connstr, but we
// should have kind of generations assigned by compute to distinguish
// the latest one or even pass it through consensus to reliably deliver
// to all safekeepers.
shared_state.callmemaybe_sub(&self.zttid, pageserver_connstr, &self.callmemaybe_tx)?;
}
// Wake up wal backup launcher, if offloading not started yet.
if is_wal_backup_action_pending {
self.wal_backup_launcher_tx.blocking_send(self.zttid)?;
}
Ok(())
}
@@ -315,38 +390,43 @@ impl Timeline {
/// pageserver doesn't need catchup.
/// Can fail only if channel to a static thread got closed, which is not normal at all.
pub fn on_compute_disconnect(&self) -> Result<()> {
let mut shared_state = self.mutex.lock().unwrap();
shared_state.num_computes -= 1;
// If there is no pageserver, can suspend right away; otherwise let
// walsender do that.
if shared_state.num_computes == 0 && shared_state.pageserver_connstr.is_none() {
shared_state.deactivate(&self.zttid, &self.callmemaybe_tx)?;
let is_wal_backup_action_pending: bool;
{
let mut shared_state = self.mutex.lock().unwrap();
shared_state.num_computes -= 1;
is_wal_backup_action_pending = shared_state.update_status();
}
// Wake up wal backup launcher, if it is time to stop the offloading.
if is_wal_backup_action_pending {
self.wal_backup_launcher_tx.blocking_send(self.zttid)?;
}
Ok(())
}
/// Deactivate tenant if there is no computes and pageserver is caughtup,
/// assuming the pageserver status is in replica_id.
/// Returns true if deactivated.
pub fn check_deactivate(&self, replica_id: usize) -> Result<bool> {
/// Whether we still need this walsender running?
/// TODO: check this pageserver is actually interested in this timeline.
pub fn stop_walsender(&self, replica_id: usize) -> Result<bool> {
let mut shared_state = self.mutex.lock().unwrap();
if !shared_state.active {
// already suspended
return Ok(true);
}
if shared_state.num_computes == 0 {
let replica_state = shared_state.replicas[replica_id].unwrap();
let deactivate = shared_state.notified_commit_lsn == Lsn(0) || // no data at all yet
(replica_state.last_received_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet.
replica_state.last_received_lsn >= shared_state.sk.inmem.commit_lsn);
if deactivate {
shared_state.deactivate(&self.zttid, &self.callmemaybe_tx)?;
let stop = shared_state.notified_commit_lsn == Lsn(0) || // no data at all yet
(replica_state.remote_consistent_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet.
replica_state.remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn);
if stop {
shared_state.callmemaybe_unsub(&self.zttid, &self.callmemaybe_tx)?;
return Ok(true);
}
}
Ok(false)
}
/// Returns whether s3 offloading is required and sets current status as
/// matching it.
pub fn wal_backup_attend(&self) -> bool {
let mut shared_state = self.mutex.lock().unwrap();
shared_state.wal_backup_attend()
}
/// Deactivates the timeline, assuming it is being deleted.
/// Returns whether the timeline was already active.
///
@@ -354,10 +434,14 @@ impl Timeline {
/// will stop by themselves eventually (possibly with errors, but no panics). There should be no
/// compute threads (as we're deleting the timeline), actually. Some WAL may be left unsent, but
/// we're deleting the timeline anyway.
pub fn deactivate_for_delete(&self) -> Result<bool> {
let mut shared_state = self.mutex.lock().unwrap();
let was_active = shared_state.active;
shared_state.deactivate(&self.zttid, &self.callmemaybe_tx)?;
pub async fn deactivate_for_delete(&self) -> Result<bool> {
let was_active: bool;
{
let mut shared_state = self.mutex.lock().unwrap();
was_active = shared_state.active;
shared_state.callmemaybe_unsub(&self.zttid, &self.callmemaybe_tx)?;
}
self.wal_backup_launcher_tx.send(self.zttid).await?;
Ok(was_active)
}
@@ -391,6 +475,7 @@ impl Timeline {
}
// Notify caught-up WAL senders about new WAL data received
// TODO: replace-unify it with commit_lsn_watch.
fn notify_wal_senders(&self, shared_state: &mut MutexGuard<SharedState>) {
if shared_state.notified_commit_lsn < shared_state.sk.inmem.commit_lsn {
shared_state.notified_commit_lsn = shared_state.sk.inmem.commit_lsn;
@@ -398,12 +483,17 @@ impl Timeline {
}
}
pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver<Lsn> {
self.commit_lsn_watch_rx.clone()
}
/// Pass arrived message to the safekeeper.
pub fn process_msg(
&self,
msg: &ProposerAcceptorMessage,
) -> Result<Option<AcceptorProposerMessage>> {
let mut rmsg: Option<AcceptorProposerMessage>;
let commit_lsn: Lsn;
{
let mut shared_state = self.mutex.lock().unwrap();
rmsg = shared_state.sk.process_msg(msg)?;
@@ -419,15 +509,31 @@ impl Timeline {
// Ping wal sender that new data might be available.
self.notify_wal_senders(&mut shared_state);
commit_lsn = shared_state.sk.inmem.commit_lsn;
}
self.commit_lsn_watch_tx.send(commit_lsn)?;
Ok(rmsg)
}
pub fn get_wal_seg_size(&self) -> usize {
self.mutex.lock().unwrap().get_wal_seg_size()
}
pub fn get_state(&self) -> (SafekeeperMemState, SafeKeeperState) {
let shared_state = self.mutex.lock().unwrap();
(shared_state.sk.inmem.clone(), shared_state.sk.state.clone())
}
pub fn get_wal_backup_lsn(&self) -> Lsn {
self.mutex.lock().unwrap().sk.inmem.backup_lsn
}
pub fn set_wal_backup_lsn(&self, backup_lsn: Lsn) {
self.mutex.lock().unwrap().sk.inmem.backup_lsn = backup_lsn;
// we should check whether to shut down offloader, but this will be done
// soon by peer communication anyway.
}
/// Prepare public safekeeper info for reporting.
pub fn get_public_info(&self, conf: &SafeKeeperConf) -> anyhow::Result<SkTimelineInfo> {
let shared_state = self.mutex.lock().unwrap();
@@ -436,7 +542,6 @@ impl Timeline {
flush_lsn: Some(shared_state.sk.wal_store.flush_lsn()),
// note: this value is not flushed to control file yet and can be lost
commit_lsn: Some(shared_state.sk.inmem.commit_lsn),
s3_wal_lsn: Some(shared_state.sk.inmem.s3_wal_lsn),
// TODO: rework feedbacks to avoid max here
remote_consistent_lsn: Some(max(
shared_state.get_replicas_state().remote_consistent_lsn,
@@ -444,14 +549,35 @@ impl Timeline {
)),
peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn),
safekeeper_connection_string: Some(conf.listen_pg_addr.clone()),
backup_lsn: Some(shared_state.sk.inmem.backup_lsn),
})
}
/// Update timeline state with peer safekeeper data.
pub fn record_safekeeper_info(&self, sk_info: &SkTimelineInfo, _sk_id: NodeId) -> Result<()> {
let mut shared_state = self.mutex.lock().unwrap();
shared_state.sk.record_safekeeper_info(sk_info)?;
self.notify_wal_senders(&mut shared_state);
pub async fn record_safekeeper_info(
&self,
sk_info: &SkTimelineInfo,
_sk_id: NodeId,
) -> Result<()> {
let is_wal_backup_action_pending: bool;
let commit_lsn: Lsn;
{
let mut shared_state = self.mutex.lock().unwrap();
// WAL seg size not initialized yet (no message from compute ever
// received), can't do much without it.
if shared_state.get_wal_seg_size() == 0 {
return Ok(());
}
shared_state.sk.record_safekeeper_info(sk_info)?;
self.notify_wal_senders(&mut shared_state);
is_wal_backup_action_pending = shared_state.update_status();
commit_lsn = shared_state.sk.inmem.commit_lsn;
}
self.commit_lsn_watch_tx.send(commit_lsn)?;
// Wake up wal backup launcher, if it is time to stop the offloading.
if is_wal_backup_action_pending {
self.wal_backup_launcher_tx.send(self.zttid).await?;
}
Ok(())
}
@@ -476,16 +602,16 @@ impl Timeline {
shared_state.sk.wal_store.flush_lsn()
}
pub fn remove_old_wal(&self, s3_offload_enabled: bool) -> Result<()> {
pub fn remove_old_wal(&self, wal_backup_enabled: bool) -> Result<()> {
let horizon_segno: XLogSegNo;
let remover: Box<dyn Fn(u64) -> Result<(), anyhow::Error>>;
{
let shared_state = self.mutex.lock().unwrap();
// WAL seg size not initialized yet, no WAL exists.
if shared_state.sk.state.server.wal_seg_size == 0 {
if shared_state.get_wal_seg_size() == 0 {
return Ok(());
}
horizon_segno = shared_state.sk.get_horizon_segno(s3_offload_enabled);
horizon_segno = shared_state.sk.get_horizon_segno(wal_backup_enabled);
remover = shared_state.sk.wal_store.remove_up_to();
if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
return Ok(());
@@ -522,12 +648,14 @@ impl TimelineTools for Option<Arc<Timeline>> {
struct GlobalTimelinesState {
timelines: HashMap<ZTenantTimelineId, Arc<Timeline>>,
callmemaybe_tx: Option<UnboundedSender<CallmeEvent>>,
wal_backup_launcher_tx: Option<Sender<ZTenantTimelineId>>,
}
lazy_static! {
static ref TIMELINES_STATE: Mutex<GlobalTimelinesState> = Mutex::new(GlobalTimelinesState {
timelines: HashMap::new(),
callmemaybe_tx: None
callmemaybe_tx: None,
wal_backup_launcher_tx: None,
});
}
@@ -541,10 +669,15 @@ pub struct TimelineDeleteForceResult {
pub struct GlobalTimelines;
impl GlobalTimelines {
pub fn set_callmemaybe_tx(callmemaybe_tx: UnboundedSender<CallmeEvent>) {
pub fn init(
callmemaybe_tx: UnboundedSender<CallmeEvent>,
wal_backup_launcher_tx: Sender<ZTenantTimelineId>,
) {
let mut state = TIMELINES_STATE.lock().unwrap();
assert!(state.callmemaybe_tx.is_none());
state.callmemaybe_tx = Some(callmemaybe_tx);
assert!(state.wal_backup_launcher_tx.is_none());
state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx);
}
fn create_internal(
@@ -559,12 +692,14 @@ impl GlobalTimelines {
// TODO: check directory existence
let dir = conf.timeline_dir(&zttid);
fs::create_dir_all(dir)?;
let shared_state = SharedState::create(conf, &zttid, peer_ids)
.context("failed to create shared state")?;
let new_tli = Arc::new(Timeline::new(
zttid,
state.callmemaybe_tx.as_ref().unwrap().clone(),
state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
shared_state,
));
state.timelines.insert(zttid, Arc::clone(&new_tli));
@@ -594,8 +729,7 @@ impl GlobalTimelines {
match state.timelines.get(&zttid) {
Some(result) => Ok(Arc::clone(result)),
None => {
let shared_state =
SharedState::restore(conf, &zttid).context("failed to restore shared state");
let shared_state = SharedState::restore(conf, &zttid);
let shared_state = match shared_state {
Ok(shared_state) => shared_state,
@@ -617,6 +751,7 @@ impl GlobalTimelines {
let new_tli = Arc::new(Timeline::new(
zttid,
state.callmemaybe_tx.as_ref().unwrap().clone(),
state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
shared_state,
));
state.timelines.insert(zttid, Arc::clone(&new_tli));
@@ -625,6 +760,12 @@ impl GlobalTimelines {
}
}
/// Get loaded timeline, if it exists.
pub fn get_loaded(zttid: ZTenantTimelineId) -> Option<Arc<Timeline>> {
let state = TIMELINES_STATE.lock().unwrap();
state.timelines.get(&zttid).map(Arc::clone)
}
/// Get ZTenantTimelineIDs of all active timelines.
pub fn get_active_timelines() -> Vec<ZTenantTimelineId> {
let state = TIMELINES_STATE.lock().unwrap();
@@ -665,22 +806,23 @@ impl GlobalTimelines {
/// b) an HTTP GET request about the timeline is made and it's able to restore the current state, or
/// c) an HTTP POST request for timeline creation is made after the timeline is already deleted.
/// TODO: ensure all of the above never happens.
pub fn delete_force(
pub async fn delete_force(
conf: &SafeKeeperConf,
zttid: &ZTenantTimelineId,
) -> Result<TimelineDeleteForceResult> {
info!("deleting timeline {}", zttid);
let was_active = match TIMELINES_STATE.lock().unwrap().timelines.remove(zttid) {
None => false,
Some(tli) => tli.deactivate_for_delete()?,
};
let timeline = TIMELINES_STATE.lock().unwrap().timelines.remove(zttid);
let mut was_active = false;
if let Some(tli) = timeline {
was_active = tli.deactivate_for_delete().await?;
}
GlobalTimelines::delete_force_internal(conf, zttid, was_active)
}
/// Deactivates and deletes all timelines for the tenant, see `delete()`.
/// Returns map of all timelines which the tenant had, `true` if a timeline was active.
/// There may be a race if new timelines are created simultaneously.
pub fn delete_force_all_for_tenant(
pub async fn delete_force_all_for_tenant(
conf: &SafeKeeperConf,
tenant_id: &ZTenantId,
) -> Result<HashMap<ZTenantTimelineId, TimelineDeleteForceResult>> {
@@ -691,14 +833,15 @@ impl GlobalTimelines {
let timelines = &mut TIMELINES_STATE.lock().unwrap().timelines;
for (&zttid, tli) in timelines.iter() {
if zttid.tenant_id == *tenant_id {
to_delete.insert(zttid, tli.deactivate_for_delete()?);
to_delete.insert(zttid, tli.clone());
}
}
// TODO: test that the correct subset of timelines is removed. It's complicated because they are implicitly created currently.
timelines.retain(|zttid, _| !to_delete.contains_key(zttid));
}
let mut deleted = HashMap::new();
for (zttid, was_active) in to_delete {
for (zttid, timeline) in to_delete {
let was_active = timeline.deactivate_for_delete().await?;
deleted.insert(
zttid,
GlobalTimelines::delete_force_internal(conf, &zttid, was_active)?,

View File

@@ -0,0 +1,418 @@
use anyhow::{Context, Result};
use tokio::task::JoinHandle;
use std::cmp::min;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use postgres_ffi::xlog_utils::{XLogFileName, XLogSegNo, XLogSegNoOffsetToRecPtr, PG_TLI};
use remote_storage::{GenericRemoteStorage, RemoteStorage};
use tokio::fs::File;
use tokio::runtime::Builder;
use tokio::select;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::watch;
use tokio::time::sleep;
use tracing::*;
use utils::{lsn::Lsn, zid::ZTenantTimelineId};
use crate::broker::{Election, ElectionLeader};
use crate::timeline::{GlobalTimelines, Timeline};
use crate::{broker, SafeKeeperConf};
use once_cell::sync::OnceCell;
const BACKUP_ELECTION_NAME: &str = "WAL_BACKUP";
const BROKER_CONNECTION_RETRY_DELAY_MS: u64 = 1000;
const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10;
const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000;
pub fn wal_backup_launcher_thread_main(
conf: SafeKeeperConf,
wal_backup_launcher_rx: Receiver<ZTenantTimelineId>,
) {
let rt = Builder::new_multi_thread()
.worker_threads(conf.backup_runtime_threads)
.enable_all()
.build()
.expect("failed to create wal backup runtime");
rt.block_on(async {
wal_backup_launcher_main_loop(conf, wal_backup_launcher_rx).await;
});
}
/// Check whether WAL backup is required for the timeline and mark that the
/// launcher is aware of its current status (if the timeline exists).
fn is_wal_backup_required(zttid: ZTenantTimelineId) -> bool {
if let Some(tli) = GlobalTimelines::get_loaded(zttid) {
tli.wal_backup_attend()
} else {
false
}
}
struct WalBackupTaskHandle {
shutdown_tx: Sender<()>,
handle: JoinHandle<()>,
}
/// Sits on wal_backup_launcher_rx and starts/stops per-timeline WAL backup
/// tasks. Keeping this in a separate task simplifies locking, allows reaping
/// panics, and separates elections from the offloading itself.
async fn wal_backup_launcher_main_loop(
conf: SafeKeeperConf,
mut wal_backup_launcher_rx: Receiver<ZTenantTimelineId>,
) {
info!(
"wal backup launcher started, remote config {:?}",
conf.remote_storage
);
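// The remote storage client is created once per process; backup_object()
// later reads it from the REMOTE_STORAGE OnceCell, so it must be set up
// before any backup task is spawned.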
let conf_ = conf.clone();
REMOTE_STORAGE.get_or_init(|| {
conf_.remote_storage.as_ref().map(|c| {
GenericRemoteStorage::new(conf_.workdir, c).expect("failed to create remote storage")
})
});
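// Backup tasks currently running on this safekeeper, keyed by timeline id.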
let mut tasks: HashMap<ZTenantTimelineId, WalBackupTaskHandle> = HashMap::new();
loop {
// channel is never expected to get closed
let zttid = wal_backup_launcher_rx.recv().await.unwrap();
let is_wal_backup_required = is_wal_backup_required(zttid);
if conf.remote_storage.is_none() || !conf.wal_backup_enabled {
continue; /* just drain the channel and do nothing */
}
// do we need to do anything at all?
if is_wal_backup_required != tasks.contains_key(&zttid) {
if is_wal_backup_required {
// need to start the task
info!("starting wal backup task for {}", zttid);
// TODO: decide who should offload in launcher itself by simply checking current state
let election_name = broker::get_campaign_name(
BACKUP_ELECTION_NAME.to_string(),
conf.broker_etcd_prefix.clone(),
&zttid,
);
let my_candidate_name = broker::get_candiate_name(conf.my_id);
let election = broker::Election::new(
election_name,
my_candidate_name,
conf.broker_endpoints.clone(),
);
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
let timeline_dir = conf.timeline_dir(&zttid);
let handle = tokio::spawn(
backup_task_main(zttid, timeline_dir, shutdown_rx, election)
.instrument(info_span!("WAL backup", zttid = %zttid)),
);
tasks.insert(
zttid,
WalBackupTaskHandle {
shutdown_tx,
handle,
},
);
} else {
// need to stop the task
info!("stopping wal backup task for {}", zttid);
let wb_handle = tasks.remove(&zttid).unwrap();
// Tell the task to shutdown. Error means task exited earlier, that's ok.
let _ = wb_handle.shutdown_tx.send(()).await;
// Await the task itself. TODO: restart panicked tasks earlier.
// Hm, why can't we await on a reference to the handle?
if let Err(e) = wb_handle.handle.await {
warn!("WAL backup task for {} panicked: {}", zttid, e);
}
}
}
}
}
struct WalBackupTask {
timeline: Arc<Timeline>,
timeline_dir: PathBuf,
wal_seg_size: usize,
commit_lsn_watch_rx: watch::Receiver<Lsn>,
leader: Option<ElectionLeader>,
election: Election,
}
/// Offload single timeline.
async fn backup_task_main(
zttid: ZTenantTimelineId,
timeline_dir: PathBuf,
mut shutdown_rx: Receiver<()>,
election: Election,
) {
info!("started");
let timeline: Arc<Timeline> = if let Some(tli) = GlobalTimelines::get_loaded(zttid) {
tli
} else {
/* Timeline could get deleted while task was starting, just exit then. */
info!("no timeline, exiting");
return;
};
let mut wb = WalBackupTask {
wal_seg_size: timeline.get_wal_seg_size(),
commit_lsn_watch_rx: timeline.get_commit_lsn_watch_rx(),
timeline,
timeline_dir,
leader: None,
election,
};
// The task is spun up only after wal_seg_size has been initialized.
assert!(wb.wal_seg_size > 0);
let mut canceled = false;
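// Run the backup loop until it exits on its own or the launcher requests
// shutdown via shutdown_rx; in either case leadership is released below.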
select! {
_ = wb.run() => {}
_ = shutdown_rx.recv() => {
canceled = true;
}
}
if let Some(l) = wb.leader {
l.give_up().await;
}
info!("task {}", if canceled { "canceled" } else { "terminated" });
}
impl WalBackupTask {
async fn run(&mut self) {
let mut backup_lsn = Lsn(0);
// election loop
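// Each iteration of the outer loop (re)acquires leadership via the broker
// election; the inner loop below offloads WAL until leadership is lost or
// cannot be verified, after which a new election round starts.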
loop {
let mut retry_attempt = 0u32;
if let Some(l) = self.leader.take() {
l.give_up().await;
}
match broker::get_leader(&self.election).await {
Ok(l) => {
self.leader = Some(l);
}
Err(e) => {
error!("error during leader election {:?}", e);
sleep(Duration::from_millis(BROKER_CONNECTION_RETRY_DELAY_MS)).await;
continue;
}
}
// offload loop
loop {
if retry_attempt == 0 {
// wait for new WAL to arrive
if let Err(e) = self.commit_lsn_watch_rx.changed().await {
// should never happen, as we hold Arc to timeline.
error!("commit_lsn watch shut down: {:?}", e);
return;
}
} else {
// or just sleep if we errored previously
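// Exponential backoff: UPLOAD_FAILURE_RETRY_MIN_MS shifted left by the
// number of failed attempts, capped at UPLOAD_FAILURE_RETRY_MAX_MS.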
let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS;
if let Some(backoff_delay) =
UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt)
{
retry_delay = min(retry_delay, backoff_delay);
}
sleep(Duration::from_millis(retry_delay)).await;
}
let commit_lsn = *self.commit_lsn_watch_rx.borrow();
assert!(
commit_lsn >= backup_lsn,
"backup lsn should never pass commit lsn"
);
if backup_lsn.segment_number(self.wal_seg_size)
== commit_lsn.segment_number(self.wal_seg_size)
{
continue; /* nothing to do, common case as we wake up on every commit_lsn bump */
}
// Peers may have advanced the position; check the shared in-memory value.
backup_lsn = self.timeline.get_wal_backup_lsn();
if backup_lsn.segment_number(self.wal_seg_size)
== commit_lsn.segment_number(self.wal_seg_size)
{
continue;
}
if let Some(l) = self.leader.as_mut() {
// Optimization idea for later:
// avoid checking election leadership on every iteration by returning the
// current lease grant's expiration time and re-checking only after it
// expires; such an approach would reduce overhead on write-intensive workloads.
match l
.check_am_i(
self.election.election_name.clone(),
self.election.candidate_name.clone(),
)
.await
{
Ok(leader) => {
if !leader {
info!("leader has changed");
break;
}
}
Err(e) => {
warn!("error validating leader, {:?}", e);
break;
}
}
}
match backup_lsn_range(
backup_lsn,
commit_lsn,
self.wal_seg_size,
&self.timeline_dir,
)
.await
{
Ok(backup_lsn_result) => {
backup_lsn = backup_lsn_result;
self.timeline.set_wal_backup_lsn(backup_lsn_result);
retry_attempt = 0;
}
Err(e) => {
error!(
"failed while offloading range {}-{}: {:?}",
backup_lsn, commit_lsn, e
);
retry_attempt = retry_attempt.saturating_add(1);
}
}
}
}
}
}
pub async fn backup_lsn_range(
start_lsn: Lsn,
end_lsn: Lsn,
wal_seg_size: usize,
timeline_dir: &Path,
) -> Result<Lsn> {
let mut res = start_lsn;
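// Segments are uploaded one by one; if any upload fails, the error
// propagates and the caller retries later from its last known backup_lsn.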
let segments = get_segments(start_lsn, end_lsn, wal_seg_size);
for s in &segments {
backup_single_segment(s, timeline_dir)
.await
.with_context(|| format!("offloading segno {}", s.seg_no))?;
res = s.end_lsn;
}
info!(
"offloaded segnos {:?} up to {}, previous backup_lsn {}",
segments.iter().map(|&s| s.seg_no).collect::<Vec<_>>(),
end_lsn,
start_lsn,
);
Ok(res)
}
async fn backup_single_segment(seg: &Segment, timeline_dir: &Path) -> Result<()> {
let segment_file_name = seg.file_path(timeline_dir)?;
backup_object(&segment_file_name, seg.size()).await?;
debug!("Backup of {} done", segment_file_name.display());
Ok(())
}
#[derive(Debug, Copy, Clone)]
pub struct Segment {
seg_no: XLogSegNo,
start_lsn: Lsn,
end_lsn: Lsn,
}
impl Segment {
pub fn new(seg_no: u64, start_lsn: Lsn, end_lsn: Lsn) -> Self {
Self {
seg_no,
start_lsn,
end_lsn,
}
}
pub fn object_name(self) -> String {
XLogFileName(PG_TLI, self.seg_no, self.size())
}
pub fn file_path(self, timeline_dir: &Path) -> Result<PathBuf> {
Ok(timeline_dir.join(self.object_name()))
}
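/// Segment size in bytes; for the whole segments produced by get_segments()
/// this equals wal_seg_size, which is what XLogFileName() expects above.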
pub fn size(self) -> usize {
(u64::from(self.end_lsn) - u64::from(self.start_lsn)) as usize
}
}
fn get_segments(start: Lsn, end: Lsn, seg_size: usize) -> Vec<Segment> {
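// Enumerate whole segments covering [start, end); a trailing, partially
// filled segment is excluded and gets offloaded once it is complete.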
let first_seg = start.segment_number(seg_size);
let last_seg = end.segment_number(seg_size);
let res: Vec<Segment> = (first_seg..last_seg)
.map(|s| {
let start_lsn = XLogSegNoOffsetToRecPtr(s, 0, seg_size);
let end_lsn = XLogSegNoOffsetToRecPtr(s + 1, 0, seg_size);
Segment::new(s, Lsn::from(start_lsn), Lsn::from(end_lsn))
})
.collect();
res
}
static REMOTE_STORAGE: OnceCell<Option<GenericRemoteStorage>> = OnceCell::new();
async fn backup_object(source_file: &Path, size: usize) -> Result<()> {
let storage = REMOTE_STORAGE.get().expect("failed to get remote storage");
let file = File::open(&source_file).await?;
// Storage is initialized by the launcher at this point.
match storage.as_ref().unwrap() {
GenericRemoteStorage::Local(local_storage) => {
let destination = local_storage.remote_object_id(source_file)?;
debug!(
"local upload about to start from {} to {}",
source_file.display(),
destination.display()
);
local_storage.upload(file, size, &destination, None).await
}
GenericRemoteStorage::S3(s3_storage) => {
let s3key = s3_storage.remote_object_id(source_file)?;
debug!(
"S3 upload about to start from {} to {:?}",
source_file.display(),
s3key
);
s3_storage.upload(file, size, &s3key, None).await
}
}?;
Ok(())
}

View File

@@ -12,7 +12,7 @@ from contextlib import closing
from dataclasses import dataclass, field
from multiprocessing import Process, Value
from pathlib import Path
from fixtures.zenith_fixtures import PgBin, Etcd, Postgres, Safekeeper, ZenithEnv, ZenithEnvBuilder, PortDistributor, SafekeeperPort, zenith_binpath, PgProtocol
from fixtures.zenith_fixtures import PgBin, Etcd, Postgres, RemoteStorageUsers, Safekeeper, ZenithEnv, ZenithEnvBuilder, PortDistributor, SafekeeperPort, zenith_binpath, PgProtocol
from fixtures.utils import get_dir_size, lsn_to_hex, mkdir_if_needed, lsn_from_hex
from fixtures.log_helper import log
from typing import List, Optional, Any
@@ -401,7 +401,7 @@ def test_wal_removal(zenith_env_builder: ZenithEnvBuilder):
http_cli = env.safekeepers[0].http_client()
# Pretend WAL is offloaded to s3.
http_cli.record_safekeeper_info(tenant_id, timeline_id, {'s3_wal_lsn': 'FFFFFFFF/FEFFFFFF'})
http_cli.record_safekeeper_info(tenant_id, timeline_id, {'backup_lsn': 'FFFFFFFF/FEFFFFFF'})
# wait till first segment is removed on all safekeepers
started_at = time.time()
@@ -414,6 +414,56 @@ def test_wal_removal(zenith_env_builder: ZenithEnvBuilder):
time.sleep(0.5)
@pytest.mark.parametrize('storage_type', ['mock_s3', 'local_fs'])
def test_wal_backup(zenith_env_builder: ZenithEnvBuilder, storage_type: str):
zenith_env_builder.num_safekeepers = 3
if storage_type == 'local_fs':
zenith_env_builder.enable_local_fs_remote_storage()
elif storage_type == 'mock_s3':
zenith_env_builder.enable_s3_mock_remote_storage('test_safekeepers_wal_backup')
else:
raise RuntimeError(f'Unknown storage type: {storage_type}')
zenith_env_builder.remote_storage_users = RemoteStorageUsers.SAFEKEEPER
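# Only safekeepers consume remote storage in this test; the pageserver is
# started without a remote_storage override.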
env = zenith_env_builder.init_start()
env.zenith_cli.create_branch('test_safekeepers_wal_backup')
pg = env.postgres.create_start('test_safekeepers_wal_backup')
# learn zenith timeline from compute
tenant_id = pg.safe_psql("show zenith.zenith_tenant")[0][0]
timeline_id = pg.safe_psql("show zenith.zenith_timeline")[0][0]
pg_conn = pg.connect()
cur = pg_conn.cursor()
cur.execute('create table t(key int, value text)')
# Shut down each safekeeper in turn and fill a segment while it is down;
# ensure the segment gets offloaded by the others.
offloaded_seg_end = ['0/2000000', '0/3000000', '0/4000000']
for victim, seg_end in zip(env.safekeepers, offloaded_seg_end):
victim.stop()
# roughly fills one segment
cur.execute("insert into t select generate_series(1,250000), 'payload'")
live_sk = [sk for sk in env.safekeepers if sk != victim][0]
http_cli = live_sk.http_client()
started_at = time.time()
while True:
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
log.info(f"live sk status is {tli_status}")
if lsn_from_hex(tli_status.backup_lsn) >= lsn_from_hex(seg_end):
break
elapsed = time.time() - started_at
if elapsed > 20:
raise RuntimeError(
    f"timed out after {elapsed:.0f}s waiting for segment ending at {seg_end} to get offloaded")
time.sleep(0.5)
victim.start()
class ProposerPostgres(PgProtocol):
"""Object for running postgres without ZenithEnv"""
def __init__(self,

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
from dataclasses import field
from enum import Flag, auto
import textwrap
from cached_property import cached_property
import asyncpg
@@ -421,10 +422,51 @@ class MockS3Server:
def secret_key(self) -> str:
return 'test'
def access_env_vars(self) -> Dict[Any, Any]:
return {
'AWS_ACCESS_KEY_ID': self.access_key(),
'AWS_SECRET_ACCESS_KEY': self.secret_key(),
}
def kill(self):
self.subprocess.kill()
@dataclass
class LocalFsStorage:
local_path: Path
@dataclass
class S3Storage:
bucket_name: str
bucket_region: str
endpoint: Optional[str]
RemoteStorage = Union[LocalFsStorage, S3Storage]
# serialize as toml inline table
def remote_storage_to_toml_inline_table(remote_storage):
    if isinstance(remote_storage, LocalFsStorage):
        res = f"local_path='{remote_storage.local_path}'"
    elif isinstance(remote_storage, S3Storage):
        res = f"bucket_name='{remote_storage.bucket_name}', bucket_region='{remote_storage.bucket_region}'"
        if remote_storage.endpoint is not None:
            res += f", endpoint='{remote_storage.endpoint}'"
    else:
        raise Exception(f'Unknown storage configuration {remote_storage}')
    return f"{{{res}}}"
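# For example (path is illustrative), LocalFsStorage(Path('/tmp/wal')) serializes to
# "{local_path='/tmp/wal'}", the inline-table syntax written into safekeeper/pageserver configs.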
class RemoteStorageUsers(Flag):
PAGESERVER = auto()
SAFEKEEPER = auto()
class ZenithEnvBuilder:
"""
Builder object to create a Zenith runtime environment
@@ -440,6 +482,7 @@ class ZenithEnvBuilder:
broker: Etcd,
mock_s3_server: MockS3Server,
remote_storage: Optional[RemoteStorage] = None,
remote_storage_users: RemoteStorageUsers = RemoteStorageUsers.PAGESERVER,
pageserver_config_override: Optional[str] = None,
num_safekeepers: int = 1,
pageserver_auth_enabled: bool = False,
@@ -449,6 +492,7 @@ class ZenithEnvBuilder:
self.rust_log_override = rust_log_override
self.port_distributor = port_distributor
self.remote_storage = remote_storage
self.remote_storage_users = remote_storage_users
self.broker = broker
self.mock_s3_server = mock_s3_server
self.pageserver_config_override = pageserver_config_override
@@ -497,9 +541,9 @@ class ZenithEnvBuilder:
aws_access_key_id=self.mock_s3_server.access_key(),
aws_secret_access_key=self.mock_s3_server.secret_key(),
).create_bucket(Bucket=bucket_name)
self.remote_storage = S3Storage(bucket=bucket_name,
self.remote_storage = S3Storage(bucket_name=bucket_name,
endpoint=mock_endpoint,
region=mock_region)
bucket_region=mock_region)
def __enter__(self):
return self
@@ -557,6 +601,7 @@ class ZenithEnv:
self.safekeepers: List[Safekeeper] = []
self.broker = config.broker
self.remote_storage = config.remote_storage
self.remote_storage_users = config.remote_storage_users
# generate initial tenant ID here instead of letting 'zenith init' generate it,
# so that we don't need to dig it out of the config file afterwards.
@@ -605,8 +650,12 @@ class ZenithEnv:
id = {id}
pg_port = {port.pg}
http_port = {port.http}
sync = false # Disable fsyncs to make the tests go faster
""")
sync = false # Disable fsyncs to make the tests go faster""")
if bool(self.remote_storage_users
& RemoteStorageUsers.SAFEKEEPER) and self.remote_storage is not None:
toml += textwrap.dedent(f"""
remote_storage = "{remote_storage_to_toml_inline_table(self.remote_storage)}"
""")
safekeeper = Safekeeper(env=self, id=id, port=port)
self.safekeepers.append(safekeeper)
@@ -638,7 +687,7 @@ def _shared_simple_env(request: Any,
mock_s3_server: MockS3Server,
default_broker: Etcd) -> Iterator[ZenithEnv]:
"""
Internal fixture backing the `zenith_simple_env` fixture. If TEST_SHARED_FIXTURES
# Internal fixture backing the `zenith_simple_env` fixture. If TEST_SHARED_FIXTURES
is set, this is shared by all tests using `zenith_simple_env`.
"""
@@ -822,20 +871,6 @@ class PageserverPort:
http: int
@dataclass
class LocalFsStorage:
root: Path
@dataclass
class S3Storage:
bucket: str
region: str
endpoint: Optional[str]
RemoteStorage = Union[LocalFsStorage, S3Storage]
CREATE_TIMELINE_ID_EXTRACTOR = re.compile(r"^Created timeline '(?P<timeline_id>[^']+)'",
re.MULTILINE)
@@ -998,6 +1033,7 @@ class ZenithCli:
append_pageserver_param_overrides(
params_to_update=cmd,
remote_storage=self.env.remote_storage,
remote_storage_users=self.env.remote_storage_users,
pageserver_config_override=self.env.pageserver.config_override)
res = self.raw_cli(cmd)
@@ -1022,14 +1058,10 @@ class ZenithCli:
append_pageserver_param_overrides(
params_to_update=start_args,
remote_storage=self.env.remote_storage,
remote_storage_users=self.env.remote_storage_users,
pageserver_config_override=self.env.pageserver.config_override)
s3_env_vars = None
if self.env.s3_mock_server:
s3_env_vars = {
'AWS_ACCESS_KEY_ID': self.env.s3_mock_server.access_key(),
'AWS_SECRET_ACCESS_KEY': self.env.s3_mock_server.secret_key(),
}
s3_env_vars = self.env.s3_mock_server.access_env_vars() if self.env.s3_mock_server else None
return self.raw_cli(start_args, extra_env_vars=s3_env_vars)
def pageserver_stop(self, immediate=False) -> 'subprocess.CompletedProcess[str]':
@@ -1041,7 +1073,8 @@ class ZenithCli:
return self.raw_cli(cmd)
def safekeeper_start(self, id: int) -> 'subprocess.CompletedProcess[str]':
return self.raw_cli(['safekeeper', 'start', str(id)])
s3_env_vars = self.env.s3_mock_server.access_env_vars() if self.env.s3_mock_server else None
return self.raw_cli(['safekeeper', 'start', str(id)], extra_env_vars=s3_env_vars)
def safekeeper_stop(self,
id: Optional[int] = None,
@@ -1237,22 +1270,13 @@ class ZenithPageserver(PgProtocol):
def append_pageserver_param_overrides(
params_to_update: List[str],
remote_storage: Optional[RemoteStorage],
remote_storage_users: RemoteStorageUsers,
pageserver_config_override: Optional[str] = None,
):
if remote_storage is not None:
if isinstance(remote_storage, LocalFsStorage):
pageserver_storage_override = f"local_path='{remote_storage.root}'"
elif isinstance(remote_storage, S3Storage):
pageserver_storage_override = f"bucket_name='{remote_storage.bucket}',\
bucket_region='{remote_storage.region}'"
if remote_storage.endpoint is not None:
pageserver_storage_override += f",endpoint='{remote_storage.endpoint}'"
else:
raise Exception(f'Unknown storage configuration {remote_storage}')
if bool(remote_storage_users & RemoteStorageUsers.PAGESERVER) and remote_storage is not None:
remote_storage_toml_table = remote_storage_to_toml_inline_table(remote_storage)
params_to_update.append(
f'--pageserver-config-override=remote_storage={{{pageserver_storage_override}}}')
f'--pageserver-config-override=remote_storage={remote_storage_toml_table}')
env_overrides = os.getenv('ZENITH_PAGESERVER_OVERRIDES')
if env_overrides is not None:
@@ -1786,8 +1810,9 @@ class Safekeeper:
class SafekeeperTimelineStatus:
acceptor_epoch: int
flush_lsn: str
remote_consistent_lsn: str
timeline_start_lsn: str
backup_lsn: str
remote_consistent_lsn: str
@dataclass
@@ -1812,8 +1837,9 @@ class SafekeeperHttpClient(requests.Session):
resj = res.json()
return SafekeeperTimelineStatus(acceptor_epoch=resj['acceptor_state']['epoch'],
flush_lsn=resj['flush_lsn'],
remote_consistent_lsn=resj['remote_consistent_lsn'],
timeline_start_lsn=resj['timeline_start_lsn'])
timeline_start_lsn=resj['timeline_start_lsn'],
backup_lsn=resj['backup_lsn'],
remote_consistent_lsn=resj['remote_consistent_lsn'])
def record_safekeeper_info(self, tenant_id: str, timeline_id: str, body):
res = self.post(