diff --git a/.circleci/ansible/production.hosts b/.circleci/ansible/production.hosts index 6cefd724d8..03c6cf57e0 100644 --- a/.circleci/ansible/production.hosts +++ b/.circleci/ansible/production.hosts @@ -16,4 +16,3 @@ console_mgmt_base_url = http://console-release.local bucket_name = zenith-storage-oregon bucket_region = us-west-2 etcd_endpoints = etcd-release.local:2379 -safekeeper_enable_s3_offload = false diff --git a/.circleci/ansible/staging.hosts b/.circleci/ansible/staging.hosts index d99ffa6dac..cf5b98eaa1 100644 --- a/.circleci/ansible/staging.hosts +++ b/.circleci/ansible/staging.hosts @@ -17,4 +17,3 @@ console_mgmt_base_url = http://console-staging.local bucket_name = zenith-staging-storage-us-east-1 bucket_region = us-east-1 etcd_endpoints = etcd-staging.local:2379 -safekeeper_enable_s3_offload = false diff --git a/.circleci/ansible/systemd/safekeeper.service b/.circleci/ansible/systemd/safekeeper.service index 55088db859..a6b443c3e7 100644 --- a/.circleci/ansible/systemd/safekeeper.service +++ b/.circleci/ansible/systemd/safekeeper.service @@ -6,7 +6,7 @@ After=network.target auditd.service Type=simple User=safekeeper Environment=RUST_BACKTRACE=1 ZENITH_REPO_DIR=/storage/safekeeper/data LD_LIBRARY_PATH=/usr/local/lib -ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}.local:6500 --listen-http {{ inventory_hostname }}.local:7676 -p {{ first_pageserver }}:6400 -D /storage/safekeeper/data --broker-endpoints={{ etcd_endpoints }} --enable-s3-offload={{ safekeeper_enable_s3_offload }} +ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}.local:6500 --listen-http {{ inventory_hostname }}.local:7676 -p {{ first_pageserver }}:6400 -D /storage/safekeeper/data --broker-endpoints={{ etcd_endpoints }} --remote_storage='{bucket_name={{bucket_name}}, bucket_region={{bucket_region}}, prefix_in_bucket=wal}' ExecReload=/bin/kill -HUP $MAINPID KillMode=mixed KillSignal=SIGINT diff --git a/Cargo.lock b/Cargo.lock index 840953f645..e39375c221 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1722,9 +1722,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da32515d9f6e6e489d7bc9d84c71b060db7247dc035bbe44eac88cf87486d8d5" +checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9" [[package]] name = "oorandom" @@ -2403,6 +2403,7 @@ dependencies = [ "tempfile", "tokio", "tokio-util 0.7.0", + "toml_edit", "tracing", "workspace_hack", ] @@ -2654,6 +2655,7 @@ name = "safekeeper" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "byteorder", "bytes", "clap 3.0.14", @@ -2662,12 +2664,14 @@ dependencies = [ "daemonize", "etcd_broker", "fs2", + "futures", "git-version", "hex", "humantime", "hyper", "lazy_static", "metrics", + "once_cell", "postgres", "postgres-protocol", "postgres_ffi", @@ -2681,6 +2685,7 @@ dependencies = [ "tokio", "tokio-postgres", "tokio-util 0.7.0", + "toml_edit", "tracing", "url", "utils", diff --git a/control_plane/src/lib.rs b/control_plane/src/lib.rs index c3469c3350..4dfca588ad 100644 --- a/control_plane/src/lib.rs +++ b/control_plane/src/lib.rs @@ -49,3 +49,12 @@ fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command { cmd } } + +fn fill_aws_secrets_vars(mut cmd: &mut Command) -> &mut Command { + for env_key in ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"] { + if let Ok(value) = std::env::var(env_key) { + cmd = cmd.env(env_key, value); + } + } + cmd +} diff --git a/control_plane/src/local_env.rs 
b/control_plane/src/local_env.rs index 015b33f591..2623f65242 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -167,6 +167,8 @@ pub struct SafekeeperConf { pub pg_port: u16, pub http_port: u16, pub sync: bool, + pub remote_storage: Option<String>, + pub backup_threads: Option<u32>, } impl Default for SafekeeperConf { @@ -176,6 +178,8 @@ impl Default for SafekeeperConf { pg_port: 0, http_port: 0, sync: true, + remote_storage: None, + backup_threads: None, } } } @@ -377,6 +381,7 @@ impl LocalEnv { base_path != Path::new(""), "repository base path is missing" ); + ensure!( !base_path.exists(), "directory '{}' already exists. Perhaps already initialized?", diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index 303d6850df..972b6d48ae 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -23,7 +23,7 @@ use utils::{ use crate::local_env::{LocalEnv, SafekeeperConf}; use crate::storage::PageServerNode; -use crate::{fill_rust_env_vars, read_pidfile}; +use crate::{fill_aws_secrets_vars, fill_rust_env_vars, read_pidfile}; #[derive(Error, Debug)] pub enum SafekeeperHttpError { @@ -143,6 +143,14 @@ impl SafekeeperNode { if let Some(prefix) = self.env.etcd_broker.broker_etcd_prefix.as_deref() { cmd.args(&["--broker-etcd-prefix", prefix]); } + if let Some(threads) = self.conf.backup_threads { + cmd.args(&["--backup-threads", threads.to_string().as_ref()]); + } + if let Some(ref remote_storage) = self.conf.remote_storage { + cmd.args(&["--remote-storage", remote_storage]); + } + + fill_aws_secrets_vars(&mut cmd); if !cmd.status()?.success() { bail!( diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index 355c7c250d..24cdbce8f3 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -25,7 +25,7 @@ use utils::{ }; use crate::local_env::LocalEnv; -use crate::{fill_rust_env_vars, read_pidfile}; +use crate::{fill_aws_secrets_vars, fill_rust_env_vars, read_pidfile}; use pageserver::tenant_mgr::TenantInfo; #[derive(Error, Debug)] @@ -493,12 +493,3 @@ impl PageServerNode { Ok(timeline_info_response) } } - -fn fill_aws_secrets_vars(mut cmd: &mut Command) -> &mut Command { - for env_key in ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"] { - if let Ok(value) = std::env::var(env_key) { - cmd = cmd.env(env_key, value); - } - } - cmd -} diff --git a/libs/etcd_broker/src/lib.rs b/libs/etcd_broker/src/lib.rs index 271f657f43..7fe142502b 100644 --- a/libs/etcd_broker/src/lib.rs +++ b/libs/etcd_broker/src/lib.rs @@ -43,10 +43,10 @@ pub struct SkTimelineInfo { #[serde_as(as = "Option<DisplayFromStr>")] #[serde(default)] pub commit_lsn: Option<Lsn>, - /// LSN up to which safekeeper offloaded WAL to s3. + /// LSN up to which safekeeper has backed up WAL. #[serde_as(as = "Option<DisplayFromStr>")] #[serde(default)] - pub s3_wal_lsn: Option<Lsn>, + pub backup_lsn: Option<Lsn>, /// LSN of last checkpoint uploaded by pageserver.
#[serde_as(as = "Option")] #[serde(default)] diff --git a/libs/remote_storage/Cargo.toml b/libs/remote_storage/Cargo.toml index 5c62e28fda..b11b3cf371 100644 --- a/libs/remote_storage/Cargo.toml +++ b/libs/remote_storage/Cargo.toml @@ -6,7 +6,6 @@ edition = "2021" [dependencies] anyhow = { version = "1.0", features = ["backtrace"] } async-trait = "0.1" - metrics = { version = "0.1", path = "../metrics" } once_cell = "1.8.0" rusoto_core = "0.48" @@ -15,6 +14,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1" tokio = { version = "1.17", features = ["sync", "macros", "fs", "io-util"] } tokio-util = { version = "0.7", features = ["io"] } +toml_edit = { version = "0.13", features = ["easy"] } tracing = "0.1.27" workspace_hack = { version = "0.1", path = "../../workspace_hack" } diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index 8092e4fc49..0889cb720c 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -16,8 +16,10 @@ use std::{ path::{Path, PathBuf}, }; -use anyhow::Context; +use anyhow::{bail, Context}; + use tokio::io; +use toml_edit::Item; use tracing::info; pub use self::{ @@ -203,6 +205,90 @@ pub fn path_with_suffix_extension(original_path: impl AsRef, suffix: &str) .with_extension(new_extension.as_ref()) } +impl RemoteStorageConfig { + pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result { + let local_path = toml.get("local_path"); + let bucket_name = toml.get("bucket_name"); + let bucket_region = toml.get("bucket_region"); + + let max_concurrent_syncs = NonZeroUsize::new( + parse_optional_integer("max_concurrent_syncs", toml)? + .unwrap_or(DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS), + ) + .context("Failed to parse 'max_concurrent_syncs' as a positive integer")?; + + let max_sync_errors = NonZeroU32::new( + parse_optional_integer("max_sync_errors", toml)? + .unwrap_or(DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS), + ) + .context("Failed to parse 'max_sync_errors' as a positive integer")?; + + let concurrency_limit = NonZeroUsize::new( + parse_optional_integer("concurrency_limit", toml)? 
+ .unwrap_or(DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT), + ) + .context("Failed to parse 'concurrency_limit' as a positive integer")?; + + let storage = match (local_path, bucket_name, bucket_region) { + (None, None, None) => bail!("no 'local_path' nor 'bucket_name' option"), + (_, Some(_), None) => { + bail!("'bucket_region' option is mandatory if 'bucket_name' is given ") + } + (_, None, Some(_)) => { + bail!("'bucket_name' option is mandatory if 'bucket_region' is given ") + } + (None, Some(bucket_name), Some(bucket_region)) => RemoteStorageKind::AwsS3(S3Config { + bucket_name: parse_toml_string("bucket_name", bucket_name)?, + bucket_region: parse_toml_string("bucket_region", bucket_region)?, + prefix_in_bucket: toml + .get("prefix_in_bucket") + .map(|prefix_in_bucket| parse_toml_string("prefix_in_bucket", prefix_in_bucket)) + .transpose()?, + endpoint: toml + .get("endpoint") + .map(|endpoint| parse_toml_string("endpoint", endpoint)) + .transpose()?, + concurrency_limit, + }), + (Some(local_path), None, None) => RemoteStorageKind::LocalFs(PathBuf::from( + parse_toml_string("local_path", local_path)?, + )), + (Some(_), Some(_), _) => bail!("local_path and bucket_name are mutually exclusive"), + }; + + Ok(RemoteStorageConfig { + max_concurrent_syncs, + max_sync_errors, + storage, + }) + } +} + +// Helper functions to parse a toml Item +fn parse_optional_integer(name: &str, item: &toml_edit::Item) -> anyhow::Result> +where + I: TryFrom, + E: std::error::Error + Send + Sync + 'static, +{ + let toml_integer = match item.get(name) { + Some(item) => item + .as_integer() + .with_context(|| format!("configure option {name} is not an integer"))?, + None => return Ok(None), + }; + + I::try_from(toml_integer) + .map(Some) + .with_context(|| format!("configure option {name} is too large")) +} + +fn parse_toml_string(name: &str, item: &Item) -> anyhow::Result { + let s = item + .as_str() + .with_context(|| format!("configure option {name} is not a string"))?; + Ok(s.to_string()) +} + #[cfg(test)] mod tests { use super::*; diff --git a/libs/utils/src/lsn.rs b/libs/utils/src/lsn.rs index c09d8c67ce..3dab2a625c 100644 --- a/libs/utils/src/lsn.rs +++ b/libs/utils/src/lsn.rs @@ -26,6 +26,9 @@ impl Lsn { /// Maximum possible value for an LSN pub const MAX: Lsn = Lsn(u64::MAX); + /// Invalid value for InvalidXLogRecPtr, as defined in xlogdefs.h + pub const INVALID: Lsn = Lsn(0); + /// Subtract a number, returning None on overflow. pub fn checked_sub>(self, other: T) -> Option { let other: u64 = other.into(); @@ -103,6 +106,12 @@ impl Lsn { pub fn is_aligned(&self) -> bool { *self == self.align() } + + /// Return if the LSN is valid + /// mimics postgres XLogRecPtrIsInvalid macro + pub fn is_valid(self) -> bool { + self != Lsn::INVALID + } } impl From for Lsn { diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 6c045d77ae..dc9d7161a2 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -5,9 +5,9 @@ //! See also `settings.md` for better description on every parameter. 
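// A minimal usage sketch (not part of the diff) for the `RemoteStorageConfig::from_toml`
// parser that now lives in the shared `remote_storage` crate above. It mirrors the wrapping
// trick used in safekeeper's main() further below: toml_edit rejects a bare inline table as
// a document, so wrap it in a key and strip the key off after parsing. The function name and
// example bucket values are illustrative assumptions.

use remote_storage::RemoteStorageConfig;
use toml_edit::Document;

fn parse_remote_storage_sketch(inline_table: &str) -> anyhow::Result<RemoteStorageConfig> {
    // Wrap the inline table in a key so it parses as a valid TOML document...
    let wrapped = format!("remote_storage = {}", inline_table);
    let doc = wrapped.parse::<Document>()?;
    // ...then strip the key off again and hand the item to the shared parser.
    let (_, item) = doc.iter().next().unwrap();
    RemoteStorageConfig::from_toml(item)
}

// Example input: {bucket_name = "some-bucket", bucket_region = "us-east-1", prefix_in_bucket = "wal"}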
use anyhow::{anyhow, bail, ensure, Context, Result}; -use remote_storage::{RemoteStorageConfig, RemoteStorageKind, S3Config}; +use remote_storage::RemoteStorageConfig; use std::env; -use std::num::{NonZeroU32, NonZeroUsize}; + use std::path::{Path, PathBuf}; use std::str::FromStr; use std::time::Duration; @@ -394,7 +394,7 @@ impl PageServerConf { )), "auth_type" => builder.auth_type(parse_toml_from_str(key, item)?), "remote_storage" => { - builder.remote_storage_config(Some(Self::parse_remote_storage_config(item)?)) + builder.remote_storage_config(Some(RemoteStorageConfig::from_toml(item)?)) } "tenant_config" => { t_conf = Self::parse_toml_tenant_conf(item)?; @@ -484,64 +484,6 @@ impl PageServerConf { Ok(t_conf) } - /// subroutine of parse_config(), to parse the `[remote_storage]` table. - fn parse_remote_storage_config(toml: &toml_edit::Item) -> anyhow::Result { - let local_path = toml.get("local_path"); - let bucket_name = toml.get("bucket_name"); - let bucket_region = toml.get("bucket_region"); - - let max_concurrent_syncs = NonZeroUsize::new( - parse_optional_integer("max_concurrent_syncs", toml)? - .unwrap_or(remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS), - ) - .context("Failed to parse 'max_concurrent_syncs' as a positive integer")?; - - let max_sync_errors = NonZeroU32::new( - parse_optional_integer("max_sync_errors", toml)? - .unwrap_or(remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS), - ) - .context("Failed to parse 'max_sync_errors' as a positive integer")?; - - let concurrency_limit = NonZeroUsize::new( - parse_optional_integer("concurrency_limit", toml)? - .unwrap_or(remote_storage::DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT), - ) - .context("Failed to parse 'concurrency_limit' as a positive integer")?; - - let storage = match (local_path, bucket_name, bucket_region) { - (None, None, None) => bail!("no 'local_path' nor 'bucket_name' option"), - (_, Some(_), None) => { - bail!("'bucket_region' option is mandatory if 'bucket_name' is given ") - } - (_, None, Some(_)) => { - bail!("'bucket_name' option is mandatory if 'bucket_region' is given ") - } - (None, Some(bucket_name), Some(bucket_region)) => RemoteStorageKind::AwsS3(S3Config { - bucket_name: parse_toml_string("bucket_name", bucket_name)?, - bucket_region: parse_toml_string("bucket_region", bucket_region)?, - prefix_in_bucket: toml - .get("prefix_in_bucket") - .map(|prefix_in_bucket| parse_toml_string("prefix_in_bucket", prefix_in_bucket)) - .transpose()?, - endpoint: toml - .get("endpoint") - .map(|endpoint| parse_toml_string("endpoint", endpoint)) - .transpose()?, - concurrency_limit, - }), - (Some(local_path), None, None) => RemoteStorageKind::LocalFs(PathBuf::from( - parse_toml_string("local_path", local_path)?, - )), - (Some(_), Some(_), _) => bail!("local_path and bucket_name are mutually exclusive"), - }; - - Ok(RemoteStorageConfig { - max_concurrent_syncs, - max_sync_errors, - storage, - }) - } - #[cfg(test)] pub fn test_repo_dir(test_name: &str) -> PathBuf { PathBuf::from(format!("../tmp_check/test_{test_name}")) @@ -592,23 +534,6 @@ fn parse_toml_u64(name: &str, item: &Item) -> Result { Ok(i as u64) } -fn parse_optional_integer(name: &str, item: &toml_edit::Item) -> anyhow::Result> -where - I: TryFrom, - E: std::error::Error + Send + Sync + 'static, -{ - let toml_integer = match item.get(name) { - Some(item) => item - .as_integer() - .with_context(|| format!("configure option {name} is not an integer"))?, - None => return Ok(None), - }; - - I::try_from(toml_integer) - .map(Some) - 
.with_context(|| format!("configure option {name} is too large")) -} - fn parse_toml_duration(name: &str, item: &Item) -> Result<Duration> { let s = item .as_str() @@ -651,8 +576,12 @@ fn parse_toml_array(name: &str, item: &Item) -> anyhow::Result<Vec<String>> { #[cfg(test)] mod tests { - use std::fs; + use std::{ + fs, + num::{NonZeroU32, NonZeroUsize}, + }; + + use remote_storage::{RemoteStorageKind, S3Config}; use tempfile::{tempdir, TempDir}; use super::*; diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index 417cf58cd5..373108c61b 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -30,6 +30,10 @@ const_format = "0.2.21" tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } tokio-util = { version = "0.7", features = ["io"] } git-version = "0.3.5" +async-trait = "0.1" +once_cell = "1.10.0" +futures = "0.3.13" +toml_edit = { version = "0.13", features = ["easy"] } postgres_ffi = { path = "../libs/postgres_ffi" } metrics = { path = "../libs/metrics" } diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index 290b7c738a..a7628482d9 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -6,22 +6,27 @@ use clap::{App, Arg}; use const_format::formatcp; use daemonize::Daemonize; use fs2::FileExt; +use remote_storage::RemoteStorageConfig; use std::fs::{self, File}; use std::io::{ErrorKind, Write}; use std::path::{Path, PathBuf}; use std::thread; use tokio::sync::mpsc; +use toml_edit::Document; use tracing::*; use url::{ParseError, Url}; use safekeeper::control_file::{self}; -use safekeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR}; +use safekeeper::defaults::{ + DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR, DEFAULT_WAL_BACKUP_RUNTIME_THREADS, +}; +use safekeeper::http; use safekeeper::remove_wal; use safekeeper::timeline::GlobalTimelines; +use safekeeper::wal_backup; use safekeeper::wal_service; use safekeeper::SafeKeeperConf; use safekeeper::{broker, callmemaybe}; -use safekeeper::{http, s3_offload}; use utils::{ http::endpoint, logging, project_git_version, shutdown::exit_now, signals, tcp_listener, zid::NodeId, @@ -71,12 +76,6 @@ fn main() -> anyhow::Result<()> { .long("pageserver") .takes_value(true), ) - .arg( - Arg::new("ttl") - .long("ttl") - .takes_value(true) - .help("interval for keeping WAL at safekeeper node, after which them will be uploaded to S3 and removed locally"), - ) .arg( Arg::new("recall") .long("recall") .takes_value(true) @@ -118,12 +117,20 @@ fn main() -> anyhow::Result<()> { .help("a prefix to always use when polling/pushing data in etcd from this safekeeper"), ) .arg( - Arg::new("enable-s3-offload") - .long("enable-s3-offload") + Arg::new("wal-backup-threads").long("backup-threads").takes_value(true).help(formatcp!("number of threads for wal backup (default {DEFAULT_WAL_BACKUP_RUNTIME_THREADS})")), + ).arg( + Arg::new("remote-storage") + .long("remote-storage") + .takes_value(true) + .help("Remote storage configuration for WAL backup (offloading to s3) as TOML inline table, e.g. {\"max_concurrent_syncs\" = 17, \"max_sync_errors\" = 13, \"bucket_name\" = \"<BUCKETNAME>\", \"bucket_region\" = \"<REGION>\", \"concurrency_limit\" = 119}.\nSafekeeper offloads WAL to [prefix_in_bucket/]<tenant_id>/<timeline_id>/, mirroring the structure on the file system.") + ) + .arg( + Arg::new("enable-wal-backup") + .long("enable-wal-backup") .takes_value(true) .default_value("true") .default_missing_value("true") - .help("Enable/disable s3 offloading.
When disabled, safekeeper removes WAL ignoring s3 WAL horizon."), + .help("Enable/disable WAL backup to s3. When disabled, safekeeper removes WAL ignoring WAL backup horizon."), ) .get_matches(); @@ -157,10 +164,6 @@ fn main() -> anyhow::Result<()> { conf.listen_http_addr = addr.to_owned(); } - if let Some(ttl) = arg_matches.value_of("ttl") { - conf.ttl = Some(humantime::parse_duration(ttl)?); - } - if let Some(recall) = arg_matches.value_of("recall") { conf.recall_period = humantime::parse_duration(recall)?; } @@ -182,9 +185,21 @@ fn main() -> anyhow::Result<()> { conf.broker_etcd_prefix = prefix.to_string(); } + if let Some(backup_threads) = arg_matches.value_of("wal-backup-threads") { + conf.backup_runtime_threads = backup_threads + .parse() + .with_context(|| format!("Failed to parse backup threads {}", backup_threads))?; + } + if let Some(storage_conf) = arg_matches.value_of("remote-storage") { + // toml_edit doesn't consider a plain inline table a valid document, so wrap it in a key to parse + let storage_conf_toml = format!("remote_storage = {}", storage_conf); + let parsed_toml = storage_conf_toml.parse::<Document>()?; // parse + let (_, storage_conf_parsed_toml) = parsed_toml.iter().next().unwrap(); // and strip key off again + conf.remote_storage = Some(RemoteStorageConfig::from_toml(storage_conf_parsed_toml)?); + } // Seems like there is no better way to accept bool values explicitly in clap. - conf.s3_offload_enabled = arg_matches - .value_of("enable-s3-offload") + conf.wal_backup_enabled = arg_matches - .value_of("enable-s3-offload") + .value_of("enable-wal-backup") .unwrap() .parse() .context("failed to parse bool enable-wal-backup")?; @@ -252,7 +267,8 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bool) let signals = signals::install_shutdown_handlers()?; let mut threads = vec![]; let (callmemaybe_tx, callmemaybe_rx) = mpsc::unbounded_channel(); - GlobalTimelines::set_callmemaybe_tx(callmemaybe_tx); + let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100); + GlobalTimelines::init(callmemaybe_tx, wal_backup_launcher_tx); let conf_ = conf.clone(); threads.push( @@ -270,17 +286,6 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bool) })?, ); - if conf.ttl.is_some() { - let conf_ = conf.clone(); - threads.push( - thread::Builder::new() - .name("S3 offload thread".into()) - .spawn(|| { - s3_offload::thread_main(conf_); - })?, - ); - } - let conf_cloned = conf.clone(); let safekeeper_thread = thread::Builder::new() .name("Safekeeper thread".into()) @@ -330,6 +335,15 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bool) })?, ); + let conf_ = conf.clone(); + threads.push( + thread::Builder::new() + .name("wal backup launcher thread".into()) + .spawn(move || { + wal_backup::wal_backup_launcher_thread_main(conf_, wal_backup_launcher_rx); + })?, + ); + // TODO: put more thoughts into handling of failed threads // We probably should restart them. diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs index 59d282d378..676719b60d 100644 --- a/safekeeper/src/broker.rs +++ b/safekeeper/src/broker.rs @@ -1,5 +1,6 @@ //! Communication with etcd, providing safekeeper peers and pageserver coordination.
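// A hedged usage sketch (not part of the diff) of the election API introduced just below:
// a WAL backup task campaigns for its timeline's election so that only one safekeeper
// offloads at a time, verifies leadership, and resigns by letting the lease lapse. All
// names come from the definitions that follow; `endpoints` and `prefix` stand in for real
// configuration, and the "WAL_BACKUP" literal matches BACKUP_ELECTION_NAME in wal_backup.rs.

async fn backup_election_sketch(
    endpoints: Vec<Url>,
    prefix: String,
    zttid: ZTenantTimelineId,
    me: NodeId,
) -> anyhow::Result<()> {
    let election_name = get_campaign_name("WAL_BACKUP".to_string(), prefix, &zttid);
    let candidate_name = get_candiate_name(me);
    let election = Election::new(election_name.clone(), candidate_name.clone(), endpoints);
    // Campaigns until elected; a spawned keepalive task refreshes the lease.
    let mut leader = get_leader(&election).await?;
    if leader.check_am_i(election_name, candidate_name).await? {
        // ... offload WAL segments while leadership lasts ...
    }
    leader.give_up().await; // stop refreshing the lease
    Ok(())
}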
+use anyhow::anyhow; use anyhow::Context; use anyhow::Error; use anyhow::Result; @@ -7,9 +8,11 @@ use etcd_broker::Client; use etcd_broker::PutOptions; use etcd_broker::SkTimelineSubscriptionKind; use std::time::Duration; +use tokio::spawn; use tokio::task::JoinHandle; use tokio::{runtime, time::sleep}; use tracing::*; +use url::Url; use crate::{timeline::GlobalTimelines, SafeKeeperConf}; use utils::zid::{NodeId, ZTenantTimelineId}; @@ -44,6 +47,118 @@ fn timeline_safekeeper_path( ) } +pub struct Election { + pub election_name: String, + pub candidate_name: String, + pub broker_endpoints: Vec<Url>, +} + +impl Election { + pub fn new(election_name: String, candidate_name: String, broker_endpoints: Vec<Url>) -> Self { + Self { + election_name, + candidate_name, + broker_endpoints, + } + } +} + +pub struct ElectionLeader { + client: Client, + keep_alive: JoinHandle<Result<()>>, +} + +impl ElectionLeader { + pub async fn check_am_i( + &mut self, + election_name: String, + candidate_name: String, + ) -> Result<bool> { + let resp = self.client.leader(election_name).await?; + + let kv = resp.kv().ok_or(anyhow!("failed to get leader response"))?; + let leader = kv.value_str()?; + + Ok(leader == candidate_name) + } + + pub async fn give_up(self) { + // self.keep_alive.abort(); + // TODO: it'll be wise to resign here but it'll happen after lease expiration anyway + // should we await for keep alive termination? + let _ = self.keep_alive.await; + } +} + +pub async fn get_leader(req: &Election) -> Result<ElectionLeader> { + let mut client = Client::connect(req.broker_endpoints.clone(), None) + .await + .context("Could not connect to etcd")?; + + let lease = client + .lease_grant(LEASE_TTL_SEC, None) + .await + .context("Could not acquire a lease"); + + let lease_id = lease.map(|l| l.id()).unwrap(); + + let keep_alive = spawn::<_>(lease_keep_alive(client.clone(), lease_id)); + + if let Err(e) = client + .campaign( + req.election_name.clone(), + req.candidate_name.clone(), + lease_id, + ) + .await + { + keep_alive.abort(); + let _ = keep_alive.await; + return Err(e.into()); + } + + Ok(ElectionLeader { client, keep_alive }) +} + +async fn lease_keep_alive(mut client: Client, lease_id: i64) -> Result<()> { + let (mut keeper, mut ka_stream) = client + .lease_keep_alive(lease_id) + .await + .context("failed to create keepalive stream")?; + + loop { + let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC); + + keeper + .keep_alive() + .await + .context("failed to send LeaseKeepAliveRequest")?; + + ka_stream + .message() + .await + .context("failed to receive LeaseKeepAliveResponse")?; + + sleep(push_interval).await; + } +} + +pub fn get_campaign_name( + election_name: String, + broker_prefix: String, + timeline_id: &ZTenantTimelineId, +) -> String { + return format!( + "{}/{}", + SkTimelineSubscriptionKind::timeline(broker_prefix, *timeline_id).watch_key(), + election_name + ); +} + +pub fn get_candiate_name(system_id: NodeId) -> String { + format!("id_{}", system_id) +} + /// Push once in a while data about all active timelines to the broker. async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> { let mut client = Client::connect(&conf.broker_endpoints, None).await?; @@ -59,7 +174,7 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> { // sensitive and there is no risk of deadlock as we don't await while // lock is held.
for zttid in GlobalTimelines::get_active_timelines() { - if let Ok(tli) = GlobalTimelines::get(&conf, zttid, false) { + if let Some(tli) = GlobalTimelines::get_loaded(zttid) { let sk_info = tli.get_public_info(&conf)?; let put_opts = PutOptions::new().with_lease(lease.id()); client @@ -106,12 +221,13 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> { // note: there are blocking operations below, but it's considered fine for now if let Ok(tli) = GlobalTimelines::get(&conf, zttid, false) { for (safekeeper_id, info) in sk_info { - tli.record_safekeeper_info(&info, safekeeper_id)? + tli.record_safekeeper_info(&info, safekeeper_id).await? } } } } None => { + // XXX it means we lost connection with etcd, error is consumed inside sub object debug!("timeline updates sender closed, aborting the pull loop"); return Ok(()); } @@ -142,11 +258,12 @@ async fn main_loop(conf: SafeKeeperConf) { }, res = async { pull_handle.as_mut().unwrap().await }, if pull_handle.is_some() => { // was it panic or normal error? - let err = match res { - Ok(res_internal) => res_internal.unwrap_err(), - Err(err_outer) => err_outer.into(), + match res { + Ok(res_internal) => if let Err(err_inner) = res_internal { + warn!("pull task failed: {:?}", err_inner); + } + Err(err_outer) => { warn!("pull task panicked: {:?}", err_outer) } }; - warn!("pull task failed: {:?}", err); pull_handle = None; }, _ = ticker.tick() => { diff --git a/safekeeper/src/control_file_upgrade.rs b/safekeeper/src/control_file_upgrade.rs index 22716de1a0..8d36472540 100644 --- a/safekeeper/src/control_file_upgrade.rs +++ b/safekeeper/src/control_file_upgrade.rs @@ -165,7 +165,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result timeline_start_lsn: Lsn(0), local_start_lsn: Lsn(0), commit_lsn: oldstate.commit_lsn, - s3_wal_lsn: Lsn(0), + backup_lsn: Lsn(0), peer_horizon_lsn: oldstate.truncate_lsn, remote_consistent_lsn: Lsn(0), peers: Peers(vec![]), @@ -188,7 +188,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result timeline_start_lsn: Lsn(0), local_start_lsn: Lsn(0), commit_lsn: oldstate.commit_lsn, - s3_wal_lsn: Lsn(0), + backup_lsn: Lsn(0), peer_horizon_lsn: oldstate.truncate_lsn, remote_consistent_lsn: Lsn(0), peers: Peers(vec![]), @@ -211,7 +211,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result timeline_start_lsn: Lsn(0), local_start_lsn: Lsn(0), commit_lsn: oldstate.commit_lsn, - s3_wal_lsn: Lsn(0), + backup_lsn: Lsn(0), peer_horizon_lsn: oldstate.truncate_lsn, remote_consistent_lsn: Lsn(0), peers: Peers(vec![]), @@ -234,7 +234,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result timeline_start_lsn: Lsn(0), local_start_lsn: Lsn(0), commit_lsn: oldstate.commit_lsn, - s3_wal_lsn: Lsn(0), + backup_lsn: Lsn::INVALID, peer_horizon_lsn: oldstate.peer_horizon_lsn, remote_consistent_lsn: Lsn(0), peers: Peers(vec![]), diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 3f6ade970d..b0197a9a2a 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -70,19 +70,19 @@ struct TimelineStatus { timeline_id: ZTimelineId, acceptor_state: AcceptorStateStatus, #[serde(serialize_with = "display_serialize")] + flush_lsn: Lsn, + #[serde(serialize_with = "display_serialize")] timeline_start_lsn: Lsn, #[serde(serialize_with = "display_serialize")] local_start_lsn: Lsn, #[serde(serialize_with = "display_serialize")] commit_lsn: Lsn, #[serde(serialize_with = "display_serialize")] - s3_wal_lsn: Lsn, + backup_lsn: Lsn, #[serde(serialize_with = 
"display_serialize")] peer_horizon_lsn: Lsn, #[serde(serialize_with = "display_serialize")] remote_consistent_lsn: Lsn, - #[serde(serialize_with = "display_serialize")] - flush_lsn: Lsn, } /// Report info about timeline. @@ -107,13 +107,13 @@ async fn timeline_status_handler(request: Request) -> Result) -> Result, pub recall_period: Duration, + pub remote_storage: Option, + pub backup_runtime_threads: usize, + pub wal_backup_enabled: bool, pub my_id: NodeId, pub broker_endpoints: Vec, pub broker_etcd_prefix: String, - pub s3_offload_enabled: bool, } impl SafeKeeperConf { @@ -77,12 +81,13 @@ impl Default for SafeKeeperConf { no_sync: false, listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(), listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(), - ttl: None, + remote_storage: None, recall_period: defaults::DEFAULT_RECALL_PERIOD, my_id: NodeId(0), broker_endpoints: Vec::new(), broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(), - s3_offload_enabled: true, + backup_runtime_threads: DEFAULT_WAL_BACKUP_RUNTIME_THREADS, + wal_backup_enabled: true, } } } diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs index 0ef335c9ed..88b7816912 100644 --- a/safekeeper/src/receive_wal.rs +++ b/safekeeper/src/receive_wal.rs @@ -85,16 +85,10 @@ impl<'pg> ReceiveWalConn<'pg> { _ => bail!("unexpected message {:?} instead of greeting", next_msg), } - // Register the connection and defer unregister. - spg.timeline - .get() - .on_compute_connect(self.pageserver_connstr.as_ref())?; - let _guard = ComputeConnectionGuard { - timeline: Arc::clone(spg.timeline.get()), - }; - let mut next_msg = Some(next_msg); + let mut first_time_through = true; + let mut _guard: Option = None; loop { if matches!(next_msg, Some(ProposerAcceptorMessage::AppendRequest(_))) { // poll AppendRequest's without blocking and write WAL to disk without flushing, @@ -122,6 +116,18 @@ impl<'pg> ReceiveWalConn<'pg> { self.write_msg(&reply)?; } } + if first_time_through { + // Register the connection and defer unregister. Do that only + // after processing first message, as it sets wal_seg_size, + // wanted by many. + spg.timeline + .get() + .on_compute_connect(self.pageserver_connstr.as_ref())?; + _guard = Some(ComputeConnectionGuard { + timeline: Arc::clone(spg.timeline.get()), + }); + first_time_through = false; + } // blocking wait for the next message if next_msg.is_none() { diff --git a/safekeeper/src/remove_wal.rs b/safekeeper/src/remove_wal.rs index 3278d51bd3..004c0243f9 100644 --- a/safekeeper/src/remove_wal.rs +++ b/safekeeper/src/remove_wal.rs @@ -12,7 +12,7 @@ pub fn thread_main(conf: SafeKeeperConf) { let active_tlis = GlobalTimelines::get_active_timelines(); for zttid in &active_tlis { if let Ok(tli) = GlobalTimelines::get(&conf, *zttid, false) { - if let Err(e) = tli.remove_old_wal(conf.s3_offload_enabled) { + if let Err(e) = tli.remove_old_wal(conf.wal_backup_enabled) { warn!( "failed to remove WAL for tenant {} timeline {}: {}", tli.zttid.tenant_id, tli.zttid.timeline_id, e diff --git a/safekeeper/src/s3_offload.rs b/safekeeper/src/s3_offload.rs deleted file mode 100644 index 2851c0b8a0..0000000000 --- a/safekeeper/src/s3_offload.rs +++ /dev/null @@ -1,107 +0,0 @@ -// -// Offload old WAL segments to S3 and remove them locally -// Needs `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables to be set -// if no IAM bucket access is used. 
-// - -use anyhow::{bail, Context}; -use postgres_ffi::xlog_utils::*; -use remote_storage::{ - GenericRemoteStorage, RemoteStorage, RemoteStorageConfig, S3Bucket, S3Config, S3ObjectKey, -}; -use std::collections::HashSet; -use std::env; -use std::num::{NonZeroU32, NonZeroUsize}; -use std::path::Path; -use std::time::SystemTime; -use tokio::fs::{self, File}; -use tokio::io::BufReader; -use tokio::runtime; -use tokio::time::sleep; -use tracing::*; -use walkdir::WalkDir; - -use crate::SafeKeeperConf; - -pub fn thread_main(conf: SafeKeeperConf) { - // Create a new thread pool - // - // FIXME: keep it single-threaded for now, make it easier to debug with gdb, - // and we're not concerned with performance yet. - //let runtime = runtime::Runtime::new().unwrap(); - let runtime = runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - - info!("Starting S3 offload task"); - - runtime.block_on(async { - main_loop(&conf).await.unwrap(); - }); -} - -async fn offload_files( - remote_storage: &S3Bucket, - listing: &HashSet, - dir_path: &Path, - conf: &SafeKeeperConf, -) -> anyhow::Result { - let horizon = SystemTime::now() - conf.ttl.unwrap(); - let mut n: u64 = 0; - for entry in WalkDir::new(dir_path) { - let entry = entry?; - let path = entry.path(); - - if path.is_file() - && IsXLogFileName(entry.file_name().to_str().unwrap()) - && entry.metadata().unwrap().created().unwrap() <= horizon - { - let remote_path = remote_storage.remote_object_id(path)?; - if !listing.contains(&remote_path) { - let file = File::open(&path).await?; - let file_length = file.metadata().await?.len() as usize; - remote_storage - .upload(BufReader::new(file), file_length, &remote_path, None) - .await?; - - fs::remove_file(&path).await?; - n += 1; - } - } - } - Ok(n) -} - -async fn main_loop(conf: &SafeKeeperConf) -> anyhow::Result<()> { - let remote_storage = match GenericRemoteStorage::new( - conf.workdir.clone(), - &RemoteStorageConfig { - max_concurrent_syncs: NonZeroUsize::new(10).unwrap(), - max_sync_errors: NonZeroU32::new(1).unwrap(), - storage: remote_storage::RemoteStorageKind::AwsS3(S3Config { - bucket_name: "zenith-testbucket".to_string(), - bucket_region: env::var("S3_REGION").context("S3_REGION env var is not set")?, - prefix_in_bucket: Some("walarchive/".to_string()), - endpoint: Some(env::var("S3_ENDPOINT").context("S3_ENDPOINT env var is not set")?), - concurrency_limit: NonZeroUsize::new(20).unwrap(), - }), - }, - )? { - GenericRemoteStorage::Local(_) => { - bail!("Unexpected: got local storage for the remote config") - } - GenericRemoteStorage::S3(remote_storage) => remote_storage, - }; - - loop { - let listing = remote_storage - .list() - .await? - .into_iter() - .collect::>(); - let n = offload_files(&remote_storage, &listing, &conf.workdir, conf).await?; - info!("Offload {n} files to S3"); - sleep(conf.ttl.unwrap()).await; - } -} diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index b8b969929d..9a07127771 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -19,6 +19,7 @@ use lazy_static::lazy_static; use crate::control_file; use crate::send_wal::HotStandbyFeedback; + use crate::wal_storage; use metrics::{register_gauge_vec, Gauge, GaugeVec}; use postgres_ffi::xlog_utils::MAX_SEND_SIZE; @@ -141,7 +142,7 @@ pub struct ServerInfo { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PeerInfo { /// LSN up to which safekeeper offloaded WAL to s3. - s3_wal_lsn: Lsn, + backup_lsn: Lsn, /// Term of the last entry. 
term: Term, /// LSN of the last record. @@ -153,7 +154,7 @@ pub struct PeerInfo { impl PeerInfo { fn new() -> Self { Self { - s3_wal_lsn: Lsn(0), + backup_lsn: Lsn::INVALID, term: INVALID_TERM, flush_lsn: Lsn(0), commit_lsn: Lsn(0), @@ -193,9 +194,9 @@ pub struct SafeKeeperState { /// Part of WAL acknowledged by quorum and available locally. Always points /// to record boundary. pub commit_lsn: Lsn, - /// First LSN not yet offloaded to s3. Useful to persist to avoid finding - /// out offloading progress on boot. - pub s3_wal_lsn: Lsn, + /// LSN that points to the end of the last backed up segment. Useful to + /// persist to avoid finding out offloading progress on boot. + pub backup_lsn: Lsn, /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn /// of last record streamed to everyone). Persisting it helps skipping /// recovery in walproposer, generally we compute it from peers. In @@ -217,7 +218,7 @@ pub struct SafeKeeperState { // are not flushed yet. pub struct SafekeeperMemState { pub commit_lsn: Lsn, - pub s3_wal_lsn: Lsn, // TODO: keep only persistent version + pub backup_lsn: Lsn, pub peer_horizon_lsn: Lsn, pub remote_consistent_lsn: Lsn, pub proposer_uuid: PgUuid, @@ -241,7 +242,7 @@ impl SafeKeeperState { timeline_start_lsn: Lsn(0), local_start_lsn: Lsn(0), commit_lsn: Lsn(0), - s3_wal_lsn: Lsn(0), + backup_lsn: Lsn::INVALID, peer_horizon_lsn: Lsn(0), remote_consistent_lsn: Lsn(0), peers: Peers(peers.iter().map(|p| (*p, PeerInfo::new())).collect()), @@ -559,7 +560,7 @@ where epoch_start_lsn: Lsn(0), inmem: SafekeeperMemState { commit_lsn: state.commit_lsn, - s3_wal_lsn: state.s3_wal_lsn, + backup_lsn: state.backup_lsn, peer_horizon_lsn: state.peer_horizon_lsn, remote_consistent_lsn: state.remote_consistent_lsn, proposer_uuid: state.proposer_uuid, @@ -649,7 +650,6 @@ where self.state.persist(&state)?; } - // pass wal_seg_size to read WAL and find flush_lsn self.wal_store.init_storage(&self.state)?; info!( @@ -764,6 +764,14 @@ where self.inmem.commit_lsn = commit_lsn; self.metrics.commit_lsn.set(self.inmem.commit_lsn.0 as f64); + // We got our first commit_lsn, which means we should sync + // everything to disk, to initialize the state. + if self.state.commit_lsn == Lsn::INVALID && commit_lsn != Lsn::INVALID { + self.inmem.backup_lsn = self.inmem.commit_lsn; // initialize backup_lsn + self.wal_store.flush_wal()?; + self.persist_control_file()?; + } + // If new commit_lsn reached epoch switch, force sync of control // file: walproposer in sync mode is very interested when this // happens. Note: this is for sync-safekeepers mode only, as @@ -775,22 +783,14 @@ where self.persist_control_file()?; } - // We got our first commit_lsn, which means we should sync - // everything to disk, to initialize the state. - if self.state.commit_lsn == Lsn(0) && commit_lsn > Lsn(0) { - self.wal_store.flush_wal()?; - self.persist_control_file()?; - } - Ok(()) } /// Persist in-memory state to the disk. 
fn persist_control_file(&mut self) -> Result<()> { let mut state = self.state.clone(); - state.commit_lsn = self.inmem.commit_lsn; - state.s3_wal_lsn = self.inmem.s3_wal_lsn; + state.backup_lsn = self.inmem.backup_lsn; state.peer_horizon_lsn = self.inmem.peer_horizon_lsn; state.remote_consistent_lsn = self.inmem.remote_consistent_lsn; state.proposer_uuid = self.inmem.proposer_uuid; @@ -898,11 +898,11 @@ where self.update_commit_lsn()?; } } - if let Some(s3_wal_lsn) = sk_info.s3_wal_lsn { - let new_s3_wal_lsn = max(s3_wal_lsn, self.inmem.s3_wal_lsn); + if let Some(backup_lsn) = sk_info.backup_lsn { + let new_backup_lsn = max(backup_lsn, self.inmem.backup_lsn); sync_control_file |= - self.state.s3_wal_lsn + (self.state.server.wal_seg_size as u64) < new_s3_wal_lsn; - self.inmem.s3_wal_lsn = new_s3_wal_lsn; + self.state.backup_lsn + (self.state.server.wal_seg_size as u64) < new_backup_lsn; + self.inmem.backup_lsn = new_backup_lsn; } if let Some(remote_consistent_lsn) = sk_info.remote_consistent_lsn { let new_remote_consistent_lsn = @@ -930,29 +930,23 @@ where /// offloading. /// While it is safe to use inmem values for determining horizon, /// we use persistent to make possible normal states less surprising. - pub fn get_horizon_segno(&self, s3_offload_enabled: bool) -> XLogSegNo { - let s3_offload_horizon = if s3_offload_enabled { - self.state.s3_wal_lsn - } else { - Lsn(u64::MAX) - }; - let horizon_lsn = min( - min( - self.state.remote_consistent_lsn, - self.state.peer_horizon_lsn, - ), - s3_offload_horizon, + pub fn get_horizon_segno(&self, wal_backup_enabled: bool) -> XLogSegNo { + let mut horizon_lsn = min( + self.state.remote_consistent_lsn, + self.state.peer_horizon_lsn, ); + if wal_backup_enabled { + horizon_lsn = min(horizon_lsn, self.state.backup_lsn); + } horizon_lsn.segment_number(self.state.server.wal_seg_size as usize) } } #[cfg(test)] mod tests { - use std::ops::Deref; - use super::*; use crate::wal_storage::Storage; + use std::ops::Deref; // fake storage for tests struct InMemoryState { @@ -1013,6 +1007,7 @@ mod tests { }; let wal_store = DummyWalStore { lsn: Lsn(0) }; let ztli = ZTimelineId::from([0u8; 16]); + let mut sk = SafeKeeper::new(ztli, storage, wal_store, NodeId(0)).unwrap(); // check voting for 1 is ok @@ -1028,6 +1023,7 @@ mod tests { let storage = InMemoryState { persisted_state: state, }; + sk = SafeKeeper::new(ztli, storage, sk.wal_store, NodeId(0)).unwrap(); // and ensure voting second time for 1 is not ok @@ -1045,6 +1041,7 @@ mod tests { }; let wal_store = DummyWalStore { lsn: Lsn(0) }; let ztli = ZTimelineId::from([0u8; 16]); + let mut sk = SafeKeeper::new(ztli, storage, wal_store, NodeId(0)).unwrap(); let mut ar_hdr = AppendRequestHeader { diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index d52dd6ea57..a89ed18071 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -315,7 +315,7 @@ impl ReplicationConn { } else { // TODO: also check once in a while whether we are walsender // to right pageserver. - if spg.timeline.get().check_deactivate(replica_id)? { + if spg.timeline.get().stop_walsender(replica_id)? { // Shut down, timeline is suspended. 
// TODO create proper error type for this bail!("end streaming to {:?}", spg.appname); diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 0953439bd8..74a61410fd 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -8,6 +8,7 @@ use lazy_static::lazy_static; use postgres_ffi::xlog_utils::XLogSegNo; use serde::Serialize; +use tokio::sync::watch; use std::cmp::{max, min}; use std::collections::HashMap; @@ -15,7 +16,7 @@ use std::fs::{self}; use std::sync::{Arc, Condvar, Mutex, MutexGuard}; use std::time::Duration; -use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::mpsc::{Sender, UnboundedSender}; use tracing::*; use utils::{ @@ -25,13 +26,13 @@ use utils::{ }; use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey}; - use crate::control_file; use crate::safekeeper::{ AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, SafekeeperMemState, }; use crate::send_wal::HotStandbyFeedback; + use crate::wal_storage; use crate::wal_storage::Storage as wal_storage_iface; use crate::SafeKeeperConf; @@ -81,10 +82,14 @@ struct SharedState { notified_commit_lsn: Lsn, /// State of replicas replicas: Vec<Option<ReplicaState>>, - /// Inactive clusters shouldn't occupy any resources, so timeline is - /// activated whenever there is a compute connection or pageserver is not - /// caughtup (it must have latest WAL for new compute start) and suspended - /// otherwise. + /// True when the WAL backup launcher oversees the timeline, making sure WAL + /// is offloaded; tracking this lets us bother the launcher less. + wal_backup_active: bool, + /// True whenever there is at least some pending activity on the timeline: a + /// live compute connection, a pageserver that is not caught up (it must have + /// the latest WAL for a new compute start), or an unfinished WAL backup. + /// Practically it means safekeepers broadcast info about the timeline to + /// peers and old WAL is trimmed. /// /// TODO: it might be better to remove tli completely from GlobalTimelines /// when tli is inactive instead of having this flag. @@ -103,6 +108,7 @@ impl SharedState { ) -> Result<Self> { let state = SafeKeeperState::new(zttid, peer_ids); let control_store = control_file::FileStorage::create_new(zttid, conf, state)?; + let wal_store = wal_storage::PhysicalStorage::new(zttid, conf); let sk = SafeKeeper::new(zttid.timeline_id, control_store, wal_store, conf.my_id)?; @@ -110,6 +116,7 @@ notified_commit_lsn: Lsn(0), sk, replicas: Vec::new(), + wal_backup_active: false, active: false, num_computes: 0, pageserver_connstr: None, @@ -129,15 +136,62 @@ notified_commit_lsn: Lsn(0), sk: SafeKeeper::new(zttid.timeline_id, control_store, wal_store, conf.my_id)?, replicas: Vec::new(), + wal_backup_active: false, active: false, num_computes: 0, pageserver_connstr: None, last_removed_segno: 0, }) } + fn is_active(&self) -> bool { + self.is_wal_backup_required() + // FIXME: add tracking of relevant pageservers and check them here individually, + // otherwise migration won't work (we suspend too early). + || self.sk.inmem.remote_consistent_lsn <= self.sk.inmem.commit_lsn + } - /// Activate the timeline: start/change walsender (via callmemaybe). - fn activate( + /// Mark timeline active/inactive and return whether s3 offloading requires + /// start/stop action. + fn update_status(&mut self) -> bool { + self.active = self.is_active(); + self.is_wal_backup_action_pending() + } + + /// Should we run s3 offloading in current state?
+ fn is_wal_backup_required(&self) -> bool { + let seg_size = self.get_wal_seg_size(); + self.num_computes > 0 || + // Currently only the whole segment is offloaded, so compare segment numbers. + (self.sk.inmem.commit_lsn.segment_number(seg_size) > + self.sk.inmem.backup_lsn.segment_number(seg_size)) + } + + /// Is the current state of s3 offloading different from what it ought to be? + fn is_wal_backup_action_pending(&self) -> bool { + let res = self.wal_backup_active != self.is_wal_backup_required(); + if res { + let action_pending = if self.is_wal_backup_required() { + "start" + } else { + "stop" + }; + trace!( + "timeline {} s3 offloading action {} pending: num_computes={}, commit_lsn={}, backup_lsn={}", + self.sk.state.timeline_id, action_pending, self.num_computes, self.sk.inmem.commit_lsn, self.sk.inmem.backup_lsn + ); + } + res + } + + /// Returns whether s3 offloading is required and sets current status as + /// matching. + fn wal_backup_attend(&mut self) -> bool { + self.wal_backup_active = self.is_wal_backup_required(); + self.wal_backup_active + } + + /// Start/change walsender (via callmemaybe). + fn callmemaybe_sub( &mut self, zttid: &ZTenantTimelineId, pageserver_connstr: Option<&String>, @@ -179,42 +233,42 @@ impl SharedState { ); } self.pageserver_connstr = pageserver_connstr.map(|c| c.to_owned()); - self.active = true; Ok(()) } /// Deactivate the timeline: stop callmemaybe. - fn deactivate( + fn callmemaybe_unsub( &mut self, zttid: &ZTenantTimelineId, callmemaybe_tx: &UnboundedSender<CallmeEvent>, ) -> Result<()> { - if self.active { - if let Some(ref pageserver_connstr) = self.pageserver_connstr { - let subscription_key = SubscriptionStateKey::new( - zttid.tenant_id, - zttid.timeline_id, - pageserver_connstr.to_owned(), - ); - callmemaybe_tx - .send(CallmeEvent::Unsubscribe(subscription_key)) - .unwrap_or_else(|e| { - error!( - "failed to send Unsubscribe request to callmemaybe thread {}", - e - ); - }); - info!( - "timeline {} is unsubscribed from callmemaybe to {}", - zttid.timeline_id, - self.pageserver_connstr.as_ref().unwrap() - ); - } - self.active = false; + if let Some(ref pageserver_connstr) = self.pageserver_connstr { + let subscription_key = SubscriptionStateKey::new( + zttid.tenant_id, + zttid.timeline_id, + pageserver_connstr.to_owned(), + ); + callmemaybe_tx + .send(CallmeEvent::Unsubscribe(subscription_key)) + .unwrap_or_else(|e| { + error!( + "failed to send Unsubscribe request to callmemaybe thread {}", + e + ); + }); + info!( + "timeline {} is unsubscribed from callmemaybe to {}", + zttid.timeline_id, + self.pageserver_connstr.as_ref().unwrap() + ); } Ok(()) } + fn get_wal_seg_size(&self) -> usize { + self.sk.state.server.wal_seg_size as usize + } + /// Get combined state of all alive replicas pub fn get_replicas_state(&self) -> ReplicaState { let mut acc = ReplicaState::new(); @@ -278,6 +332,13 @@ pub struct Timeline { pub zttid: ZTenantTimelineId, pub callmemaybe_tx: UnboundedSender<CallmeEvent>, + /// Sending here asks for wal backup launcher attention (start/stop + /// offloading). Sending zttid instead of a concrete command allows sending + /// without holding the timeline lock. + wal_backup_launcher_tx: Sender<ZTenantTimelineId>, + commit_lsn_watch_tx: watch::Sender<Lsn>, + /// For breeding receivers.
+ commit_lsn_watch_rx: watch::Receiver, mutex: Mutex, /// conditional variable used to notify wal senders cond: Condvar, @@ -287,11 +348,17 @@ impl Timeline { fn new( zttid: ZTenantTimelineId, callmemaybe_tx: UnboundedSender, + wal_backup_launcher_tx: Sender, shared_state: SharedState, ) -> Timeline { + let (commit_lsn_watch_tx, commit_lsn_watch_rx) = + watch::channel(shared_state.sk.inmem.commit_lsn); Timeline { zttid, callmemaybe_tx, + wal_backup_launcher_tx, + commit_lsn_watch_tx, + commit_lsn_watch_rx, mutex: Mutex::new(shared_state), cond: Condvar::new(), } @@ -301,13 +368,21 @@ impl Timeline { /// not running yet. /// Can fail only if channel to a static thread got closed, which is not normal at all. pub fn on_compute_connect(&self, pageserver_connstr: Option<&String>) -> Result<()> { - let mut shared_state = self.mutex.lock().unwrap(); - shared_state.num_computes += 1; - // FIXME: currently we always adopt latest pageserver connstr, but we - // should have kind of generations assigned by compute to distinguish - // the latest one or even pass it through consensus to reliably deliver - // to all safekeepers. - shared_state.activate(&self.zttid, pageserver_connstr, &self.callmemaybe_tx)?; + let is_wal_backup_action_pending: bool; + { + let mut shared_state = self.mutex.lock().unwrap(); + shared_state.num_computes += 1; + is_wal_backup_action_pending = shared_state.update_status(); + // FIXME: currently we always adopt latest pageserver connstr, but we + // should have kind of generations assigned by compute to distinguish + // the latest one or even pass it through consensus to reliably deliver + // to all safekeepers. + shared_state.callmemaybe_sub(&self.zttid, pageserver_connstr, &self.callmemaybe_tx)?; + } + // Wake up wal backup launcher, if offloading not started yet. + if is_wal_backup_action_pending { + self.wal_backup_launcher_tx.blocking_send(self.zttid)?; + } Ok(()) } @@ -315,38 +390,43 @@ impl Timeline { /// pageserver doesn't need catchup. /// Can fail only if channel to a static thread got closed, which is not normal at all. pub fn on_compute_disconnect(&self) -> Result<()> { - let mut shared_state = self.mutex.lock().unwrap(); - shared_state.num_computes -= 1; - // If there is no pageserver, can suspend right away; otherwise let - // walsender do that. - if shared_state.num_computes == 0 && shared_state.pageserver_connstr.is_none() { - shared_state.deactivate(&self.zttid, &self.callmemaybe_tx)?; + let is_wal_backup_action_pending: bool; + { + let mut shared_state = self.mutex.lock().unwrap(); + shared_state.num_computes -= 1; + is_wal_backup_action_pending = shared_state.update_status(); + } + // Wake up wal backup launcher, if it is time to stop the offloading. + if is_wal_backup_action_pending { + self.wal_backup_launcher_tx.blocking_send(self.zttid)?; } Ok(()) } - /// Deactivate tenant if there is no computes and pageserver is caughtup, - /// assuming the pageserver status is in replica_id. - /// Returns true if deactivated. - pub fn check_deactivate(&self, replica_id: usize) -> Result { + /// Whether we still need this walsender running? + /// TODO: check this pageserver is actually interested in this timeline. 
+ pub fn stop_walsender(&self, replica_id: usize) -> Result { let mut shared_state = self.mutex.lock().unwrap(); - if !shared_state.active { - // already suspended - return Ok(true); - } if shared_state.num_computes == 0 { let replica_state = shared_state.replicas[replica_id].unwrap(); - let deactivate = shared_state.notified_commit_lsn == Lsn(0) || // no data at all yet - (replica_state.last_received_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet. - replica_state.last_received_lsn >= shared_state.sk.inmem.commit_lsn); - if deactivate { - shared_state.deactivate(&self.zttid, &self.callmemaybe_tx)?; + let stop = shared_state.notified_commit_lsn == Lsn(0) || // no data at all yet + (replica_state.remote_consistent_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet. + replica_state.remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn); + if stop { + shared_state.callmemaybe_unsub(&self.zttid, &self.callmemaybe_tx)?; return Ok(true); } } Ok(false) } + /// Returns whether s3 offloading is required and sets current status as + /// matching it. + pub fn wal_backup_attend(&self) -> bool { + let mut shared_state = self.mutex.lock().unwrap(); + shared_state.wal_backup_attend() + } + /// Deactivates the timeline, assuming it is being deleted. /// Returns whether the timeline was already active. /// @@ -354,10 +434,14 @@ impl Timeline { /// will stop by themselves eventually (possibly with errors, but no panics). There should be no /// compute threads (as we're deleting the timeline), actually. Some WAL may be left unsent, but /// we're deleting the timeline anyway. - pub fn deactivate_for_delete(&self) -> Result { - let mut shared_state = self.mutex.lock().unwrap(); - let was_active = shared_state.active; - shared_state.deactivate(&self.zttid, &self.callmemaybe_tx)?; + pub async fn deactivate_for_delete(&self) -> Result { + let was_active: bool; + { + let mut shared_state = self.mutex.lock().unwrap(); + was_active = shared_state.active; + shared_state.callmemaybe_unsub(&self.zttid, &self.callmemaybe_tx)?; + } + self.wal_backup_launcher_tx.send(self.zttid).await?; Ok(was_active) } @@ -391,6 +475,7 @@ impl Timeline { } // Notify caught-up WAL senders about new WAL data received + // TODO: replace-unify it with commit_lsn_watch. fn notify_wal_senders(&self, shared_state: &mut MutexGuard) { if shared_state.notified_commit_lsn < shared_state.sk.inmem.commit_lsn { shared_state.notified_commit_lsn = shared_state.sk.inmem.commit_lsn; @@ -398,12 +483,17 @@ impl Timeline { } } + pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver { + self.commit_lsn_watch_rx.clone() + } + /// Pass arrived message to the safekeeper. pub fn process_msg( &self, msg: &ProposerAcceptorMessage, ) -> Result> { let mut rmsg: Option; + let commit_lsn: Lsn; { let mut shared_state = self.mutex.lock().unwrap(); rmsg = shared_state.sk.process_msg(msg)?; @@ -419,15 +509,31 @@ impl Timeline { // Ping wal sender that new data might be available. 
self.notify_wal_senders(&mut shared_state); + commit_lsn = shared_state.sk.inmem.commit_lsn; } + self.commit_lsn_watch_tx.send(commit_lsn)?; Ok(rmsg) } + pub fn get_wal_seg_size(&self) -> usize { + self.mutex.lock().unwrap().get_wal_seg_size() + } + pub fn get_state(&self) -> (SafekeeperMemState, SafeKeeperState) { let shared_state = self.mutex.lock().unwrap(); (shared_state.sk.inmem.clone(), shared_state.sk.state.clone()) } + pub fn get_wal_backup_lsn(&self) -> Lsn { + self.mutex.lock().unwrap().sk.inmem.backup_lsn + } + + pub fn set_wal_backup_lsn(&self, backup_lsn: Lsn) { + self.mutex.lock().unwrap().sk.inmem.backup_lsn = backup_lsn; + // we should check whether to shut down offloader, but this will be done + // soon by peer communication anyway. + } + /// Prepare public safekeeper info for reporting. pub fn get_public_info(&self, conf: &SafeKeeperConf) -> anyhow::Result { let shared_state = self.mutex.lock().unwrap(); @@ -436,7 +542,6 @@ impl Timeline { flush_lsn: Some(shared_state.sk.wal_store.flush_lsn()), // note: this value is not flushed to control file yet and can be lost commit_lsn: Some(shared_state.sk.inmem.commit_lsn), - s3_wal_lsn: Some(shared_state.sk.inmem.s3_wal_lsn), // TODO: rework feedbacks to avoid max here remote_consistent_lsn: Some(max( shared_state.get_replicas_state().remote_consistent_lsn, @@ -444,14 +549,35 @@ impl Timeline { )), peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn), safekeeper_connection_string: Some(conf.listen_pg_addr.clone()), + backup_lsn: Some(shared_state.sk.inmem.backup_lsn), }) } /// Update timeline state with peer safekeeper data. - pub fn record_safekeeper_info(&self, sk_info: &SkTimelineInfo, _sk_id: NodeId) -> Result<()> { - let mut shared_state = self.mutex.lock().unwrap(); - shared_state.sk.record_safekeeper_info(sk_info)?; - self.notify_wal_senders(&mut shared_state); + pub async fn record_safekeeper_info( + &self, + sk_info: &SkTimelineInfo, + _sk_id: NodeId, + ) -> Result<()> { + let is_wal_backup_action_pending: bool; + let commit_lsn: Lsn; + { + let mut shared_state = self.mutex.lock().unwrap(); + // WAL seg size not initialized yet (no message from compute ever + // received), can't do much without it. + if shared_state.get_wal_seg_size() == 0 { + return Ok(()); + } + shared_state.sk.record_safekeeper_info(sk_info)?; + self.notify_wal_senders(&mut shared_state); + is_wal_backup_action_pending = shared_state.update_status(); + commit_lsn = shared_state.sk.inmem.commit_lsn; + } + self.commit_lsn_watch_tx.send(commit_lsn)?; + // Wake up wal backup launcher, if it is time to stop the offloading. + if is_wal_backup_action_pending { + self.wal_backup_launcher_tx.send(self.zttid).await?; + } Ok(()) } @@ -476,16 +602,16 @@ impl Timeline { shared_state.sk.wal_store.flush_lsn() } - pub fn remove_old_wal(&self, s3_offload_enabled: bool) -> Result<()> { + pub fn remove_old_wal(&self, wal_backup_enabled: bool) -> Result<()> { let horizon_segno: XLogSegNo; let remover: Box Result<(), anyhow::Error>>; { let shared_state = self.mutex.lock().unwrap(); // WAL seg size not initialized yet, no WAL exists. 
- if shared_state.sk.state.server.wal_seg_size == 0 { + if shared_state.get_wal_seg_size() == 0 { return Ok(()); } - horizon_segno = shared_state.sk.get_horizon_segno(s3_offload_enabled); + horizon_segno = shared_state.sk.get_horizon_segno(wal_backup_enabled); remover = shared_state.sk.wal_store.remove_up_to(); if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno { return Ok(()); @@ -522,12 +648,14 @@ impl TimelineTools for Option<Arc<Timeline>> { struct GlobalTimelinesState { timelines: HashMap<ZTenantTimelineId, Arc<Timeline>>, callmemaybe_tx: Option<UnboundedSender<CallmeEvent>>, + wal_backup_launcher_tx: Option<Sender<ZTenantTimelineId>>, } lazy_static! { static ref TIMELINES_STATE: Mutex<GlobalTimelinesState> = Mutex::new(GlobalTimelinesState { timelines: HashMap::new(), - callmemaybe_tx: None + callmemaybe_tx: None, + wal_backup_launcher_tx: None, }); } @@ -541,10 +669,15 @@ pub struct TimelineDeleteForceResult { pub struct GlobalTimelines; impl GlobalTimelines { - pub fn set_callmemaybe_tx(callmemaybe_tx: UnboundedSender<CallmeEvent>) { + pub fn init( + callmemaybe_tx: UnboundedSender<CallmeEvent>, + wal_backup_launcher_tx: Sender<ZTenantTimelineId>, + ) { let mut state = TIMELINES_STATE.lock().unwrap(); assert!(state.callmemaybe_tx.is_none()); state.callmemaybe_tx = Some(callmemaybe_tx); + assert!(state.wal_backup_launcher_tx.is_none()); + state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx); } fn create_internal( @@ -559,12 +692,14 @@ impl GlobalTimelines { // TODO: check directory existence let dir = conf.timeline_dir(&zttid); fs::create_dir_all(dir)?; + let shared_state = SharedState::create(conf, &zttid, peer_ids) .context("failed to create shared state")?; let new_tli = Arc::new(Timeline::new( zttid, state.callmemaybe_tx.as_ref().unwrap().clone(), + state.wal_backup_launcher_tx.as_ref().unwrap().clone(), shared_state, )); state.timelines.insert(zttid, Arc::clone(&new_tli)); @@ -594,8 +729,7 @@ impl GlobalTimelines { match state.timelines.get(&zttid) { Some(result) => Ok(Arc::clone(result)), None => { - let shared_state = - SharedState::restore(conf, &zttid).context("failed to restore shared state"); + let shared_state = SharedState::restore(conf, &zttid); let shared_state = match shared_state { Ok(shared_state) => shared_state, @@ -617,6 +751,7 @@ impl GlobalTimelines { let new_tli = Arc::new(Timeline::new( zttid, state.callmemaybe_tx.as_ref().unwrap().clone(), + state.wal_backup_launcher_tx.as_ref().unwrap().clone(), shared_state, )); state.timelines.insert(zttid, Arc::clone(&new_tli)); @@ -625,6 +760,12 @@ impl GlobalTimelines { } } + /// Get the loaded timeline, if it exists. + pub fn get_loaded(zttid: ZTenantTimelineId) -> Option<Arc<Timeline>> { + let state = TIMELINES_STATE.lock().unwrap(); + state.timelines.get(&zttid).map(Arc::clone) + } + /// Get ZTenantTimelineIDs of all active timelines. pub fn get_active_timelines() -> Vec<ZTenantTimelineId> { let state = TIMELINES_STATE.lock().unwrap(); @@ -665,22 +806,23 @@ impl GlobalTimelines { /// b) an HTTP GET request about the timeline is made and it's able to restore the current state, or /// c) an HTTP POST request for timeline creation is made after the timeline is already deleted. /// TODO: ensure none of the above can happen.
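+ /// Deletion is async because deactivation notifies the wal backup
+ /// launcher over its channel, which is an await point.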
- pub fn delete_force( + pub async fn delete_force( conf: &SafeKeeperConf, zttid: &ZTenantTimelineId, ) -> Result<TimelineDeleteForceResult> { info!("deleting timeline {}", zttid); - let was_active = match TIMELINES_STATE.lock().unwrap().timelines.remove(zttid) { - None => false, - Some(tli) => tli.deactivate_for_delete()?, - }; + let timeline = TIMELINES_STATE.lock().unwrap().timelines.remove(zttid); + let mut was_active = false; + if let Some(tli) = timeline { + was_active = tli.deactivate_for_delete().await?; + } GlobalTimelines::delete_force_internal(conf, zttid, was_active) } /// Deactivates and deletes all timelines for the tenant, see `delete()`. /// Returns a map of all timelines the tenant had; the value is `true` if that timeline was active. /// There may be a race if new timelines are created simultaneously. - pub fn delete_force_all_for_tenant( + pub async fn delete_force_all_for_tenant( conf: &SafeKeeperConf, tenant_id: &ZTenantId, ) -> Result<HashMap<ZTenantTimelineId, TimelineDeleteForceResult>> { @@ -691,14 +833,15 @@ impl GlobalTimelines { let timelines = &mut TIMELINES_STATE.lock().unwrap().timelines; for (&zttid, tli) in timelines.iter() { if zttid.tenant_id == *tenant_id { - to_delete.insert(zttid, tli.deactivate_for_delete()?); + to_delete.insert(zttid, tli.clone()); } } // TODO: test that the correct subset of timelines is removed. It's complicated because they are implicitly created currently. timelines.retain(|zttid, _| !to_delete.contains_key(zttid)); } let mut deleted = HashMap::new(); - for (zttid, was_active) in to_delete { + for (zttid, timeline) in to_delete { + let was_active = timeline.deactivate_for_delete().await?; deleted.insert( zttid, GlobalTimelines::delete_force_internal(conf, &zttid, was_active)?, diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs new file mode 100644 index 0000000000..ef8ebe14e1 --- /dev/null +++ b/safekeeper/src/wal_backup.rs @@ -0,0 +1,418 @@ +use anyhow::{Context, Result}; +use tokio::task::JoinHandle; + +use std::cmp::min; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::time::Duration; + +use postgres_ffi::xlog_utils::{XLogFileName, XLogSegNo, XLogSegNoOffsetToRecPtr, PG_TLI}; +use remote_storage::{GenericRemoteStorage, RemoteStorage}; +use tokio::fs::File; +use tokio::runtime::Builder; + +use tokio::select; +use tokio::sync::mpsc::{self, Receiver, Sender}; +use tokio::sync::watch; +use tokio::time::sleep; +use tracing::*; + +use utils::{lsn::Lsn, zid::ZTenantTimelineId}; + +use crate::broker::{Election, ElectionLeader}; +use crate::timeline::{GlobalTimelines, Timeline}; +use crate::{broker, SafeKeeperConf}; + +use once_cell::sync::OnceCell; + +const BACKUP_ELECTION_NAME: &str = "WAL_BACKUP"; + +const BROKER_CONNECTION_RETRY_DELAY_MS: u64 = 1000; + +const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10; +const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000; + +pub fn wal_backup_launcher_thread_main( + conf: SafeKeeperConf, + wal_backup_launcher_rx: Receiver<ZTenantTimelineId>, +) { + let rt = Builder::new_multi_thread() + .worker_threads(conf.backup_runtime_threads) + .enable_all() + .build() + .expect("failed to create wal backup runtime"); + + rt.block_on(async { + wal_backup_launcher_main_loop(conf, wal_backup_launcher_rx).await; + }); +} + +/// Check whether wal backup is required for the timeline and mark that the launcher is +/// aware of its current status (if the timeline exists).
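+/// Returns false if the timeline is not loaded; a deletion racing with
+/// task startup thus simply results in no backup task.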
+fn is_wal_backup_required(zttid: ZTenantTimelineId) -> bool { + if let Some(tli) = GlobalTimelines::get_loaded(zttid) { + tli.wal_backup_attend() + } else { + false + } +} + +struct WalBackupTaskHandle { + shutdown_tx: Sender<()>, + handle: JoinHandle<()>, +} + +/// Sits on wal_backup_launcher_rx and starts/stops per-timeline wal backup +/// tasks. Keeping this in a separate task simplifies locking, allows reaping +/// panics, and separates elections from the offloading itself. +async fn wal_backup_launcher_main_loop( + conf: SafeKeeperConf, + mut wal_backup_launcher_rx: Receiver<ZTenantTimelineId>, +) { + info!( + "wal backup launcher started, remote config {:?}", + conf.remote_storage + ); + + let conf_ = conf.clone(); + REMOTE_STORAGE.get_or_init(|| { + conf_.remote_storage.as_ref().map(|c| { + GenericRemoteStorage::new(conf_.workdir, c).expect("failed to create remote storage") + }) + }); + + let mut tasks: HashMap<ZTenantTimelineId, WalBackupTaskHandle> = HashMap::new(); + + loop { + // The channel is never expected to get closed. + let zttid = wal_backup_launcher_rx.recv().await.unwrap(); + let is_wal_backup_required = is_wal_backup_required(zttid); + if conf.remote_storage.is_none() || !conf.wal_backup_enabled { + continue; /* just drain the channel and do nothing */ + } + // do we need to do anything at all? + if is_wal_backup_required != tasks.contains_key(&zttid) { + if is_wal_backup_required { + // need to start the task + info!("starting wal backup task for {}", zttid); + + // TODO: decide who should offload in the launcher itself by simply checking current state + let election_name = broker::get_campaign_name( + BACKUP_ELECTION_NAME.to_string(), + conf.broker_etcd_prefix.clone(), + &zttid, + ); + let my_candidate_name = broker::get_candiate_name(conf.my_id); + let election = broker::Election::new( + election_name, + my_candidate_name, + conf.broker_endpoints.clone(), + ); + + let (shutdown_tx, shutdown_rx) = mpsc::channel(1); + let timeline_dir = conf.timeline_dir(&zttid); + + let handle = tokio::spawn( + backup_task_main(zttid, timeline_dir, shutdown_rx, election) + .instrument(info_span!("WAL backup", zttid = %zttid)), + ); + + tasks.insert( + zttid, + WalBackupTaskHandle { + shutdown_tx, + handle, + }, + ); + } else { + // need to stop the task + info!("stopping wal backup task for {}", zttid); + + let wb_handle = tasks.remove(&zttid).unwrap(); + // Tell the task to shut down. An error means the task exited earlier; that's ok. + let _ = wb_handle.shutdown_tx.send(()).await; + // Await the task itself. TODO: restart panicked tasks earlier. + // Hm, why can't I await a reference to the handle? + if let Err(e) = wb_handle.handle.await { + warn!("WAL backup task for {} panicked: {}", zttid, e); + } + } + } + } +} + +struct WalBackupTask { + timeline: Arc<Timeline>, + timeline_dir: PathBuf, + wal_seg_size: usize, + commit_lsn_watch_rx: watch::Receiver<Lsn>, + leader: Option<ElectionLeader>, + election: Election, +} + +/// Offload a single timeline. +async fn backup_task_main( + zttid: ZTenantTimelineId, + timeline_dir: PathBuf, + mut shutdown_rx: Receiver<()>, + election: Election, +) { + info!("started"); + let timeline: Arc<Timeline> = if let Some(tli) = GlobalTimelines::get_loaded(zttid) { + tli + } else { + /* Timeline could get deleted while the task was starting; just exit then.
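+ The launcher still holds the task handle and will reap this early exit
+ just like a normal termination.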
*/ + info!("no timeline, exiting"); + return; + }; + + let mut wb = WalBackupTask { + wal_seg_size: timeline.get_wal_seg_size(), + commit_lsn_watch_rx: timeline.get_commit_lsn_watch_rx(), + timeline, + timeline_dir, + leader: None, + election, + }; + + // The task is spun up only when wal_seg_size is already initialized. + assert!(wb.wal_seg_size > 0); + + let mut canceled = false; + select! { + _ = wb.run() => {} + _ = shutdown_rx.recv() => { + canceled = true; + } + } + if let Some(l) = wb.leader { + l.give_up().await; + } + info!("task {}", if canceled { "canceled" } else { "terminated" }); +} + +impl WalBackupTask { + async fn run(&mut self) { + let mut backup_lsn = Lsn(0); + + // election loop + loop { + let mut retry_attempt = 0u32; + + if let Some(l) = self.leader.take() { + l.give_up().await; + } + + match broker::get_leader(&self.election).await { + Ok(l) => { + self.leader = Some(l); + } + Err(e) => { + error!("error during leader election {:?}", e); + sleep(Duration::from_millis(BROKER_CONNECTION_RETRY_DELAY_MS)).await; + continue; + } + } + + // offload loop + loop { + if retry_attempt == 0 { + // wait for new WAL to arrive + if let Err(e) = self.commit_lsn_watch_rx.changed().await { + // Should never happen, as we hold an Arc to the timeline. + error!("commit_lsn watch shut down: {:?}", e); + return; + } + } else { + // ... or just sleep if we errored previously + let mut retry_delay = UPLOAD_FAILURE_RETRY_MAX_MS; + if let Some(backoff_delay) = + UPLOAD_FAILURE_RETRY_MIN_MS.checked_shl(retry_attempt) + { + retry_delay = min(retry_delay, backoff_delay); + } + sleep(Duration::from_millis(retry_delay)).await; + } + + let commit_lsn = *self.commit_lsn_watch_rx.borrow(); + assert!( + commit_lsn >= backup_lsn, + "backup lsn should never pass commit lsn" + ); + + if backup_lsn.segment_number(self.wal_seg_size) + == commit_lsn.segment_number(self.wal_seg_size) + { + continue; /* nothing to do, the common case as we wake up on every commit_lsn bump */ + } + // Perhaps peers advanced the position; check the shmem value.
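+ // (Peer information arriving via the broker can bump inmem.backup_lsn past
+ // our local view, e.g. when another safekeeper won the election earlier and
+ // already offloaded these segments.)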
+ backup_lsn = self.timeline.get_wal_backup_lsn(); + if backup_lsn.segment_number(self.wal_seg_size) + == commit_lsn.segment_number(self.wal_seg_size) + { + continue; + } + + if let Some(l) = self.leader.as_mut() { + // Optimization idea for later: + // avoid checking the election leader every time by returning the current lease grant expiration time and + // re-checking leadership only after it expires; + // such an approach would reduce overhead on write-intensive workloads. + + match l + .check_am_i( + self.election.election_name.clone(), + self.election.candidate_name.clone(), + ) + .await + { + Ok(leader) => { + if !leader { + info!("leader has changed"); + break; + } + } + Err(e) => { + warn!("error validating leader: {:?}", e); + break; + } + } + } + + match backup_lsn_range( + backup_lsn, + commit_lsn, + self.wal_seg_size, + &self.timeline_dir, + ) + .await + { + Ok(backup_lsn_result) => { + backup_lsn = backup_lsn_result; + self.timeline.set_wal_backup_lsn(backup_lsn_result); + retry_attempt = 0; + } + Err(e) => { + error!( + "failed while offloading range {}-{}: {:?}", + backup_lsn, commit_lsn, e + ); + + retry_attempt = retry_attempt.saturating_add(1); + } + } + } + } + } +} + +pub async fn backup_lsn_range( + start_lsn: Lsn, + end_lsn: Lsn, + wal_seg_size: usize, + timeline_dir: &Path, +) -> Result<Lsn> { + let mut res = start_lsn; + let segments = get_segments(start_lsn, end_lsn, wal_seg_size); + for s in &segments { + backup_single_segment(s, timeline_dir) + .await + .with_context(|| format!("offloading segno {}", s.seg_no))?; + + res = s.end_lsn; + } + info!( + "offloaded segnos {:?} up to {}, previous backup_lsn {}", + segments.iter().map(|&s| s.seg_no).collect::<Vec<_>>(), + end_lsn, + start_lsn, + ); + Ok(res) +} + +async fn backup_single_segment(seg: &Segment, timeline_dir: &Path) -> Result<()> { + let segment_file_name = seg.file_path(timeline_dir)?; + + backup_object(&segment_file_name, seg.size()).await?; + debug!("Backup of {} done", segment_file_name.display()); + + Ok(()) +} + +#[derive(Debug, Copy, Clone)] +pub struct Segment { + seg_no: XLogSegNo, + start_lsn: Lsn, + end_lsn: Lsn, +} + +impl Segment { + pub fn new(seg_no: u64, start_lsn: Lsn, end_lsn: Lsn) -> Self { + Self { + seg_no, + start_lsn, + end_lsn, + } + } + + pub fn object_name(self) -> String { + XLogFileName(PG_TLI, self.seg_no, self.size()) + } + + pub fn file_path(self, timeline_dir: &Path) -> Result<PathBuf> { + Ok(timeline_dir.join(self.object_name())) + } + + pub fn size(self) -> usize { + (u64::from(self.end_lsn) - u64::from(self.start_lsn)) as usize + } +} + +fn get_segments(start: Lsn, end: Lsn, seg_size: usize) -> Vec<Segment> { + let first_seg = start.segment_number(seg_size); + let last_seg = end.segment_number(seg_size); + + let res: Vec<Segment> = (first_seg..last_seg) + .map(|s| { + let start_lsn = XLogSegNoOffsetToRecPtr(s, 0, seg_size); + let end_lsn = XLogSegNoOffsetToRecPtr(s + 1, 0, seg_size); + Segment::new(s, Lsn::from(start_lsn), Lsn::from(end_lsn)) + }) + .collect(); + res +} + +static REMOTE_STORAGE: OnceCell<Option<GenericRemoteStorage>> = OnceCell::new(); + +async fn backup_object(source_file: &Path, size: usize) -> Result<()> { + let storage = REMOTE_STORAGE.get().expect("failed to get remote storage"); + + let file = File::open(&source_file).await?; + + // Storage is initialized by the launcher at this point.
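+ // The unwrap() below is safe: the launcher only spawns backup tasks when
+ // conf.remote_storage is set, and in that case it has initialized
+ // REMOTE_STORAGE with Some(...) before any task runs.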
+ match storage.as_ref().unwrap() { + GenericRemoteStorage::Local(local_storage) => { + let destination = local_storage.remote_object_id(source_file)?; + + debug!( + "local upload about to start from {} to {}", + source_file.display(), + destination.display() + ); + local_storage.upload(file, size, &destination, None).await + } + GenericRemoteStorage::S3(s3_storage) => { + let s3key = s3_storage.remote_object_id(source_file)?; + + debug!( + "S3 upload about to start from {} to {:?}", + source_file.display(), + s3key + ); + s3_storage.upload(file, size, &s3key, None).await + } + }?; + + Ok(()) +} diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index e1b7bd91ee..fc192c28e8 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -12,7 +12,7 @@ from contextlib import closing from dataclasses import dataclass, field from multiprocessing import Process, Value from pathlib import Path -from fixtures.zenith_fixtures import PgBin, Etcd, Postgres, Safekeeper, ZenithEnv, ZenithEnvBuilder, PortDistributor, SafekeeperPort, zenith_binpath, PgProtocol +from fixtures.zenith_fixtures import PgBin, Etcd, Postgres, RemoteStorageUsers, Safekeeper, ZenithEnv, ZenithEnvBuilder, PortDistributor, SafekeeperPort, zenith_binpath, PgProtocol from fixtures.utils import get_dir_size, lsn_to_hex, mkdir_if_needed, lsn_from_hex from fixtures.log_helper import log from typing import List, Optional, Any @@ -401,7 +401,7 @@ def test_wal_removal(zenith_env_builder: ZenithEnvBuilder): http_cli = env.safekeepers[0].http_client() # Pretend WAL is offloaded to s3. - http_cli.record_safekeeper_info(tenant_id, timeline_id, {'s3_wal_lsn': 'FFFFFFFF/FEFFFFFF'}) + http_cli.record_safekeeper_info(tenant_id, timeline_id, {'backup_lsn': 'FFFFFFFF/FEFFFFFF'}) # wait till the first segment is removed on all safekeepers started_at = time.time() @@ -414,6 +414,56 @@ def test_wal_removal(zenith_env_builder: ZenithEnvBuilder): time.sleep(0.5) +@pytest.mark.parametrize('storage_type', ['mock_s3', 'local_fs']) +def test_wal_backup(zenith_env_builder: ZenithEnvBuilder, storage_type: str): + zenith_env_builder.num_safekeepers = 3 + if storage_type == 'local_fs': + zenith_env_builder.enable_local_fs_remote_storage() + elif storage_type == 'mock_s3': + zenith_env_builder.enable_s3_mock_remote_storage('test_safekeepers_wal_backup') + else: + raise RuntimeError(f'Unknown storage type: {storage_type}') + zenith_env_builder.remote_storage_users = RemoteStorageUsers.SAFEKEEPER + + env = zenith_env_builder.init_start() + + env.zenith_cli.create_branch('test_safekeepers_wal_backup') + pg = env.postgres.create_start('test_safekeepers_wal_backup') + + # learn the zenith timeline from the compute + tenant_id = pg.safe_psql("show zenith.zenith_tenant")[0][0] + timeline_id = pg.safe_psql("show zenith.zenith_timeline")[0][0] + + pg_conn = pg.connect() + cur = pg_conn.cursor() + cur.execute('create table t(key int, value text)') + + # Shut down each safekeeper in turn and fill a segment while it is + # down; ensure the segment gets offloaded by the others.
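+ # With 3 safekeepers a quorum of 2 keeps accepting commits while one is
+ # down, and 250k short rows roughly fill one 16 MiB WAL segment, which is
+ # where the per-iteration segment boundaries below come from.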
+ offloaded_seg_end = ['0/2000000', '0/3000000', '0/4000000'] + for victim, seg_end in zip(env.safekeepers, offloaded_seg_end): + victim.stop() + # roughly fills one segment + cur.execute("insert into t select generate_series(1,250000), 'payload'") + live_sk = [sk for sk in env.safekeepers if sk != victim][0] + http_cli = live_sk.http_client() + + started_at = time.time() + while True: + tli_status = http_cli.timeline_status(tenant_id, timeline_id) + log.info(f"live sk status is {tli_status}") + + if lsn_from_hex(tli_status.backup_lsn) >= lsn_from_hex(seg_end): + break + elapsed = time.time() - started_at + if elapsed > 20: + raise RuntimeError( + f"timed out after {elapsed:.0f}s waiting for the segment ending at {seg_end} to get offloaded") + time.sleep(0.5) + + victim.start() + + class ProposerPostgres(PgProtocol): """Object for running postgres without ZenithEnv""" def __init__(self, diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index 7f5b2ad2aa..a2e8c82d30 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -1,6 +1,7 @@ from __future__ import annotations from dataclasses import field +from enum import Flag, auto import textwrap from cached_property import cached_property import asyncpg @@ -421,10 +422,51 @@ class MockS3Server: def secret_key(self) -> str: return 'test' + def access_env_vars(self) -> Dict[Any, Any]: + return { + 'AWS_ACCESS_KEY_ID': self.access_key(), + 'AWS_SECRET_ACCESS_KEY': self.secret_key(), + } + def kill(self): self.subprocess.kill() +@dataclass +class LocalFsStorage: + local_path: Path + + +@dataclass +class S3Storage: + bucket_name: str + bucket_region: str + endpoint: Optional[str] + + +RemoteStorage = Union[LocalFsStorage, S3Storage] + + +# serialize as a toml inline table +def remote_storage_to_toml_inline_table(remote_storage): + if isinstance(remote_storage, LocalFsStorage): + res = f"local_path='{remote_storage.local_path}'" + elif isinstance(remote_storage, S3Storage): + res = f"bucket_name='{remote_storage.bucket_name}', bucket_region='{remote_storage.bucket_region}'" + if remote_storage.endpoint is not None: + res += f", endpoint='{remote_storage.endpoint}'" + else: + raise Exception(f'Unknown storage configuration {remote_storage}') + return f"{{{res}}}" + + +class RemoteStorageUsers(Flag): + PAGESERVER = auto() + SAFEKEEPER = auto() + + class ZenithEnvBuilder: """ Builder object to create a Zenith runtime environment @@ -440,6 +482,7 @@ class ZenithEnvBuilder: broker: Etcd, mock_s3_server: MockS3Server, remote_storage: Optional[RemoteStorage] = None, + remote_storage_users: RemoteStorageUsers = RemoteStorageUsers.PAGESERVER, pageserver_config_override: Optional[str] = None, num_safekeepers: int = 1, pageserver_auth_enabled: bool = False, @@ -449,6 +492,7 @@ class ZenithEnvBuilder: self.rust_log_override = rust_log_override self.port_distributor = port_distributor self.remote_storage = remote_storage + self.remote_storage_users = remote_storage_users self.broker = broker self.mock_s3_server = mock_s3_server self.pageserver_config_override = pageserver_config_override @@ -497,9 +541,9 @@ class ZenithEnvBuilder: aws_access_key_id=self.mock_s3_server.access_key(), aws_secret_access_key=self.mock_s3_server.secret_key(), ).create_bucket(Bucket=bucket_name) - self.remote_storage = S3Storage(bucket=bucket_name, + self.remote_storage = S3Storage(bucket_name=bucket_name, endpoint=mock_endpoint, - region=mock_region) +
bucket_region=mock_region) def __enter__(self): return self @@ -557,6 +601,7 @@ class ZenithEnv: self.safekeepers: List[Safekeeper] = [] self.broker = config.broker self.remote_storage = config.remote_storage + self.remote_storage_users = config.remote_storage_users # generate initial tenant ID here instead of letting 'zenith init' generate it, # so that we don't need to dig it out of the config file afterwards. @@ -605,8 +650,12 @@ class ZenithEnv: id = {id} pg_port = {port.pg} http_port = {port.http} - sync = false # Disable fsyncs to make the tests go faster - """) + sync = false # Disable fsyncs to make the tests go faster""") + if bool(self.remote_storage_users + & RemoteStorageUsers.SAFEKEEPER) and self.remote_storage is not None: + toml += textwrap.dedent(f""" + remote_storage = "{remote_storage_to_toml_inline_table(self.remote_storage)}" + """) safekeeper = Safekeeper(env=self, id=id, port=port) self.safekeepers.append(safekeeper) @@ -638,7 +687,7 @@ def _shared_simple_env(request: Any, mock_s3_server: MockS3Server, default_broker: Etcd) -> Iterator[ZenithEnv]: """ - Internal fixture backing the `zenith_simple_env` fixture. If TEST_SHARED_FIXTURES + # Internal fixture backing the `zenith_simple_env` fixture. If TEST_SHARED_FIXTURES is set, this is shared by all tests using `zenith_simple_env`. """ @@ -822,20 +871,6 @@ class PageserverPort: http: int -@dataclass -class LocalFsStorage: - root: Path - - -@dataclass -class S3Storage: - bucket: str - region: str - endpoint: Optional[str] - - -RemoteStorage = Union[LocalFsStorage, S3Storage] - CREATE_TIMELINE_ID_EXTRACTOR = re.compile(r"^Created timeline '(?P<timeline_id>[^']+)'", re.MULTILINE) CREATE_TIMELINE_ID_EXTRACTOR = re.compile(r"^Created timeline '(?P<timeline_id>[^']+)'", @@ -998,6 +1033,7 @@ class ZenithCli: append_pageserver_param_overrides( params_to_update=cmd, remote_storage=self.env.remote_storage, + remote_storage_users=self.env.remote_storage_users, pageserver_config_override=self.env.pageserver.config_override) res = self.raw_cli(cmd) @@ -1022,14 +1058,10 @@ class ZenithCli: append_pageserver_param_overrides( params_to_update=start_args, remote_storage=self.env.remote_storage, + remote_storage_users=self.env.remote_storage_users, pageserver_config_override=self.env.pageserver.config_override) - s3_env_vars = None - if self.env.s3_mock_server: - s3_env_vars = { - 'AWS_ACCESS_KEY_ID': self.env.s3_mock_server.access_key(), - 'AWS_SECRET_ACCESS_KEY': self.env.s3_mock_server.secret_key(), - } + s3_env_vars = self.env.s3_mock_server.access_env_vars() if self.env.s3_mock_server else None return self.raw_cli(start_args, extra_env_vars=s3_env_vars) def pageserver_stop(self, immediate=False) -> 'subprocess.CompletedProcess[str]': @@ -1041,7 +1073,8 @@ class ZenithCli: return self.raw_cli(cmd) def safekeeper_start(self, id: int) -> 'subprocess.CompletedProcess[str]': - return self.raw_cli(['safekeeper', 'start', str(id)]) + s3_env_vars = self.env.s3_mock_server.access_env_vars() if self.env.s3_mock_server else None + return self.raw_cli(['safekeeper', 'start', str(id)], extra_env_vars=s3_env_vars) def safekeeper_stop(self, id: Optional[int] = None, @@ -1237,22 +1270,13 @@ class ZenithPageserver(PgProtocol): def append_pageserver_param_overrides( params_to_update: List[str], remote_storage: Optional[RemoteStorage], + remote_storage_users: RemoteStorageUsers, pageserver_config_override: Optional[str] = None, ): - if remote_storage is not None: - if isinstance(remote_storage, LocalFsStorage): - pageserver_storage_override =
f"local_path='{remote_storage.root}'" - elif isinstance(remote_storage, S3Storage): - pageserver_storage_override = f"bucket_name='{remote_storage.bucket}',\ - bucket_region='{remote_storage.region}'" - - if remote_storage.endpoint is not None: - pageserver_storage_override += f",endpoint='{remote_storage.endpoint}'" - - else: - raise Exception(f'Unknown storage configuration {remote_storage}') + if bool(remote_storage_users & RemoteStorageUsers.PAGESERVER) and remote_storage is not None: + remote_storage_toml_table = remote_storage_to_toml_inline_table(remote_storage) params_to_update.append( - f'--pageserver-config-override=remote_storage={{{pageserver_storage_override}}}') + f'--pageserver-config-override=remote_storage={remote_storage_toml_table}') env_overrides = os.getenv('ZENITH_PAGESERVER_OVERRIDES') if env_overrides is not None: @@ -1786,8 +1810,9 @@ class Safekeeper: class SafekeeperTimelineStatus: acceptor_epoch: int flush_lsn: str - remote_consistent_lsn: str timeline_start_lsn: str + backup_lsn: str + remote_consistent_lsn: str @dataclass @@ -1812,8 +1837,9 @@ class SafekeeperHttpClient(requests.Session): resj = res.json() return SafekeeperTimelineStatus(acceptor_epoch=resj['acceptor_state']['epoch'], flush_lsn=resj['flush_lsn'], - remote_consistent_lsn=resj['remote_consistent_lsn'], - timeline_start_lsn=resj['timeline_start_lsn']) + timeline_start_lsn=resj['timeline_start_lsn'], + backup_lsn=resj['backup_lsn'], + remote_consistent_lsn=resj['remote_consistent_lsn']) def record_safekeeper_info(self, tenant_id: str, timeline_id: str, body): res = self.post(