diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs
index 7434f921cb..3fea3581a8 100644
--- a/safekeeper/src/bin/safekeeper.rs
+++ b/safekeeper/src/bin/safekeeper.rs
@@ -16,11 +16,11 @@
 use url::{ParseError, Url};
 use safekeeper::control_file::{self};
 use safekeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR};
-use safekeeper::http;
-use safekeeper::s3_offload;
+use safekeeper::remove_wal;
 use safekeeper::wal_service;
 use safekeeper::SafeKeeperConf;
 use safekeeper::{broker, callmemaybe};
+use safekeeper::{http, s3_offload};
 use utils::{
     http::endpoint, logging, shutdown::exit_now, signals, tcp_listener, zid::ZNodeId, GIT_VERSION,
 };
@@ -292,6 +292,15 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<ZNodeId>, init: b
         );
     }
 
+    let conf_ = conf.clone();
+    threads.push(
+        thread::Builder::new()
+            .name("WAL removal thread".into())
+            .spawn(|| {
+                remove_wal::thread_main(conf_);
+            })?,
+    );
+
     // TODO: put more thoughts into handling of failed threads
     // We probably should restart them.
diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs
index b84b5cf789..8ce7bdf0e5 100644
--- a/safekeeper/src/broker.rs
+++ b/safekeeper/src/broker.rs
@@ -32,23 +32,28 @@
 const ZENITH_PREFIX: &str = "zenith";
 
 /// Published data about safekeeper. Fields made optional for easy migrations.
 #[serde_as]
-#[derive(Deserialize, Serialize)]
+#[derive(Debug, Deserialize, Serialize)]
 pub struct SafekeeperInfo {
     /// Term of the last entry.
     pub last_log_term: Option<Term>,
     /// LSN of the last record.
     #[serde_as(as = "Option<DisplayFromStr>")]
+    #[serde(default)]
     pub flush_lsn: Option<Lsn>,
     /// Up to which LSN safekeeper regards its WAL as committed.
     #[serde_as(as = "Option<DisplayFromStr>")]
+    #[serde(default)]
     pub commit_lsn: Option<Lsn>,
     /// LSN up to which safekeeper offloaded WAL to s3.
     #[serde_as(as = "Option<DisplayFromStr>")]
+    #[serde(default)]
     pub s3_wal_lsn: Option<Lsn>,
     /// LSN of last checkpoint uploaded by pageserver.
     #[serde_as(as = "Option<DisplayFromStr>")]
+    #[serde(default)]
     pub remote_consistent_lsn: Option<Lsn>,
     #[serde_as(as = "Option<DisplayFromStr>")]
+    #[serde(default)]
     pub peer_horizon_lsn: Option<Lsn>,
 }
diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs
index 2d22332db9..fab8724430 100644
--- a/safekeeper/src/http/routes.rs
+++ b/safekeeper/src/http/routes.rs
@@ -5,6 +5,7 @@ use serde::Serializer;
 use std::fmt::Display;
 use std::sync::Arc;
 
+use crate::broker::SafekeeperInfo;
 use crate::safekeeper::Term;
 use crate::safekeeper::TermHistory;
 use crate::timeline::GlobalTimelines;
@@ -123,6 +124,20 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response
+async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
+    let zttid = ZTenantTimelineId::new(
+        parse_request_param(&request, "tenant_id")?,
+        parse_request_param(&request, "timeline_id")?,
+    );
+    let safekeeper_info: SafekeeperInfo = json_request(&mut request).await?;
+
+    let tli = GlobalTimelines::get(get_conf(&request), zttid, false).map_err(ApiError::from_err)?;
+    tli.record_safekeeper_info(&safekeeper_info, ZNodeId(1))?;
+
+    json_response(StatusCode::OK, ())
+}
+
 /// Safekeeper http router.
 pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError> {
     let router = endpoint::make_router();
@@ -134,4 +149,9 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError
             timeline_status_handler,
         )
         .post("/v1/timeline", timeline_create_handler)
+        // for tests
+        .post(
+            "/v1/record_safekeeper_info/:tenant_id/:timeline_id",
+            record_safekeeper_info,
+        )
 }
diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs
index 8951e8f680..6509e8166a 100644
--- a/safekeeper/src/lib.rs
+++ b/safekeeper/src/lib.rs
@@ -13,6 +13,7 @@ pub mod handler;
 pub mod http;
 pub mod json_ctrl;
 pub mod receive_wal;
+pub mod remove_wal;
 pub mod s3_offload;
 pub mod safekeeper;
 pub mod send_wal;
diff --git a/safekeeper/src/remove_wal.rs b/safekeeper/src/remove_wal.rs
new file mode 100644
index 0000000000..9474f65e5f
--- /dev/null
+++ b/safekeeper/src/remove_wal.rs
@@ -0,0 +1,25 @@
+//! Thread removing old WAL.
+
+use std::{thread, time::Duration};
+
+use tracing::*;
+
+use crate::{timeline::GlobalTimelines, SafeKeeperConf};
+
+pub fn thread_main(conf: SafeKeeperConf) {
+    let wal_removal_interval = Duration::from_millis(5000);
+    loop {
+        let active_tlis = GlobalTimelines::get_active_timelines();
+        for zttid in &active_tlis {
+            if let Ok(tli) = GlobalTimelines::get(&conf, *zttid, false) {
+                if let Err(e) = tli.remove_old_wal() {
+                    warn!(
+                        "failed to remove WAL for tenant {} timeline {}: {}",
+                        tli.zttid.tenant_id, tli.zttid.timeline_id, e
+                    );
+                }
+            }
+        }
+        thread::sleep(wal_removal_interval)
+    }
+}
diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs
index 59174f34a2..048753152b 100644
--- a/safekeeper/src/safekeeper.rs
+++ b/safekeeper/src/safekeeper.rs
@@ -5,6 +5,8 @@
 use byteorder::{LittleEndian, ReadBytesExt};
 use bytes::{Buf, BufMut, Bytes, BytesMut};
 use postgres_ffi::xlog_utils::TimeLineID;
+
+use postgres_ffi::xlog_utils::XLogSegNo;
 use serde::{Deserialize, Serialize};
 use std::cmp::max;
 use std::cmp::min;
@@ -880,6 +882,24 @@ where
         }
         Ok(())
     }
+
+    /// Get the oldest segno we still need to keep. We hold WAL till it is
+    /// consumed by all of 1) pageserver (remote_consistent_lsn), 2) peers,
+    /// 3) s3 offloading.
+    /// While it is safe to use in-memory values for determining the horizon,
+    /// we use the persistent ones to keep the set of possible states less surprising.
+    pub fn get_horizon_segno(&self) -> XLogSegNo {
+        let horizon_lsn = min(
+            min(
+                self.state.remote_consistent_lsn,
+                self.state.peer_horizon_lsn,
+            ),
+            self.state.s3_wal_lsn,
+        );
+        let res = horizon_lsn.segment_number(self.state.server.wal_seg_size as usize);
+        info!("horizon is {}, res {}", horizon_lsn, res);
+        res
+    }
 }
 #[cfg(test)]
 mod tests {
@@ -935,6 +955,10 @@ mod tests {
         fn flush_wal(&mut self) -> Result<()> {
             Ok(())
         }
+
+        fn remove_up_to(&self) -> Box<dyn Fn(XLogSegNo) -> Result<()>> {
+            Box::new(move |_segno_up_to: XLogSegNo| Ok(()))
+        }
     }
 
     #[test]
diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index fbae34251c..4a507015d3 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -4,6 +4,7 @@
 use anyhow::{bail, Context, Result};
 use lazy_static::lazy_static;
+use postgres_ffi::xlog_utils::XLogSegNo;
 use std::cmp::{max, min};
 use std::collections::HashMap;
 
@@ -88,6 +89,7 @@ struct SharedState {
     active: bool,
     num_computes: u32,
     pageserver_connstr: Option<String>,
+    last_removed_segno: XLogSegNo,
 }
 
 impl SharedState {
@@ -109,6 +111,7 @@ impl SharedState {
             active: false,
             num_computes: 0,
             pageserver_connstr: None,
+            last_removed_segno: 0,
         })
     }
 
@@ -127,6 +130,7 @@ impl SharedState {
             active: false,
             num_computes: 0,
             pageserver_connstr: None,
+            last_removed_segno: 0,
         })
     }
 
