From 6cb14b4200429bc2eb50b5f9879918188965b156 Mon Sep 17 00:00:00 2001
From: Arseny Sher
Date: Tue, 10 May 2022 20:44:56 +0400
Subject: [PATCH] Optionally remove WAL on safekeepers without s3 offloading.

And do that on staging, until offloading is merged.
---
 .circleci/ansible/production.hosts           |  1 +
 .circleci/ansible/staging.hosts              |  1 +
 .circleci/ansible/systemd/safekeeper.service |  2 +-
 safekeeper/src/bin/safekeeper.rs             | 15 +++++++++++++++
 safekeeper/src/lib.rs                        |  2 ++
 safekeeper/src/remove_wal.rs                 |  2 +-
 safekeeper/src/safekeeper.rs                 |  9 +++++++--
 safekeeper/src/timeline.rs                   |  4 ++--
 8 files changed, 30 insertions(+), 6 deletions(-)

diff --git a/.circleci/ansible/production.hosts b/.circleci/ansible/production.hosts
index f32b57154c..2ed8f517f7 100644
--- a/.circleci/ansible/production.hosts
+++ b/.circleci/ansible/production.hosts
@@ -15,3 +15,4 @@ console_mgmt_base_url = http://console-release.local
 bucket_name = zenith-storage-oregon
 bucket_region = us-west-2
 etcd_endpoints = etcd-release.local:2379
+safekeeper_enable_s3_offload = true
diff --git a/.circleci/ansible/staging.hosts b/.circleci/ansible/staging.hosts
index 71166c531e..3ea815b907 100644
--- a/.circleci/ansible/staging.hosts
+++ b/.circleci/ansible/staging.hosts
@@ -16,3 +16,4 @@ console_mgmt_base_url = http://console-staging.local
 bucket_name = zenith-staging-storage-us-east-1
 bucket_region = us-east-1
 etcd_endpoints = etcd-staging.local:2379
+safekeeper_enable_s3_offload = false
diff --git a/.circleci/ansible/systemd/safekeeper.service b/.circleci/ansible/systemd/safekeeper.service
index cac38d8756..55088db859 100644
--- a/.circleci/ansible/systemd/safekeeper.service
+++ b/.circleci/ansible/systemd/safekeeper.service
@@ -6,7 +6,7 @@ After=network.target auditd.service
 Type=simple
 User=safekeeper
 Environment=RUST_BACKTRACE=1 ZENITH_REPO_DIR=/storage/safekeeper/data LD_LIBRARY_PATH=/usr/local/lib
-ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}.local:6500 --listen-http {{ inventory_hostname }}.local:7676 -p {{ first_pageserver }}:6400 -D /storage/safekeeper/data --broker-endpoints={{ etcd_endpoints }}
+ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}.local:6500 --listen-http {{ inventory_hostname }}.local:7676 -p {{ first_pageserver }}:6400 -D /storage/safekeeper/data --broker-endpoints={{ etcd_endpoints }} --enable-s3-offload={{ safekeeper_enable_s3_offload }}
 ExecReload=/bin/kill -HUP $MAINPID
 KillMode=mixed
 KillSignal=SIGINT
diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs
index 7e979840c2..d0df7093ff 100644
--- a/safekeeper/src/bin/safekeeper.rs
+++ b/safekeeper/src/bin/safekeeper.rs
@@ -115,6 +115,14 @@ fn main() -> Result<()> {
                 .takes_value(true)
                 .help("a prefix to always use when polling/pusing data in etcd from this safekeeper"),
         )
+        .arg(
+            Arg::new("enable-s3-offload")
+                .long("enable-s3-offload")
+                .takes_value(true)
+                .default_value("true")
+                .default_missing_value("true")
+                .help("Enable/disable s3 offloading. When disabled, safekeeper removes WAL ignoring s3 WAL horizon."),
+        )
         .get_matches();

     if let Some(addr) = arg_matches.value_of("dump-control-file") {
@@ -172,6 +180,13 @@ fn main() -> Result<()> {
         conf.broker_etcd_prefix = prefix.to_string();
     }

+    // Seems like there is no better way to accept bool values explicitly in clap.
+    conf.s3_offload_enabled = arg_matches
+        .value_of("enable-s3-offload")
+        .unwrap()
+        .parse()
+        .context("failed to parse enable-s3-offload bool")?;
+
     start_safekeeper(conf, given_id, arg_matches.is_present("init"))
 }
diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs
index f74e5be992..c848de9e71 100644
--- a/safekeeper/src/lib.rs
+++ b/safekeeper/src/lib.rs
@@ -53,6 +53,7 @@ pub struct SafeKeeperConf {
     pub my_id: ZNodeId,
     pub broker_endpoints: Option<Vec<Url>>,
     pub broker_etcd_prefix: String,
+    pub s3_offload_enabled: bool,
 }

 impl SafeKeeperConf {
@@ -79,6 +80,7 @@ impl Default for SafeKeeperConf {
             my_id: ZNodeId(0),
             broker_endpoints: None,
             broker_etcd_prefix: defaults::DEFAULT_NEON_BROKER_PREFIX.to_string(),
+            s3_offload_enabled: true,
         }
     }
 }
diff --git a/safekeeper/src/remove_wal.rs b/safekeeper/src/remove_wal.rs
index 9474f65e5f..3278d51bd3 100644
--- a/safekeeper/src/remove_wal.rs
+++ b/safekeeper/src/remove_wal.rs
@@ -12,7 +12,7 @@ pub fn thread_main(conf: SafeKeeperConf) {
         let active_tlis = GlobalTimelines::get_active_timelines();
         for zttid in &active_tlis {
             if let Ok(tli) = GlobalTimelines::get(&conf, *zttid, false) {
-                if let Err(e) = tli.remove_old_wal() {
+                if let Err(e) = tli.remove_old_wal(conf.s3_offload_enabled) {
                     warn!(
                         "failed to remove WAL for tenant {} timeline {}: {}",
                         tli.zttid.tenant_id, tli.zttid.timeline_id, e
diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs
index b9264565dc..fff1c269b6 100644
--- a/safekeeper/src/safekeeper.rs
+++ b/safekeeper/src/safekeeper.rs
@@ -930,13 +930,18 @@ where
     /// offloading.
     /// While it is safe to use inmem values for determining horizon,
     /// we use persistent to make possible normal states less surprising.
-    pub fn get_horizon_segno(&self) -> XLogSegNo {
+    pub fn get_horizon_segno(&self, s3_offload_enabled: bool) -> XLogSegNo {
+        let s3_offload_horizon = if s3_offload_enabled {
+            self.state.s3_wal_lsn
+        } else {
+            Lsn(u64::MAX)
+        };
         let horizon_lsn = min(
             min(
                 self.state.remote_consistent_lsn,
                 self.state.peer_horizon_lsn,
             ),
-            self.state.s3_wal_lsn,
+            s3_offload_horizon,
         );
         horizon_lsn.segment_number(self.state.server.wal_seg_size as usize)
     }
diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index 140d6660ac..8b1072a54b 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -479,7 +479,7 @@ impl Timeline {
         shared_state.sk.wal_store.flush_lsn()
     }

-    pub fn remove_old_wal(&self) -> Result<()> {
+    pub fn remove_old_wal(&self, s3_offload_enabled: bool) -> Result<()> {
         let horizon_segno: XLogSegNo;
         let remover: Box<dyn Fn(u64) -> Result<(), anyhow::Error>>;
         {
@@ -488,7 +488,7 @@ impl Timeline {
             let shared_state = self.mutex.lock().unwrap();
             if shared_state.sk.state.server.wal_seg_size == 0 {
                 return Ok(());
             }
-            horizon_segno = shared_state.sk.get_horizon_segno();
+            horizon_segno = shared_state.sk.get_horizon_segno(s3_offload_enabled);
             remover = shared_state.sk.wal_store.remove_up_to();
             if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
                 return Ok(());
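
Not part of the patch: a rough standalone sketch of the horizon selection this
change modifies, with plain u64 values standing in for the real Lsn/XLogSegNo
types and segment arithmetic, to show how disabling offloading takes the s3
horizon out of the minimum:

    use std::cmp::min;

    /// Simplified stand-in for SafeKeeper::get_horizon_segno: pick the oldest
    /// LSN still needed by any consumer and convert it to a WAL segment number.
    /// When s3 offloading is disabled, the s3 horizon is treated as
    /// "infinitely far ahead" so it never holds WAL removal back.
    fn horizon_segno(
        remote_consistent_lsn: u64,
        peer_horizon_lsn: u64,
        s3_wal_lsn: u64,
        s3_offload_enabled: bool,
        wal_seg_size: u64,
    ) -> u64 {
        let s3_offload_horizon = if s3_offload_enabled {
            s3_wal_lsn
        } else {
            u64::MAX
        };
        let horizon_lsn = min(
            min(remote_consistent_lsn, peer_horizon_lsn),
            s3_offload_horizon,
        );
        // Segments strictly below this number are eligible for removal.
        horizon_lsn / wal_seg_size
    }

    fn main() {
        // With offloading disabled, only peers and the pageserver hold WAL back,
        // even though the s3 horizon (0x1000000) lags behind.
        let segno = horizon_segno(0x3000000, 0x2000000, 0x1000000, false, 16 * 1024 * 1024);
        assert_eq!(segno, 2); // 0x2000000 / 16 MiB
    }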