Optionally remove WAL on safekeepers without s3 offloading.

And do that on staging, until offloading is merged.
This commit is contained in:
Arseny Sher
2022-05-10 20:44:56 +04:00
parent 87dfa99734
commit 6cb14b4200
8 changed files with 30 additions and 6 deletions

View File

@@ -15,3 +15,4 @@ console_mgmt_base_url = http://console-release.local
bucket_name = zenith-storage-oregon
bucket_region = us-west-2
etcd_endpoints = etcd-release.local:2379
safekeeper_enable_s3_offload = true

View File

@@ -16,3 +16,4 @@ console_mgmt_base_url = http://console-staging.local
bucket_name = zenith-staging-storage-us-east-1
bucket_region = us-east-1
etcd_endpoints = etcd-staging.local:2379
safekeeper_enable_s3_offload = false

View File

@@ -6,7 +6,7 @@ After=network.target auditd.service
Type=simple
User=safekeeper
Environment=RUST_BACKTRACE=1 ZENITH_REPO_DIR=/storage/safekeeper/data LD_LIBRARY_PATH=/usr/local/lib
ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}.local:6500 --listen-http {{ inventory_hostname }}.local:7676 -p {{ first_pageserver }}:6400 -D /storage/safekeeper/data --broker-endpoints={{ etcd_endpoints }}
ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}.local:6500 --listen-http {{ inventory_hostname }}.local:7676 -p {{ first_pageserver }}:6400 -D /storage/safekeeper/data --broker-endpoints={{ etcd_endpoints }} --enable-s3-offload={{ safekeeper_enable_s3_offload }}
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed
KillSignal=SIGINT

View File

@@ -115,6 +115,14 @@ fn main() -> Result<()> {
.takes_value(true)
.help("a prefix to always use when polling/pusing data in etcd from this safekeeper"),
)
.arg(
Arg::new("enable-s3-offload")
.long("enable-s3-offload")
.takes_value(true)
.default_value("true")
.default_missing_value("true")
.help("Enable/disable s3 offloading. When disabled, safekeeper removes WAL ignoring s3 WAL horizon."),
)
.get_matches();
if let Some(addr) = arg_matches.value_of("dump-control-file") {
@@ -172,6 +180,13 @@ fn main() -> Result<()> {
conf.broker_etcd_prefix = prefix.to_string();
}
// Seems like there is no better way to accept bool values explicitly in clap.
conf.s3_offload_enabled = arg_matches
.value_of("enable-s3-offload")
.unwrap()
.parse()
.context("failed to parse bool enable-s3-offload bool")?;
start_safekeeper(conf, given_id, arg_matches.is_present("init"))
}

View File

@@ -53,6 +53,7 @@ pub struct SafeKeeperConf {
pub my_id: ZNodeId,
pub broker_endpoints: Option<Vec<Url>>,
pub broker_etcd_prefix: String,
pub s3_offload_enabled: bool,
}
impl SafeKeeperConf {
@@ -79,6 +80,7 @@ impl Default for SafeKeeperConf {
my_id: ZNodeId(0),
broker_endpoints: None,
broker_etcd_prefix: defaults::DEFAULT_NEON_BROKER_PREFIX.to_string(),
s3_offload_enabled: true,
}
}
}

View File

@@ -12,7 +12,7 @@ pub fn thread_main(conf: SafeKeeperConf) {
let active_tlis = GlobalTimelines::get_active_timelines();
for zttid in &active_tlis {
if let Ok(tli) = GlobalTimelines::get(&conf, *zttid, false) {
if let Err(e) = tli.remove_old_wal() {
if let Err(e) = tli.remove_old_wal(conf.s3_offload_enabled) {
warn!(
"failed to remove WAL for tenant {} timeline {}: {}",
tli.zttid.tenant_id, tli.zttid.timeline_id, e

View File

@@ -930,13 +930,18 @@ where
/// offloading.
/// While it is safe to use inmem values for determining horizon,
/// we use persistent to make possible normal states less surprising.
pub fn get_horizon_segno(&self) -> XLogSegNo {
pub fn get_horizon_segno(&self, s3_offload_enabled: bool) -> XLogSegNo {
let s3_offload_horizon = if s3_offload_enabled {
self.state.s3_wal_lsn
} else {
Lsn(u64::MAX)
};
let horizon_lsn = min(
min(
self.state.remote_consistent_lsn,
self.state.peer_horizon_lsn,
),
self.state.s3_wal_lsn,
s3_offload_horizon,
);
horizon_lsn.segment_number(self.state.server.wal_seg_size as usize)
}

View File

@@ -479,7 +479,7 @@ impl Timeline {
shared_state.sk.wal_store.flush_lsn()
}
pub fn remove_old_wal(&self) -> Result<()> {
pub fn remove_old_wal(&self, s3_offload_enabled: bool) -> Result<()> {
let horizon_segno: XLogSegNo;
let remover: Box<dyn Fn(u64) -> Result<(), anyhow::Error>>;
{
@@ -488,7 +488,7 @@ impl Timeline {
if shared_state.sk.state.server.wal_seg_size == 0 {
return Ok(());
}
horizon_segno = shared_state.sk.get_horizon_segno();
horizon_segno = shared_state.sk.get_horizon_segno(s3_offload_enabled);
remover = shared_state.sk.wal_store.remove_up_to();
if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
return Ok(());