@@ -459,6 +463,26 @@ impl Timeline {
         let shared_state = self.mutex.lock().unwrap();
         shared_state.sk.wal_store.flush_lsn()
     }
+
+    pub fn remove_old_wal(&self) -> Result<()> {
+        let horizon_segno: XLogSegNo;
+        let remover: Box<dyn Fn(XLogSegNo) -> Result<(), anyhow::Error>>;
+        {
+            let shared_state = self.mutex.lock().unwrap();
+            horizon_segno = shared_state.sk.get_horizon_segno();
+            remover = shared_state.sk.wal_store.remove_up_to();
+            if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
+                return Ok(());
+            }
+            // release the lock before removing
+        }
+        let _enter =
+            info_span!("", tenant = %self.zttid.tenant_id, timeline = %self.zttid.timeline_id)
+                .entered();
+        remover(horizon_segno - 1)?;
+        self.mutex.lock().unwrap().last_removed_segno = horizon_segno;
+        Ok(())
+    }
 }
 
 // Utilities needed by various Connection-like objects
diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs
index 69a4fb11e1..503bd7c543 100644
--- a/safekeeper/src/wal_storage.rs
+++ b/safekeeper/src/wal_storage.rs
@@ -11,10 +11,12 @@
 use anyhow::{anyhow, bail, Context, Result};
 use std::io::{Read, Seek, SeekFrom};
 
 use lazy_static::lazy_static;
-use postgres_ffi::xlog_utils::{find_end_of_wal, XLogSegNo, PG_TLI};
+use postgres_ffi::xlog_utils::{
+    find_end_of_wal, IsPartialXLogFileName, IsXLogFileName, XLogFileName, XLogFromFileName, XLogSegNo, PG_TLI,
+};
 
 use std::cmp::min;
-use std::fs::{self, File, OpenOptions};
+use std::fs::{self, remove_file, File, OpenOptions};
 use std::io::Write;
 use std::path::{Path, PathBuf};
@@ -101,6 +103,10 @@ pub trait Storage {
 
     /// Durably store WAL on disk, up to the last written WAL record.
     fn flush_wal(&mut self) -> Result<()>;
+
+    /// Remove all segments <= given segno. Returns a closure as we want to do
+    /// this without holding the timeline lock.
+    fn remove_up_to(&self) -> Box<dyn Fn(XLogSegNo) -> Result<()>>;
 }
 
 /// PhysicalStorage is a storage that stores WAL on disk. Writes are separated from flushes
@@ -466,6 +472,44 @@ impl Storage for PhysicalStorage {
         self.update_flush_lsn();
         Ok(())
     }
+
+    fn remove_up_to(&self) -> Box<dyn Fn(XLogSegNo) -> Result<()>> {
+        let timeline_dir = self.timeline_dir.clone();
+        let wal_seg_size = self.wal_seg_size.unwrap();
+        Box::new(move |segno_up_to: XLogSegNo| {
+            remove_up_to(&timeline_dir, wal_seg_size, segno_up_to)
+        })
+    }
+}
+
+/// Remove all WAL segments in timeline_dir <= given segno.
+fn remove_up_to(timeline_dir: &Path, wal_seg_size: usize, segno_up_to: XLogSegNo) -> Result<()> {
+    let mut n_removed = 0;
+    for entry in fs::read_dir(&timeline_dir)? {
+        let entry = entry?;
+        let entry_path = entry.path();
+        let fname = entry_path.file_name().unwrap();
+
+        if let Some(fname_str) = fname.to_str() {
+            /* Ignore files that are not XLOG segments */
+            if !IsXLogFileName(fname_str) && !IsPartialXLogFileName(fname_str) {
+                continue;
+            }
+            let (segno, _) = XLogFromFileName(fname_str, wal_seg_size);
+            if segno <= segno_up_to {
+                remove_file(entry_path)?;
+                n_removed += 1;
+            }
+        }
+    }
+    let segno_from = segno_up_to - n_removed + 1;
+    info!(
+        "removed {} WAL segments [{}; {}]",
+        n_removed,
+        XLogFileName(PG_TLI, segno_from, wal_seg_size),
+        XLogFileName(PG_TLI, segno_up_to, wal_seg_size)
+    );
+    Ok(())
 }
 
 pub struct WalReader {
diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py
index cc9ec9a275..395084af0e 100644
--- a/test_runner/batch_others/test_wal_acceptor.py
+++ b/test_runner/batch_others/test_wal_acceptor.py
@@ -370,6 +370,55 @@ def test_broker(zenith_env_builder: ZenithEnvBuilder):
         time.sleep(0.5)
 
+
+# Test that old WAL consumed by peers and pageserver is removed from safekeepers.
+@pytest.mark.skipif(etcd_path() is None, reason="requires etcd which is not present in PATH")
+def test_wal_removal(zenith_env_builder: ZenithEnvBuilder):
+    zenith_env_builder.num_safekeepers = 2
+    zenith_env_builder.broker = True
+    # to advance remote_consistent_lsn
+    zenith_env_builder.enable_local_fs_remote_storage()
+    env = zenith_env_builder.init_start()
+
+    env.zenith_cli.create_branch('test_safekeepers_wal_removal')
+    pg = env.postgres.create_start('test_safekeepers_wal_removal')
+
+    with closing(pg.connect()) as conn:
+        with conn.cursor() as cur:
+            # we rely upon autocommit after each statement
+            # as waiting for acceptors happens there
+            cur.execute('CREATE TABLE t(key int primary key, value text)')
+            cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'")
+
+    tenant_id = pg.safe_psql("show zenith.zenith_tenant")[0][0]
+    timeline_id = pg.safe_psql("show zenith.zenith_timeline")[0][0]
+
+    # force checkpoint to advance remote_consistent_lsn
+    with closing(env.pageserver.connect()) as psconn:
+        with psconn.cursor() as pscur:
+            pscur.execute(f"checkpoint {tenant_id} {timeline_id}")
+
+    # We will wait for the first segment to be removed; make sure it exists on each safekeeper first.
+    first_segments = [
+        os.path.join(sk.data_dir(), tenant_id, timeline_id, '000000010000000000000001')
+        for sk in env.safekeepers
+    ]
+    assert all(os.path.exists(p) for p in first_segments)
+
+    http_cli = env.safekeepers[0].http_client()
+    # Pretend WAL is offloaded to s3.
+    http_cli.record_safekeeper_info(tenant_id, timeline_id, {'s3_wal_lsn': 'FFFFFFFF/FEFFFFFF'})
+
+    # wait till the first segment is removed on all safekeepers
+    started_at = time.time()
+    while True:
+        if all(not os.path.exists(p) for p in first_segments):
+            break
+        elapsed = time.time() - started_at
+        if elapsed > 20:
+            raise RuntimeError(f"timed out waiting {elapsed:.0f}s for the first segment to be removed")
+        time.sleep(0.5)
+
+
 class ProposerPostgres(PgProtocol):
     """Object for running postgres without ZenithEnv"""
     def __init__(self,
diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py
index d295a79953..e16d1acf2f 100644
--- a/test_runner/fixtures/zenith_fixtures.py
+++ b/test_runner/fixtures/zenith_fixtures.py
@@ -1738,6 +1738,9 @@ class Safekeeper:
     def http_client(self) -> SafekeeperHttpClient:
         return SafekeeperHttpClient(port=self.port.http)
 
+    def data_dir(self) -> str:
+        return os.path.join(self.env.repo_dir, "safekeepers", f"sk{self.id}")
+
 
 @dataclass
 class SafekeeperTimelineStatus:
@@ -1770,6 +1773,12 @@ class SafekeeperHttpClient(requests.Session):
                                  flush_lsn=resj['flush_lsn'],
                                  remote_consistent_lsn=resj['remote_consistent_lsn'])
 
+    def record_safekeeper_info(self, tenant_id: str, timeline_id: str, body):
+        res = self.post(
+            f"http://localhost:{self.port}/v1/record_safekeeper_info/{tenant_id}/{timeline_id}",
+            json=body)
+        res.raise_for_status()
+
     def get_metrics(self) -> SafekeeperMetrics:
         request_result = self.get(f"http://localhost:{self.port}/metrics")
         request_result.raise_for_status()