diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index 708662f20f..cb3df0985d 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -121,8 +121,8 @@ impl RemotePath { self.0.file_name() } - pub fn join(&self, segment: &Utf8Path) -> Self { - Self(self.0.join(segment)) + pub fn join(&self, path: impl AsRef) -> Self { + Self(self.0.join(path)) } pub fn get_path(&self) -> &Utf8PathBuf { diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 8790a9b0a8..3960fc1b99 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -311,7 +311,7 @@ impl DeletionList { result.extend( timeline_layers .into_iter() - .map(|l| timeline_remote_path.join(&Utf8PathBuf::from(l))), + .map(|l| timeline_remote_path.join(Utf8PathBuf::from(l))), ); } } diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index aee3898ac7..7476654426 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -29,13 +29,12 @@ use safekeeper::defaults::{ DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR, }; -use safekeeper::remove_wal; +use safekeeper::http; use safekeeper::wal_service; use safekeeper::GlobalTimelines; use safekeeper::SafeKeeperConf; use safekeeper::{broker, WAL_SERVICE_RUNTIME}; use safekeeper::{control_file, BROKER_RUNTIME}; -use safekeeper::{http, WAL_REMOVER_RUNTIME}; use safekeeper::{wal_backup, HTTP_RUNTIME}; use storage_broker::DEFAULT_ENDPOINT; use utils::auth::{JwtAuth, Scope, SwappableJwtAuth}; @@ -441,14 +440,6 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { .map(|res| ("broker main".to_owned(), res)); tasks_handles.push(Box::pin(broker_task_handle)); - let conf_ = conf.clone(); - let wal_remover_handle = current_thread_rt - .as_ref() - .unwrap_or_else(|| WAL_REMOVER_RUNTIME.handle()) - .spawn(remove_wal::task_main(conf_)) - .map(|res| ("WAL remover".to_owned(), res)); - tasks_handles.push(Box::pin(wal_remover_handle)); - set_build_info_metric(GIT_VERSION, BUILD_TAG); // TODO: update tokio-stream, convert to real async Stream with diff --git a/safekeeper/src/control_file.rs b/safekeeper/src/control_file.rs index fe9f2e6899..e9bb5202da 100644 --- a/safekeeper/src/control_file.rs +++ b/safekeeper/src/control_file.rs @@ -2,7 +2,7 @@ use anyhow::{bail, ensure, Context, Result}; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; -use camino::Utf8PathBuf; +use camino::{Utf8Path, Utf8PathBuf}; use tokio::fs::File; use tokio::io::AsyncWriteExt; use utils::crashsafe::durable_rename; @@ -12,9 +12,9 @@ use std::ops::Deref; use std::path::Path; use std::time::Instant; -use crate::control_file_upgrade::upgrade_control_file; use crate::metrics::PERSIST_CONTROL_FILE_SECONDS; use crate::state::TimelinePersistentState; +use crate::{control_file_upgrade::upgrade_control_file, timeline::get_timeline_dir}; use utils::{bin_ser::LeSer, id::TenantTimelineId}; use crate::SafeKeeperConf; @@ -43,7 +43,7 @@ pub trait Storage: Deref { pub struct FileStorage { // save timeline dir to avoid reconstructing it every time timeline_dir: Utf8PathBuf, - conf: SafeKeeperConf, + no_sync: bool, /// Last state persisted to disk. state: TimelinePersistentState, @@ -54,13 +54,12 @@ pub struct FileStorage { impl FileStorage { /// Initialize storage by loading state from disk. 
pub fn restore_new(ttid: &TenantTimelineId, conf: &SafeKeeperConf) -> Result { - let timeline_dir = conf.timeline_dir(ttid); - - let state = Self::load_control_file_conf(conf, ttid)?; + let timeline_dir = get_timeline_dir(conf, ttid); + let state = Self::load_control_file_from_dir(&timeline_dir)?; Ok(FileStorage { timeline_dir, - conf: conf.clone(), + no_sync: conf.no_sync, state, last_persist_at: Instant::now(), }) @@ -74,7 +73,7 @@ impl FileStorage { ) -> Result { let store = FileStorage { timeline_dir, - conf: conf.clone(), + no_sync: conf.no_sync, state, last_persist_at: Instant::now(), }; @@ -102,12 +101,9 @@ impl FileStorage { upgrade_control_file(buf, version) } - /// Load control file for given ttid at path specified by conf. - pub fn load_control_file_conf( - conf: &SafeKeeperConf, - ttid: &TenantTimelineId, - ) -> Result { - let path = conf.timeline_dir(ttid).join(CONTROL_FILE_NAME); + /// Load control file from given directory. + pub fn load_control_file_from_dir(timeline_dir: &Utf8Path) -> Result { + let path = timeline_dir.join(CONTROL_FILE_NAME); Self::load_control_file(path) } @@ -203,7 +199,7 @@ impl Storage for FileStorage { })?; let control_path = self.timeline_dir.join(CONTROL_FILE_NAME); - durable_rename(&control_partial_path, &control_path, !self.conf.no_sync).await?; + durable_rename(&control_partial_path, &control_path, !self.no_sync).await?; // update internal state self.state = s.clone(); @@ -233,12 +229,13 @@ mod test { conf: &SafeKeeperConf, ttid: &TenantTimelineId, ) -> Result<(FileStorage, TimelinePersistentState)> { - fs::create_dir_all(conf.timeline_dir(ttid)) + let timeline_dir = get_timeline_dir(conf, ttid); + fs::create_dir_all(&timeline_dir) .await .expect("failed to create timeline dir"); Ok(( FileStorage::restore_new(ttid, conf)?, - FileStorage::load_control_file_conf(conf, ttid)?, + FileStorage::load_control_file_from_dir(&timeline_dir)?, )) } @@ -246,11 +243,11 @@ mod test { conf: &SafeKeeperConf, ttid: &TenantTimelineId, ) -> Result<(FileStorage, TimelinePersistentState)> { - fs::create_dir_all(conf.timeline_dir(ttid)) + let timeline_dir = get_timeline_dir(conf, ttid); + fs::create_dir_all(&timeline_dir) .await .expect("failed to create timeline dir"); let state = TimelinePersistentState::empty(); - let timeline_dir = conf.timeline_dir(ttid); let storage = FileStorage::create_new(timeline_dir, conf, state.clone())?; Ok((storage, state)) } @@ -291,7 +288,7 @@ mod test { .await .expect("failed to persist state"); } - let control_path = conf.timeline_dir(&ttid).join(CONTROL_FILE_NAME); + let control_path = get_timeline_dir(&conf, &ttid).join(CONTROL_FILE_NAME); let mut data = fs::read(&control_path).await.unwrap(); data[0] += 1; // change the first byte of the file to fail checksum validation fs::write(&control_path, &data) diff --git a/safekeeper/src/copy_timeline.rs b/safekeeper/src/copy_timeline.rs index 3023d4e2cb..51cf4db6b5 100644 --- a/safekeeper/src/copy_timeline.rs +++ b/safekeeper/src/copy_timeline.rs @@ -15,10 +15,10 @@ use crate::{ control_file::{FileStorage, Storage}, pull_timeline::{create_temp_timeline_dir, load_temp_timeline, validate_temp_timeline}, state::TimelinePersistentState, - timeline::{Timeline, TimelineError}, + timeline::{FullAccessTimeline, Timeline, TimelineError}, wal_backup::copy_s3_segments, wal_storage::{wal_file_paths, WalReader}, - GlobalTimelines, SafeKeeperConf, + GlobalTimelines, }; // we don't want to have more than 10 segments on disk after copy, because they take space @@ -46,12 +46,14 @@ pub async fn 
handle_request(request: Request) -> Result<()> { } } + let source_tli = request.source.full_access_guard().await?; + let conf = &GlobalTimelines::get_global_config(); let ttid = request.destination_ttid; let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?; - let (mem_state, state) = request.source.get_state().await; + let (mem_state, state) = source_tli.get_state().await; let start_lsn = state.timeline_start_lsn; if start_lsn == Lsn::INVALID { bail!("timeline is not initialized"); @@ -60,7 +62,7 @@ pub async fn handle_request(request: Request) -> Result<()> { { let commit_lsn = mem_state.commit_lsn; - let flush_lsn = request.source.get_flush_lsn().await; + let flush_lsn = source_tli.get_flush_lsn().await; info!( "collected info about source timeline: start_lsn={}, backup_lsn={}, commit_lsn={}, flush_lsn={}", @@ -127,10 +129,8 @@ pub async fn handle_request(request: Request) -> Result<()> { .await?; copy_disk_segments( - conf, - &state, + &source_tli, wal_seg_size, - &request.source.ttid, new_backup_lsn, request.until_lsn, &tli_dir_path, @@ -159,21 +159,13 @@ pub async fn handle_request(request: Request) -> Result<()> { } async fn copy_disk_segments( - conf: &SafeKeeperConf, - persisted_state: &TimelinePersistentState, + tli: &FullAccessTimeline, wal_seg_size: usize, - source_ttid: &TenantTimelineId, start_lsn: Lsn, end_lsn: Lsn, tli_dir_path: &Utf8PathBuf, ) -> Result<()> { - let mut wal_reader = WalReader::new( - conf.workdir.clone(), - conf.timeline_dir(source_ttid), - persisted_state, - start_lsn, - true, - )?; + let mut wal_reader = tli.get_walreader(start_lsn).await?; let mut buf = [0u8; MAX_SEND_SIZE]; diff --git a/safekeeper/src/debug_dump.rs b/safekeeper/src/debug_dump.rs index b50f2e1158..062ff4b3db 100644 --- a/safekeeper/src/debug_dump.rs +++ b/safekeeper/src/debug_dump.rs @@ -10,6 +10,7 @@ use std::sync::Arc; use anyhow::bail; use anyhow::Result; use camino::Utf8Path; +use camino::Utf8PathBuf; use chrono::{DateTime, Utc}; use postgres_ffi::XLogSegNo; use postgres_ffi::MAX_SEND_SIZE; @@ -26,7 +27,8 @@ use crate::safekeeper::TermHistory; use crate::send_wal::WalSenderState; use crate::state::TimelineMemState; use crate::state::TimelinePersistentState; -use crate::wal_storage::WalReader; +use crate::timeline::get_timeline_dir; +use crate::timeline::FullAccessTimeline; use crate::GlobalTimelines; use crate::SafeKeeperConf; @@ -68,6 +70,7 @@ pub struct Response { pub struct TimelineDumpSer { pub tli: Arc, pub args: Args, + pub timeline_dir: Utf8PathBuf, pub runtime: Arc, } @@ -85,14 +88,20 @@ impl Serialize for TimelineDumpSer { where S: serde::Serializer, { - let dump = self - .runtime - .block_on(build_from_tli_dump(self.tli.clone(), self.args.clone())); + let dump = self.runtime.block_on(build_from_tli_dump( + &self.tli, + &self.args, + &self.timeline_dir, + )); dump.serialize(serializer) } } -async fn build_from_tli_dump(timeline: Arc, args: Args) -> Timeline { +async fn build_from_tli_dump( + timeline: &Arc, + args: &Args, + timeline_dir: &Utf8Path, +) -> Timeline { let control_file = if args.dump_control_file { let mut state = timeline.get_state().await.1; if !args.dump_term_history { @@ -112,7 +121,8 @@ async fn build_from_tli_dump(timeline: Arc, args: Arg let disk_content = if args.dump_disk_content { // build_disk_content can fail, but we don't want to fail the whole // request because of that. - build_disk_content(&timeline.timeline_dir).ok() + // Note: timeline can be in offloaded state, this is not a problem. 
+ build_disk_content(timeline_dir).ok() } else { None }; @@ -186,6 +196,7 @@ pub struct FileInfo { pub async fn build(args: Args) -> Result { let start_time = Utc::now(); let timelines_count = GlobalTimelines::timelines_count(); + let config = GlobalTimelines::get_global_config(); let ptrs_snapshot = if args.tenant_id.is_some() && args.timeline_id.is_some() { // If both tenant_id and timeline_id are specified, we can just get the @@ -223,12 +234,11 @@ pub async fn build(args: Args) -> Result { timelines.push(TimelineDumpSer { tli, args: args.clone(), + timeline_dir: get_timeline_dir(&config, &ttid), runtime: runtime.clone(), }); } - let config = GlobalTimelines::get_global_config(); - Ok(Response { start_time, finish_time: Utc::now(), @@ -316,27 +326,19 @@ pub struct TimelineDigest { } pub async fn calculate_digest( - tli: &Arc, + tli: &FullAccessTimeline, request: TimelineDigestRequest, ) -> Result { if request.from_lsn > request.until_lsn { bail!("from_lsn is greater than until_lsn"); } - let conf = GlobalTimelines::get_global_config(); let (_, persisted_state) = tli.get_state().await; - if persisted_state.timeline_start_lsn > request.from_lsn { bail!("requested LSN is before the start of the timeline"); } - let mut wal_reader = WalReader::new( - conf.workdir.clone(), - tli.timeline_dir.clone(), - &persisted_state, - request.from_lsn, - true, - )?; + let mut wal_reader = tli.get_walreader(request.from_lsn).await?; let mut hasher = Sha256::new(); let mut buf = [0u8; MAX_SEND_SIZE]; diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 593e102e35..1e29b21fac 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -249,6 +249,10 @@ async fn timeline_digest_handler(request: Request) -> Result) -> Result let filename: String = parse_request_param(&request, "filename")?; let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?; + let tli = tli + .full_access_guard() + .await + .map_err(ApiError::InternalServerError)?; - let filepath = tli.timeline_dir.join(filename); + let filepath = tli.get_timeline_dir().join(filename); let mut file = File::open(&filepath) .await .map_err(|e| ApiError::InternalServerError(e.into()))?; @@ -287,7 +295,7 @@ async fn timeline_files_handler(request: Request) -> Result .map_err(|e| ApiError::InternalServerError(e.into())) } -/// Force persist control file and remove old WAL. +/// Force persist control file. async fn timeline_checkpoint_handler(request: Request) -> Result, ApiError> { check_permission(&request, None)?; @@ -297,13 +305,13 @@ async fn timeline_checkpoint_handler(request: Request) -> Result( async fn prepare_safekeeper( ttid: TenantTimelineId, pg_version: u32, -) -> anyhow::Result> { - GlobalTimelines::create( +) -> anyhow::Result { + let tli = GlobalTimelines::create( ttid, ServerInfo { pg_version, @@ -115,10 +113,16 @@ async fn prepare_safekeeper( Lsn::INVALID, Lsn::INVALID, ) - .await + .await?; + + tli.full_access_guard().await } -async fn send_proposer_elected(tli: &Arc, term: Term, lsn: Lsn) -> anyhow::Result<()> { +async fn send_proposer_elected( + tli: &FullAccessTimeline, + term: Term, + lsn: Lsn, +) -> anyhow::Result<()> { // add new term to existing history let history = tli.get_state().await.1.acceptor_state.term_history; let history = history.up_to(lsn.checked_sub(1u64).unwrap()); @@ -147,7 +151,7 @@ pub struct InsertedWAL { /// Extend local WAL with new LogicalMessage record. To do that, /// create AppendRequest with new WAL and pass it to safekeeper. 
pub async fn append_logical_message( - tli: &Arc, + tli: &FullAccessTimeline, msg: &AppendLogicalMessage, ) -> anyhow::Result { let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message); diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index 8d8d2cf23e..1a56ff736c 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -7,10 +7,7 @@ use tokio::runtime::Runtime; use std::time::Duration; use storage_broker::Uri; -use utils::{ - auth::SwappableJwtAuth, - id::{NodeId, TenantId, TenantTimelineId}, -}; +use utils::{auth::SwappableJwtAuth, id::NodeId}; mod auth; pub mod broker; @@ -89,15 +86,6 @@ pub struct SafeKeeperConf { } impl SafeKeeperConf { - pub fn tenant_dir(&self, tenant_id: &TenantId) -> Utf8PathBuf { - self.workdir.join(tenant_id.to_string()) - } - - pub fn timeline_dir(&self, ttid: &TenantTimelineId) -> Utf8PathBuf { - self.tenant_dir(&ttid.tenant_id) - .join(ttid.timeline_id.to_string()) - } - pub fn is_wal_backup_enabled(&self) -> bool { self.remote_storage.is_some() && self.wal_backup_enabled } diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs index f7cc40f58a..7b41c98cb8 100644 --- a/safekeeper/src/pull_timeline.rs +++ b/safekeeper/src/pull_timeline.rs @@ -17,7 +17,7 @@ use utils::{ use crate::{ control_file, debug_dump, http::routes::TimelineStatus, - timeline::{Timeline, TimelineError}, + timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError}, wal_storage::{self, Storage}, GlobalTimelines, SafeKeeperConf, }; @@ -283,13 +283,13 @@ pub async fn load_temp_timeline( } // Move timeline dir to the correct location - let timeline_path = conf.timeline_dir(&ttid); + let timeline_path = get_timeline_dir(conf, &ttid); info!( "moving timeline {} from {} to {}", ttid, tmp_path, timeline_path ); - tokio::fs::create_dir_all(conf.tenant_dir(&ttid.tenant_id)).await?; + tokio::fs::create_dir_all(get_tenant_dir(conf, &ttid.tenant_id)).await?; tokio::fs::rename(tmp_path, &timeline_path).await?; let tli = GlobalTimelines::load_timeline(&guard, ttid) diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs index 03cfa882c4..7943a2fd86 100644 --- a/safekeeper/src/receive_wal.rs +++ b/safekeeper/src/receive_wal.rs @@ -6,7 +6,7 @@ use crate::handler::SafekeeperPostgresHandler; use crate::safekeeper::AcceptorProposerMessage; use crate::safekeeper::ProposerAcceptorMessage; use crate::safekeeper::ServerInfo; -use crate::timeline::Timeline; +use crate::timeline::FullAccessTimeline; use crate::wal_service::ConnectionId; use crate::GlobalTimelines; use anyhow::{anyhow, Context}; @@ -213,7 +213,7 @@ impl SafekeeperPostgresHandler { &mut self, pgb: &mut PostgresBackend, ) -> Result<(), QueryError> { - let mut tli: Option> = None; + let mut tli: Option = None; if let Err(end) = self.handle_start_wal_push_guts(pgb, &mut tli).await { // Log the result and probably send it to the client, closing the stream. 
let handle_end_fut = pgb.handle_copy_stream_end(end); @@ -233,7 +233,7 @@ impl SafekeeperPostgresHandler { pub async fn handle_start_wal_push_guts( &mut self, pgb: &mut PostgresBackend, - tli: &mut Option>, + tli: &mut Option, ) -> Result<(), CopyStreamHandlerEnd> { // Notify the libpq client that it's allowed to send `CopyData` messages pgb.write_message(&BeMessage::CopyBothResponse).await?; @@ -323,7 +323,7 @@ struct NetworkReader<'a, IO> { impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> { async fn read_first_message( &mut self, - ) -> Result<(Arc, ProposerAcceptorMessage), CopyStreamHandlerEnd> { + ) -> Result<(FullAccessTimeline, ProposerAcceptorMessage), CopyStreamHandlerEnd> { // Receive information about server to create timeline, if not yet. let next_msg = read_message(self.pgb_reader).await?; let tli = match next_msg { @@ -337,7 +337,10 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> { system_id: greeting.system_id, wal_seg_size: greeting.wal_seg_size, }; - GlobalTimelines::create(self.ttid, server_info, Lsn::INVALID, Lsn::INVALID).await? + let tli = + GlobalTimelines::create(self.ttid, server_info, Lsn::INVALID, Lsn::INVALID) + .await?; + tli.full_access_guard().await? } _ => { return Err(CopyStreamHandlerEnd::Other(anyhow::anyhow!( @@ -353,7 +356,7 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> { msg_tx: Sender, msg_rx: Receiver, reply_tx: Sender, - tli: Arc, + tli: FullAccessTimeline, next_msg: ProposerAcceptorMessage, ) -> Result<(), CopyStreamHandlerEnd> { *self.acceptor_handle = Some(WalAcceptor::spawn( @@ -448,7 +451,7 @@ const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1); /// replies to reply_tx; reading from socket and writing to disk in parallel is /// beneficial for performance, this struct provides writing to disk part. pub struct WalAcceptor { - tli: Arc, + tli: FullAccessTimeline, msg_rx: Receiver, reply_tx: Sender, conn_id: Option, @@ -461,7 +464,7 @@ impl WalAcceptor { /// /// conn_id None means WalAcceptor is used by recovery initiated at this safekeeper. pub fn spawn( - tli: Arc, + tli: FullAccessTimeline, msg_rx: Receiver, reply_tx: Sender, conn_id: Option, diff --git a/safekeeper/src/recovery.rs b/safekeeper/src/recovery.rs index 568a512c4a..80a630b1e1 100644 --- a/safekeeper/src/recovery.rs +++ b/safekeeper/src/recovery.rs @@ -2,7 +2,7 @@ //! provide it, i.e. safekeeper lags too much. use std::time::SystemTime; -use std::{fmt, pin::pin, sync::Arc}; +use std::{fmt, pin::pin}; use anyhow::{bail, Context}; use futures::StreamExt; @@ -21,6 +21,7 @@ use utils::{id::NodeId, lsn::Lsn, postgres_client::wal_stream_connection_config} use crate::receive_wal::{WalAcceptor, REPLY_QUEUE_SIZE}; use crate::safekeeper::{AppendRequest, AppendRequestHeader}; +use crate::timeline::FullAccessTimeline; use crate::{ http::routes::TimelineStatus, receive_wal::MSG_QUEUE_SIZE, @@ -28,14 +29,14 @@ use crate::{ AcceptorProposerMessage, ProposerAcceptorMessage, ProposerElected, Term, TermHistory, TermLsn, VoteRequest, }, - timeline::{PeerInfo, Timeline}, + timeline::PeerInfo, SafeKeeperConf, }; /// Entrypoint for per timeline task which always runs, checking whether /// recovery for this safekeeper is needed and starting it if so. 
#[instrument(name = "recovery task", skip_all, fields(ttid = %tli.ttid))] -pub async fn recovery_main(tli: Arc, conf: SafeKeeperConf) { +pub async fn recovery_main(tli: FullAccessTimeline, conf: SafeKeeperConf) { info!("started"); let cancel = tli.cancel.clone(); @@ -47,6 +48,87 @@ pub async fn recovery_main(tli: Arc, conf: SafeKeeperConf) { } } +/// Should we start fetching WAL from a peer safekeeper, and if yes, from +/// which? Answer is yes, i.e. .donors is not empty if 1) there is something +/// to fetch, and we can do that without running elections; 2) there is no +/// actively streaming compute, as we don't want to compete with it. +/// +/// If donor(s) are choosen, theirs last_log_term is guaranteed to be equal +/// to its last_log_term so we are sure such a leader ever had been elected. +/// +/// All possible donors are returned so that we could keep connection to the +/// current one if it is good even if it slightly lags behind. +/// +/// Note that term conditions above might be not met, but safekeepers are +/// still not aligned on last flush_lsn. Generally in this case until +/// elections are run it is not possible to say which safekeeper should +/// recover from which one -- history which would be committed is different +/// depending on assembled quorum (e.g. classic picture 8 from Raft paper). +/// Thus we don't try to predict it here. +async fn recovery_needed( + tli: &FullAccessTimeline, + heartbeat_timeout: Duration, +) -> RecoveryNeededInfo { + let ss = tli.read_shared_state().await; + let term = ss.sk.state.acceptor_state.term; + let last_log_term = ss.sk.get_last_log_term(); + let flush_lsn = ss.sk.flush_lsn(); + // note that peers contain myself, but that's ok -- we are interested only in peers which are strictly ahead of us. + let mut peers = ss.get_peers(heartbeat_timeout); + // Sort by pairs. + peers.sort_by(|p1, p2| { + let tl1 = TermLsn { + term: p1.last_log_term, + lsn: p1.flush_lsn, + }; + let tl2 = TermLsn { + term: p2.last_log_term, + lsn: p2.flush_lsn, + }; + tl2.cmp(&tl1) // desc + }); + let num_streaming_computes = tli.get_walreceivers().get_num_streaming(); + let donors = if num_streaming_computes > 0 { + vec![] // If there is a streaming compute, don't try to recover to not intervene. + } else { + peers + .iter() + .filter_map(|candidate| { + // Are we interested in this candidate? + let candidate_tl = TermLsn { + term: candidate.last_log_term, + lsn: candidate.flush_lsn, + }; + let my_tl = TermLsn { + term: last_log_term, + lsn: flush_lsn, + }; + if my_tl < candidate_tl { + // Yes, we are interested. Can we pull from it without + // (re)running elections? It is possible if 1) his term + // is equal to his last_log_term so we could act on + // behalf of leader of this term (we must be sure he was + // ever elected) and 2) our term is not higher, or we'll refuse data. + if candidate.term == candidate.last_log_term && candidate.term >= term { + Some(Donor::from(candidate)) + } else { + None + } + } else { + None + } + }) + .collect() + }; + RecoveryNeededInfo { + term, + last_log_term, + flush_lsn, + peers, + num_streaming_computes, + donors, + } +} /// Result of Timeline::recovery_needed, contains donor(s) if recovery needed and /// fields to explain the choice. #[derive(Debug)] @@ -113,10 +195,10 @@ impl From<&PeerInfo> for Donor { const CHECK_INTERVAL_MS: u64 = 2000; /// Check regularly whether we need to start recovery. 
-async fn recovery_main_loop(tli: Arc, conf: SafeKeeperConf) { +async fn recovery_main_loop(tli: FullAccessTimeline, conf: SafeKeeperConf) { let check_duration = Duration::from_millis(CHECK_INTERVAL_MS); loop { - let recovery_needed_info = tli.recovery_needed(conf.heartbeat_timeout).await; + let recovery_needed_info = recovery_needed(&tli, conf.heartbeat_timeout).await; match recovery_needed_info.donors.first() { Some(donor) => { info!( @@ -146,7 +228,7 @@ async fn recovery_main_loop(tli: Arc, conf: SafeKeeperConf) { /// Recover from the specified donor. Returns message explaining normal finish /// reason or error. async fn recover( - tli: Arc, + tli: FullAccessTimeline, donor: &Donor, conf: &SafeKeeperConf, ) -> anyhow::Result { @@ -232,7 +314,7 @@ async fn recover( // Pull WAL from donor, assuming handshake is already done. async fn recovery_stream( - tli: Arc, + tli: FullAccessTimeline, donor: &Donor, start_streaming_at: Lsn, conf: &SafeKeeperConf, @@ -316,7 +398,7 @@ async fn network_io( physical_stream: ReplicationStream, msg_tx: Sender, donor: Donor, - tli: Arc, + tli: FullAccessTimeline, conf: SafeKeeperConf, ) -> anyhow::Result> { let mut physical_stream = pin!(physical_stream); @@ -365,7 +447,7 @@ async fn network_io( } ReplicationMessage::PrimaryKeepAlive(_) => { // keepalive means nothing is being streamed for a while. Check whether we need to stop. - let recovery_needed_info = tli.recovery_needed(conf.heartbeat_timeout).await; + let recovery_needed_info = recovery_needed(&tli, conf.heartbeat_timeout).await; // do current donors still contain one we currently connected to? if !recovery_needed_info .donors diff --git a/safekeeper/src/remove_wal.rs b/safekeeper/src/remove_wal.rs index 3400eee9b7..b661e48cb5 100644 --- a/safekeeper/src/remove_wal.rs +++ b/safekeeper/src/remove_wal.rs @@ -1,41 +1,25 @@ -//! Thread removing old WAL. +use utils::lsn::Lsn; -use std::time::Duration; +use crate::timeline_manager::StateSnapshot; -use tokio::time::sleep; -use tracing::*; +/// Get oldest LSN we still need to keep. We hold WAL till it is consumed +/// by all of 1) pageserver (remote_consistent_lsn) 2) peers 3) s3 +/// offloading. +/// While it is safe to use inmem values for determining horizon, +/// we use persistent to make possible normal states less surprising. +/// All segments covering LSNs before horizon_lsn can be removed. 
+pub fn calc_horizon_lsn(state: &StateSnapshot, extra_horizon_lsn: Option<Lsn>) -> Lsn {
+    use std::cmp::min;
 
-use crate::{GlobalTimelines, SafeKeeperConf};
-
-pub async fn task_main(_conf: SafeKeeperConf) -> anyhow::Result<()> {
-    let wal_removal_interval = Duration::from_millis(5000);
-    loop {
-        let now = tokio::time::Instant::now();
-        let tlis = GlobalTimelines::get_all();
-        for tli in &tlis {
-            let ttid = tli.ttid;
-            async {
-                if let Err(e) = tli.maybe_persist_control_file(false).await {
-                    warn!("failed to persist control file: {e}");
-                }
-                if let Err(e) = tli.remove_old_wal().await {
-                    error!("failed to remove WAL: {}", e);
-                }
-            }
-            .instrument(info_span!("WAL removal", ttid = %ttid))
-            .await;
-        }
-
-        let elapsed = now.elapsed();
-        let total_timelines = tlis.len();
-
-        if elapsed > wal_removal_interval {
-            info!(
-                "WAL removal is too long, processed {} timelines in {:?}",
-                total_timelines, elapsed
-            );
-        }
-
-        sleep(wal_removal_interval).await;
+    let mut horizon_lsn = min(
+        state.cfile_remote_consistent_lsn,
+        state.cfile_peer_horizon_lsn,
+    );
+    // we don't want to remove WAL that is not yet offloaded to s3
+    horizon_lsn = min(horizon_lsn, state.cfile_backup_lsn);
+    if let Some(extra_horizon_lsn) = extra_horizon_lsn {
+        horizon_lsn = min(horizon_lsn, extra_horizon_lsn);
     }
+
+    horizon_lsn
 }
diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs
index 4686c9aa8e..563dbbe315 100644
--- a/safekeeper/src/safekeeper.rs
+++ b/safekeeper/src/safekeeper.rs
@@ -10,7 +10,6 @@ use std::cmp::max;
 use std::cmp::min;
 use std::fmt;
 use std::io::Read;
-use std::time::Duration;
 
 use storage_broker::proto::SafekeeperTimelineInfo;
 use tracing::*;
@@ -828,24 +827,6 @@ where
         Ok(())
     }
 
-    /// Persist control file if there is something to save and enough time
-    /// passed after the last save.
-    pub async fn maybe_persist_inmem_control_file(&mut self, force: bool) -> Result<bool> {
-        const CF_SAVE_INTERVAL: Duration = Duration::from_secs(300);
-        if !force && self.state.pers.last_persist_at().elapsed() < CF_SAVE_INTERVAL {
-            return Ok(false);
-        }
-        let need_persist = self.state.inmem.commit_lsn > self.state.commit_lsn
-            || self.state.inmem.backup_lsn > self.state.backup_lsn
-            || self.state.inmem.peer_horizon_lsn > self.state.peer_horizon_lsn
-            || self.state.inmem.remote_consistent_lsn > self.state.remote_consistent_lsn;
-        if need_persist {
-            self.state.flush().await?;
-            trace!("saved control file: {CF_SAVE_INTERVAL:?} passed");
-        }
-        Ok(need_persist)
-    }
-
     /// Handle request to append WAL.
#[allow(clippy::comparison_chain)] async fn handle_append_request( diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 5a9745e1c9..df75893838 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -5,7 +5,7 @@ use crate::handler::SafekeeperPostgresHandler; use crate::metrics::RECEIVED_PS_FEEDBACKS; use crate::receive_wal::WalReceivers; use crate::safekeeper::{Term, TermLsn}; -use crate::timeline::Timeline; +use crate::timeline::FullAccessTimeline; use crate::wal_service::ConnectionId; use crate::wal_storage::WalReader; use crate::GlobalTimelines; @@ -387,8 +387,10 @@ impl SafekeeperPostgresHandler { term: Option, ) -> Result<(), QueryError> { let tli = GlobalTimelines::get(self.ttid).map_err(|e| QueryError::Other(e.into()))?; + let full_access = tli.full_access_guard().await?; + if let Err(end) = self - .handle_start_replication_guts(pgb, start_pos, term, tli.clone()) + .handle_start_replication_guts(pgb, start_pos, term, full_access) .await { let info = tli.get_safekeeper_info(&self.conf).await; @@ -405,7 +407,7 @@ impl SafekeeperPostgresHandler { pgb: &mut PostgresBackend, start_pos: Lsn, term: Option, - tli: Arc, + tli: FullAccessTimeline, ) -> Result<(), CopyStreamHandlerEnd> { let appname = self.appname.clone(); @@ -448,14 +450,7 @@ impl SafekeeperPostgresHandler { // switch to copy pgb.write_message(&BeMessage::CopyBothResponse).await?; - let (_, persisted_state) = tli.get_state().await; - let wal_reader = WalReader::new( - self.conf.workdir.clone(), - self.conf.timeline_dir(&tli.ttid), - &persisted_state, - start_pos, - self.conf.is_wal_backup_enabled(), - )?; + let wal_reader = tli.get_walreader(start_pos).await?; // Split to concurrently receive and send data; replies are generally // not synchronized with sends, so this avoids deadlocks. @@ -532,7 +527,7 @@ impl EndWatch { /// A half driving sending WAL. struct WalSender<'a, IO> { pgb: &'a mut PostgresBackend, - tli: Arc, + tli: FullAccessTimeline, appname: Option, // Position since which we are sending next chunk. 
start_pos: Lsn, @@ -741,7 +736,7 @@ impl WalSender<'_, IO> { struct ReplyReader { reader: PostgresBackendReader, ws_guard: Arc, - tli: Arc, + tli: FullAccessTimeline, } impl ReplyReader { diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index aa9ccfc21e..148a7e90bd 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -3,14 +3,14 @@ use anyhow::{anyhow, bail, Result}; use camino::Utf8PathBuf; -use postgres_ffi::XLogSegNo; use serde::{Deserialize, Serialize}; use tokio::fs; use tokio_util::sync::CancellationToken; +use utils::id::TenantId; use std::cmp::max; use std::ops::{Deref, DerefMut}; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; @@ -26,7 +26,6 @@ use storage_broker::proto::SafekeeperTimelineInfo; use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId; use crate::receive_wal::WalReceivers; -use crate::recovery::{recovery_main, Donor, RecoveryNeededInfo}; use crate::safekeeper::{ AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, ServerInfo, Term, TermLsn, INVALID_TERM, @@ -38,8 +37,8 @@ use crate::wal_backup::{self}; use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION}; use crate::metrics::FullTimelineInfo; -use crate::wal_storage::Storage as wal_storage_iface; -use crate::{debug_dump, timeline_manager, wal_backup_partial, wal_storage}; +use crate::wal_storage::{Storage as wal_storage_iface, WalReader}; +use crate::{debug_dump, timeline_manager, wal_storage}; use crate::{GlobalTimelines, SafeKeeperConf}; /// Things safekeeper should know about timeline state on peers. @@ -169,7 +168,6 @@ pub struct SharedState { pub(crate) sk: SafeKeeper, /// In memory list containing state of peers sent in latest messages from them. pub(crate) peers_info: PeersInfo, - pub(crate) last_removed_segno: XLogSegNo, } impl SharedState { @@ -197,33 +195,33 @@ impl SharedState { // We don't want to write anything to disk, because we may have existing timeline there. // These functions should not change anything on disk. - let timeline_dir = conf.timeline_dir(ttid); - let control_store = control_file::FileStorage::create_new(timeline_dir, conf, state)?; + let timeline_dir = get_timeline_dir(conf, ttid); + let control_store = + control_file::FileStorage::create_new(timeline_dir.clone(), conf, state)?; let wal_store = - wal_storage::PhysicalStorage::new(ttid, conf.timeline_dir(ttid), conf, &control_store)?; + wal_storage::PhysicalStorage::new(ttid, timeline_dir, conf, &control_store)?; let sk = SafeKeeper::new(control_store, wal_store, conf.my_id)?; Ok(Self { sk, peers_info: PeersInfo(vec![]), - last_removed_segno: 0, }) } /// Restore SharedState from control file. If file doesn't exist, bails out. 
fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result { + let timeline_dir = get_timeline_dir(conf, ttid); let control_store = control_file::FileStorage::restore_new(ttid, conf)?; if control_store.server.wal_seg_size == 0 { bail!(TimelineError::UninitializedWalSegSize(*ttid)); } let wal_store = - wal_storage::PhysicalStorage::new(ttid, conf.timeline_dir(ttid), conf, &control_store)?; + wal_storage::PhysicalStorage::new(ttid, timeline_dir, conf, &control_store)?; Ok(Self { sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?, peers_info: PeersInfo(vec![]), - last_removed_segno: 0, }) } @@ -275,24 +273,6 @@ impl SharedState { .cloned() .collect() } - - /// Get oldest segno we still need to keep. We hold WAL till it is consumed - /// by all of 1) pageserver (remote_consistent_lsn) 2) peers 3) s3 - /// offloading. - /// While it is safe to use inmem values for determining horizon, - /// we use persistent to make possible normal states less surprising. - fn get_horizon_segno(&self, extra_horizon_lsn: Option) -> XLogSegNo { - let state = &self.sk.state; - - use std::cmp::min; - let mut horizon_lsn = min(state.remote_consistent_lsn, state.peer_horizon_lsn); - // we don't want to remove WAL that is not yet offloaded to s3 - horizon_lsn = min(horizon_lsn, state.backup_lsn); - if let Some(extra_horizon_lsn) = extra_horizon_lsn { - horizon_lsn = min(horizon_lsn, extra_horizon_lsn); - } - horizon_lsn.segment_number(state.server.wal_seg_size as usize) - } } #[derive(Debug, thiserror::Error)] @@ -349,22 +329,15 @@ pub struct Timeline { mutex: RwLock, walsenders: Arc, walreceivers: Arc, + timeline_dir: Utf8PathBuf, /// Delete/cancel will trigger this, background tasks should drop out as soon as it fires pub(crate) cancel: CancellationToken, - /// Directory where timeline state is stored. - pub timeline_dir: Utf8PathBuf, - - /// Should we keep WAL on disk for active replication connections. - /// Especially useful for sharding, when different shards process WAL - /// with different speed. - // TODO: add `Arc` here instead of adding each field separately. - walsenders_keep_horizon: bool, - // timeline_manager controlled state pub(crate) broker_active: AtomicBool, pub(crate) wal_backup_active: AtomicBool, + pub(crate) last_removed_segno: AtomicU64, } impl Timeline { @@ -394,10 +367,10 @@ impl Timeline { walsenders: WalSenders::new(walreceivers.clone()), walreceivers, cancel: CancellationToken::default(), - timeline_dir: conf.timeline_dir(&ttid), - walsenders_keep_horizon: conf.walsenders_keep_horizon, + timeline_dir: get_timeline_dir(conf, &ttid), broker_active: AtomicBool::new(false), wal_backup_active: AtomicBool::new(false), + last_removed_segno: AtomicU64::new(0), }) } @@ -430,10 +403,10 @@ impl Timeline { walsenders: WalSenders::new(walreceivers.clone()), walreceivers, cancel: CancellationToken::default(), - timeline_dir: conf.timeline_dir(&ttid), - walsenders_keep_horizon: conf.walsenders_keep_horizon, + timeline_dir: get_timeline_dir(conf, &ttid), broker_active: AtomicBool::new(false), wal_backup_active: AtomicBool::new(false), + last_removed_segno: AtomicU64::new(0), }) } @@ -494,15 +467,6 @@ impl Timeline { conf.clone(), broker_active_set, )); - - // Start recovery task which always runs on the timeline. 
- if conf.peer_recovery_enabled { - tokio::spawn(recovery_main(self.clone(), conf.clone())); - } - // TODO: migrate to timeline_manager - if conf.is_wal_backup_enabled() && conf.partial_backup_enabled { - tokio::spawn(wal_backup_partial::main_task(self.clone(), conf.clone())); - } } /// Delete timeline from disk completely, by removing timeline directory. @@ -555,36 +519,6 @@ impl Timeline { self.mutex.read().await } - /// Returns true if walsender should stop sending WAL to pageserver. We - /// terminate it if remote_consistent_lsn reached commit_lsn and there is no - /// computes. While there might be nothing to stream already, we learn about - /// remote_consistent_lsn update through replication feedback, and we want - /// to stop pushing to the broker if pageserver is fully caughtup. - pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool { - if self.is_cancelled() { - return true; - } - let shared_state = self.read_shared_state().await; - if self.walreceivers.get_num() == 0 { - return shared_state.sk.state.inmem.commit_lsn == Lsn(0) || // no data at all yet - reported_remote_consistent_lsn >= shared_state.sk.state.inmem.commit_lsn; - } - false - } - - /// Ensure that current term is t, erroring otherwise, and lock the state. - pub async fn acquire_term(&self, t: Term) -> Result { - let ss = self.read_shared_state().await; - if ss.sk.state.acceptor_state.term != t { - bail!( - "failed to acquire term {}, current term {}", - t, - ss.sk.state.acceptor_state.term - ); - } - Ok(ss) - } - /// Returns commit_lsn watch channel. pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver { self.commit_lsn_watch_rx.clone() @@ -600,28 +534,6 @@ impl Timeline { self.shared_state_version_rx.clone() } - /// Pass arrived message to the safekeeper. - pub async fn process_msg( - self: &Arc, - msg: &ProposerAcceptorMessage, - ) -> Result> { - if self.is_cancelled() { - bail!(TimelineError::Cancelled(self.ttid)); - } - - let mut rmsg: Option; - { - let mut shared_state = self.write_shared_state().await; - rmsg = shared_state.sk.process_msg(msg).await?; - - // if this is AppendResponse, fill in proper hot standby feedback. - if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg { - resp.hs_feedback = self.walsenders.get_hotstandby().hs_feedback; - } - } - Ok(rmsg) - } - /// Returns wal_seg_size. pub async fn get_wal_seg_size(&self) -> usize { self.read_shared_state().await.get_wal_seg_size() @@ -672,97 +584,11 @@ impl Timeline { Ok(()) } - /// Update in memory remote consistent lsn. - pub async fn update_remote_consistent_lsn(self: &Arc, candidate: Lsn) { - let mut shared_state = self.write_shared_state().await; - shared_state.sk.state.inmem.remote_consistent_lsn = - max(shared_state.sk.state.inmem.remote_consistent_lsn, candidate); - } - pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec { let shared_state = self.read_shared_state().await; shared_state.get_peers(conf.heartbeat_timeout) } - /// Should we start fetching WAL from a peer safekeeper, and if yes, from - /// which? Answer is yes, i.e. .donors is not empty if 1) there is something - /// to fetch, and we can do that without running elections; 2) there is no - /// actively streaming compute, as we don't want to compete with it. - /// - /// If donor(s) are choosen, theirs last_log_term is guaranteed to be equal - /// to its last_log_term so we are sure such a leader ever had been elected. 
- /// - /// All possible donors are returned so that we could keep connection to the - /// current one if it is good even if it slightly lags behind. - /// - /// Note that term conditions above might be not met, but safekeepers are - /// still not aligned on last flush_lsn. Generally in this case until - /// elections are run it is not possible to say which safekeeper should - /// recover from which one -- history which would be committed is different - /// depending on assembled quorum (e.g. classic picture 8 from Raft paper). - /// Thus we don't try to predict it here. - pub async fn recovery_needed(&self, heartbeat_timeout: Duration) -> RecoveryNeededInfo { - let ss = self.read_shared_state().await; - let term = ss.sk.state.acceptor_state.term; - let last_log_term = ss.sk.get_last_log_term(); - let flush_lsn = ss.sk.flush_lsn(); - // note that peers contain myself, but that's ok -- we are interested only in peers which are strictly ahead of us. - let mut peers = ss.get_peers(heartbeat_timeout); - // Sort by pairs. - peers.sort_by(|p1, p2| { - let tl1 = TermLsn { - term: p1.last_log_term, - lsn: p1.flush_lsn, - }; - let tl2 = TermLsn { - term: p2.last_log_term, - lsn: p2.flush_lsn, - }; - tl2.cmp(&tl1) // desc - }); - let num_streaming_computes = self.walreceivers.get_num_streaming(); - let donors = if num_streaming_computes > 0 { - vec![] // If there is a streaming compute, don't try to recover to not intervene. - } else { - peers - .iter() - .filter_map(|candidate| { - // Are we interested in this candidate? - let candidate_tl = TermLsn { - term: candidate.last_log_term, - lsn: candidate.flush_lsn, - }; - let my_tl = TermLsn { - term: last_log_term, - lsn: flush_lsn, - }; - if my_tl < candidate_tl { - // Yes, we are interested. Can we pull from it without - // (re)running elections? It is possible if 1) his term - // is equal to his last_log_term so we could act on - // behalf of leader of this term (we must be sure he was - // ever elected) and 2) our term is not higher, or we'll refuse data. - if candidate.term == candidate.last_log_term && candidate.term >= term { - Some(Donor::from(candidate)) - } else { - None - } - } else { - None - } - }) - .collect() - }; - RecoveryNeededInfo { - term, - last_log_term, - flush_lsn, - peers, - num_streaming_computes, - donors, - } - } - pub fn get_walsenders(&self) -> &Arc { &self.walsenders } @@ -776,58 +602,6 @@ impl Timeline { self.read_shared_state().await.sk.wal_store.flush_lsn() } - /// Delete WAL segments from disk that are no longer needed. This is determined - /// based on pageserver's remote_consistent_lsn and local backup_lsn/peer_lsn. - pub async fn remove_old_wal(self: &Arc) -> Result<()> { - if self.is_cancelled() { - bail!(TimelineError::Cancelled(self.ttid)); - } - - // If enabled, we use LSN of the most lagging walsender as a WAL removal horizon. - // This allows to get better read speed for pageservers that are lagging behind, - // at the cost of keeping more WAL on disk. 
- let replication_horizon_lsn = if self.walsenders_keep_horizon { - self.walsenders.laggard_lsn() - } else { - None - }; - - let horizon_segno: XLogSegNo; - let remover = { - let shared_state = self.read_shared_state().await; - horizon_segno = shared_state.get_horizon_segno(replication_horizon_lsn); - if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno { - return Ok(()); // nothing to do - } - - // release the lock before removing - shared_state.sk.wal_store.remove_up_to(horizon_segno - 1) - }; - - // delete old WAL files - remover.await?; - - // update last_removed_segno - let mut shared_state = self.write_shared_state().await; - if shared_state.last_removed_segno != horizon_segno { - shared_state.last_removed_segno = horizon_segno; - } else { - shared_state.skip_update = true; - } - Ok(()) - } - - /// Persist control file if there is something to save and enough time - /// passed after the last save. This helps to keep remote_consistent_lsn up - /// to date so that storage nodes restart doesn't cause many pageserver -> - /// safekeeper reconnections. - pub async fn maybe_persist_control_file(self: &Arc, force: bool) -> Result<()> { - let mut guard = self.write_shared_state().await; - let changed = guard.sk.maybe_persist_inmem_control_file(force).await?; - guard.skip_update = !changed; - Ok(()) - } - /// Gather timeline data for metrics. pub async fn info_for_metrics(&self) -> Option { if self.is_cancelled() { @@ -843,7 +617,7 @@ impl Timeline { wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed), timeline_is_active: self.broker_active.load(Ordering::Relaxed), num_computes: self.walreceivers.get_num() as u32, - last_removed_segno: state.last_removed_segno, + last_removed_segno: self.last_removed_segno.load(Ordering::Relaxed), epoch_start_lsn: state.sk.term_start_lsn, mem_state: state.sk.state.inmem.clone(), persisted_state: state.sk.state.clone(), @@ -866,7 +640,7 @@ impl Timeline { wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed), active: self.broker_active.load(Ordering::Relaxed), num_computes: self.walreceivers.get_num() as u32, - last_removed_segno: state.last_removed_segno, + last_removed_segno: self.last_removed_segno.load(Ordering::Relaxed), epoch_start_lsn: state.sk.term_start_lsn, mem_state: state.sk.state.inmem.clone(), write_lsn, @@ -889,6 +663,110 @@ impl Timeline { state.sk.state.finish_change(&persistent_state).await?; Ok(res) } + + /// Get the timeline guard for reading/writing WAL files. + /// TODO: if WAL files are not present on disk (evicted), they will be + /// downloaded from S3. Also there will logic for preventing eviction + /// while someone is holding FullAccessTimeline guard. + pub async fn full_access_guard(self: &Arc) -> Result { + if self.is_cancelled() { + bail!(TimelineError::Cancelled(self.ttid)); + } + Ok(FullAccessTimeline { tli: self.clone() }) + } +} + +/// This is a guard that allows to read/write disk timeline state. +/// All tasks that are using the disk should use this guard. +#[derive(Clone)] +pub struct FullAccessTimeline { + pub tli: Arc, +} + +impl Deref for FullAccessTimeline { + type Target = Arc; + + fn deref(&self) -> &Self::Target { + &self.tli + } +} + +impl FullAccessTimeline { + /// Returns true if walsender should stop sending WAL to pageserver. We + /// terminate it if remote_consistent_lsn reached commit_lsn and there is no + /// computes. 
While there might be nothing to stream already, we learn about
+    /// remote_consistent_lsn update through replication feedback, and we want
+    /// to stop pushing to the broker if pageserver is fully caught up.
+    pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
+        if self.is_cancelled() {
+            return true;
+        }
+        let shared_state = self.read_shared_state().await;
+        if self.walreceivers.get_num() == 0 {
+            return shared_state.sk.state.inmem.commit_lsn == Lsn(0) || // no data at all yet
+                reported_remote_consistent_lsn >= shared_state.sk.state.inmem.commit_lsn;
+        }
+        false
+    }
+
+    /// Ensure that current term is t, erroring otherwise, and lock the state.
+    pub async fn acquire_term(&self, t: Term) -> Result<ReadGuardSharedState> {
+        let ss = self.read_shared_state().await;
+        if ss.sk.state.acceptor_state.term != t {
+            bail!(
+                "failed to acquire term {}, current term {}",
+                t,
+                ss.sk.state.acceptor_state.term
+            );
+        }
+        Ok(ss)
+    }
+
+    /// Pass arrived message to the safekeeper.
+    pub async fn process_msg(
+        &self,
+        msg: &ProposerAcceptorMessage,
+    ) -> Result<Option<AcceptorProposerMessage>> {
+        if self.is_cancelled() {
+            bail!(TimelineError::Cancelled(self.ttid));
+        }
+
+        let mut rmsg: Option<AcceptorProposerMessage>;
+        {
+            let mut shared_state = self.write_shared_state().await;
+            rmsg = shared_state.sk.process_msg(msg).await?;
+
+            // if this is AppendResponse, fill in proper hot standby feedback.
+            if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
+                resp.hs_feedback = self.walsenders.get_hotstandby().hs_feedback;
+            }
+        }
+        Ok(rmsg)
+    }
+
+    pub async fn get_walreader(&self, start_lsn: Lsn) -> Result<WalReader> {
+        let (_, persisted_state) = self.get_state().await;
+        let enable_remote_read = GlobalTimelines::get_global_config().is_wal_backup_enabled();
+
+        WalReader::new(
+            &self.ttid,
+            self.timeline_dir.clone(),
+            &persisted_state,
+            start_lsn,
+            enable_remote_read,
+        )
+    }
+
+    pub fn get_timeline_dir(&self) -> Utf8PathBuf {
+        self.timeline_dir.clone()
+    }
+
+    /// Update in memory remote consistent lsn.
+    pub async fn update_remote_consistent_lsn(&self, candidate: Lsn) {
+        let mut shared_state = self.write_shared_state().await;
+        shared_state.sk.state.inmem.remote_consistent_lsn =
+            max(shared_state.sk.state.inmem.remote_consistent_lsn, candidate);
+    }
 }
 
 /// Deletes directory and it's contents. Returns false if directory does not exist.
@@ -899,3 +777,16 @@ async fn delete_dir(path: &Utf8PathBuf) -> Result<bool> {
         Err(e) => Err(e.into()),
     }
 }
+
+/// Get a path to the tenant directory. If you just need to get a timeline directory,
+/// use FullAccessTimeline::get_timeline_dir instead.
+pub(crate) fn get_tenant_dir(conf: &SafeKeeperConf, tenant_id: &TenantId) -> Utf8PathBuf {
+    conf.workdir.join(tenant_id.to_string())
+}
+
+/// Get a path to the timeline directory. If you need to read WAL files from disk,
+/// use FullAccessTimeline::get_timeline_dir instead. This function does not check
+/// timeline eviction status and WAL files might not be present on disk.
+pub(crate) fn get_timeline_dir(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Utf8PathBuf {
+    get_tenant_dir(conf, &ttid.tenant_id).join(ttid.timeline_id.to_string())
+}
diff --git a/safekeeper/src/timeline_manager.rs b/safekeeper/src/timeline_manager.rs
index ed544352f9..84862207d5 100644
--- a/safekeeper/src/timeline_manager.rs
+++ b/safekeeper/src/timeline_manager.rs
@@ -3,23 +3,42 @@
 //! It watches for changes in the timeline state and decides when to spawn or kill background tasks.
 //! It also can manage some reactive state, like should the timeline be active for broker pushes or not.
 
-use std::{sync::Arc, time::Duration};
+use std::{
+    sync::Arc,
+    time::{Duration, Instant},
+};
 
+use postgres_ffi::XLogSegNo;
+use tokio::task::{JoinError, JoinHandle};
 use tracing::{info, instrument, warn};
 use utils::lsn::Lsn;
 
 use crate::{
+    control_file::Storage,
     metrics::{MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL},
+    recovery::recovery_main,
+    remove_wal::calc_horizon_lsn,
+    send_wal::WalSenders,
     timeline::{PeerInfo, ReadGuardSharedState, Timeline},
-    timelines_set::TimelinesSet,
+    timelines_set::{TimelineSetGuard, TimelinesSet},
     wal_backup::{self, WalBackupTaskHandle},
-    SafeKeeperConf,
+    wal_backup_partial, SafeKeeperConf,
 };
 
 pub struct StateSnapshot {
+    // inmem values
     pub commit_lsn: Lsn,
     pub backup_lsn: Lsn,
     pub remote_consistent_lsn: Lsn,
+
+    // persistent control file values
+    pub cfile_peer_horizon_lsn: Lsn,
+    pub cfile_remote_consistent_lsn: Lsn,
+    pub cfile_backup_lsn: Lsn,
+
+    // misc
+    pub cfile_last_persist_at: Instant,
+    pub inmem_flush_pending: bool,
     pub peers: Vec<PeerInfo>,
 }
 
@@ -30,17 +49,34 @@ impl StateSnapshot {
             commit_lsn: read_guard.sk.state.inmem.commit_lsn,
             backup_lsn: read_guard.sk.state.inmem.backup_lsn,
             remote_consistent_lsn: read_guard.sk.state.inmem.remote_consistent_lsn,
+            cfile_peer_horizon_lsn: read_guard.sk.state.peer_horizon_lsn,
+            cfile_remote_consistent_lsn: read_guard.sk.state.remote_consistent_lsn,
+            cfile_backup_lsn: read_guard.sk.state.backup_lsn,
+            cfile_last_persist_at: read_guard.sk.state.pers.last_persist_at(),
+            inmem_flush_pending: Self::has_unflushed_inmem_state(&read_guard),
             peers: read_guard.get_peers(heartbeat_timeout),
         }
     }
+
+    fn has_unflushed_inmem_state(read_guard: &ReadGuardSharedState) -> bool {
+        let state = &read_guard.sk.state;
+        state.inmem.commit_lsn > state.commit_lsn
+            || state.inmem.backup_lsn > state.backup_lsn
+            || state.inmem.peer_horizon_lsn > state.peer_horizon_lsn
+            || state.inmem.remote_consistent_lsn > state.remote_consistent_lsn
+    }
 }
 
 /// Control how often the manager task should wake up to check updates.
 /// There is no need to check for updates more often than this.
 const REFRESH_INTERVAL: Duration = Duration::from_millis(300);
 
+/// How often to save the control file if there is no other activity.
+const CF_SAVE_INTERVAL: Duration = Duration::from_secs(300);
+
 /// This task gets spawned alongside each timeline and is responsible for managing the timeline's
 /// background tasks.
+/// Be careful: this task is not respawned on panic, so it should not panic.
#[instrument(name = "manager", skip_all, fields(ttid = %tli.ttid))] pub async fn main_task( tli: Arc, @@ -55,20 +91,50 @@ pub async fn main_task( } }; - // sets whether timeline is active for broker pushes or not - let mut tli_broker_active = broker_active_set.guard(tli.clone()); - - let ttid = tli.ttid; + // configuration & dependencies let wal_seg_size = tli.get_wal_seg_size().await; let heartbeat_timeout = conf.heartbeat_timeout; - - let mut state_version_rx = tli.get_state_version_rx(); - + let walsenders = tli.get_walsenders(); let walreceivers = tli.get_walreceivers(); + + // current state + let mut state_version_rx = tli.get_state_version_rx(); let mut num_computes_rx = walreceivers.get_num_rx(); + let mut tli_broker_active = broker_active_set.guard(tli.clone()); + let mut last_removed_segno = 0 as XLogSegNo; // list of background tasks let mut backup_task: Option = None; + let mut recovery_task: Option> = None; + let mut partial_backup_task: Option> = None; + let mut wal_removal_task: Option>> = None; + + // Start recovery task which always runs on the timeline. + if conf.peer_recovery_enabled { + match tli.full_access_guard().await { + Ok(tli) => { + recovery_task = Some(tokio::spawn(recovery_main(tli, conf.clone()))); + } + Err(e) => { + warn!("failed to start recovery task: {:?}", e); + } + } + } + + // Start partial backup task which always runs on the timeline. + if conf.is_wal_backup_enabled() && conf.partial_backup_enabled { + match tli.full_access_guard().await { + Ok(tli) => { + partial_backup_task = Some(tokio::spawn(wal_backup_partial::main_task( + tli, + conf.clone(), + ))); + } + Err(e) => { + warn!("failed to start partial backup task: {:?}", e); + } + } + } let last_state = 'outer: loop { MANAGER_ITERATIONS_TOTAL.inc(); @@ -76,47 +142,36 @@ pub async fn main_task( let state_snapshot = StateSnapshot::new(tli.read_shared_state().await, heartbeat_timeout); let num_computes = *num_computes_rx.borrow(); - let is_wal_backup_required = - wal_backup::is_wal_backup_required(wal_seg_size, num_computes, &state_snapshot); + let is_wal_backup_required = update_backup( + &conf, + &tli, + wal_seg_size, + num_computes, + &state_snapshot, + &mut backup_task, + ) + .await; - if conf.is_wal_backup_enabled() { - wal_backup::update_task( - &conf, - ttid, - is_wal_backup_required, - &state_snapshot, - &mut backup_task, - ) - .await; - } + let _is_active = update_is_active( + is_wal_backup_required, + num_computes, + &state_snapshot, + &mut tli_broker_active, + &tli, + ); - let is_active = is_wal_backup_required - || num_computes > 0 - || state_snapshot.remote_consistent_lsn < state_snapshot.commit_lsn; + let next_cfile_save = update_control_file_save(&state_snapshot, &tli).await; - // update the broker timeline set - if tli_broker_active.set(is_active) { - // write log if state has changed - info!( - "timeline active={} now, remote_consistent_lsn={}, commit_lsn={}", - is_active, state_snapshot.remote_consistent_lsn, state_snapshot.commit_lsn, - ); - - MANAGER_ACTIVE_CHANGES.inc(); - - if !is_active { - // TODO: maybe use tokio::spawn? 
- if let Err(e) = tli.maybe_persist_control_file(false).await { - warn!("control file save in update_status failed: {:?}", e); - } - } - } - - // update the state in Arc - tli.wal_backup_active - .store(backup_task.is_some(), std::sync::atomic::Ordering::Relaxed); - tli.broker_active - .store(is_active, std::sync::atomic::Ordering::Relaxed); + update_wal_removal( + &conf, + walsenders, + &tli, + wal_seg_size, + &state_snapshot, + last_removed_segno, + &mut wal_removal_task, + ) + .await; // wait until something changes. tx channels are stored under Arc, so they will not be // dropped until the manager task is finished. @@ -135,11 +190,189 @@ pub async fn main_task( _ = num_computes_rx.changed() => { // number of connected computes was updated } + _ = async { + if let Some(timeout) = next_cfile_save { + tokio::time::sleep_until(timeout).await + } else { + futures::future::pending().await + } + } => { + // it's time to save the control file + } + res = async { + if let Some(task) = &mut wal_removal_task { + task.await + } else { + futures::future::pending().await + } + } => { + // WAL removal task finished + wal_removal_task = None; + update_wal_removal_end(res, &tli, &mut last_removed_segno); + } } }; // shutdown background tasks if conf.is_wal_backup_enabled() { - wal_backup::update_task(&conf, ttid, false, &last_state, &mut backup_task).await; + wal_backup::update_task(&conf, &tli, false, &last_state, &mut backup_task).await; + } + + if let Some(recovery_task) = recovery_task { + if let Err(e) = recovery_task.await { + warn!("recovery task failed: {:?}", e); + } + } + + if let Some(partial_backup_task) = partial_backup_task { + if let Err(e) = partial_backup_task.await { + warn!("partial backup task failed: {:?}", e); + } + } + + if let Some(wal_removal_task) = wal_removal_task { + let res = wal_removal_task.await; + update_wal_removal_end(res, &tli, &mut last_removed_segno); } } + +/// Spawns/kills backup task and returns true if backup is required. +async fn update_backup( + conf: &SafeKeeperConf, + tli: &Arc, + wal_seg_size: usize, + num_computes: usize, + state: &StateSnapshot, + backup_task: &mut Option, +) -> bool { + let is_wal_backup_required = + wal_backup::is_wal_backup_required(wal_seg_size, num_computes, state); + + if conf.is_wal_backup_enabled() { + wal_backup::update_task(conf, tli, is_wal_backup_required, state, backup_task).await; + } + + // update the state in Arc + tli.wal_backup_active + .store(backup_task.is_some(), std::sync::atomic::Ordering::Relaxed); + is_wal_backup_required +} + +/// Update is_active flag and returns its value. +fn update_is_active( + is_wal_backup_required: bool, + num_computes: usize, + state: &StateSnapshot, + tli_broker_active: &mut TimelineSetGuard, + tli: &Arc, +) -> bool { + let is_active = is_wal_backup_required + || num_computes > 0 + || state.remote_consistent_lsn < state.commit_lsn; + + // update the broker timeline set + if tli_broker_active.set(is_active) { + // write log if state has changed + info!( + "timeline active={} now, remote_consistent_lsn={}, commit_lsn={}", + is_active, state.remote_consistent_lsn, state.commit_lsn, + ); + + MANAGER_ACTIVE_CHANGES.inc(); + } + + // update the state in Arc + tli.broker_active + .store(is_active, std::sync::atomic::Ordering::Relaxed); + is_active +} + +/// Save control file if needed. Returns Instant if we should persist the control file in the future. 
+
+/// Save control file if needed. Returns Instant if we should persist the control file in the future.
+async fn update_control_file_save(
+    state: &StateSnapshot,
+    tli: &Arc<Timeline>,
+) -> Option<tokio::time::Instant> {
+    if !state.inmem_flush_pending {
+        return None;
+    }
+
+    if state.cfile_last_persist_at.elapsed() > CF_SAVE_INTERVAL {
+        let mut write_guard = tli.write_shared_state().await;
+        // this can be done in the background because it blocks manager task, but flush() should
+        // be fast enough not to be a problem now
+        if let Err(e) = write_guard.sk.state.flush().await {
+            warn!("failed to save control file: {:?}", e);
+        }
+
+        None
+    } else {
+        // we should wait until next CF_SAVE_INTERVAL
+        Some((state.cfile_last_persist_at + CF_SAVE_INTERVAL).into())
+    }
+}
+
+/// Spawns WAL removal task if needed.
+async fn update_wal_removal(
+    conf: &SafeKeeperConf,
+    walsenders: &Arc<WalSenders>,
+    tli: &Arc<Timeline>,
+    wal_seg_size: usize,
+    state: &StateSnapshot,
+    last_removed_segno: u64,
+    wal_removal_task: &mut Option<JoinHandle<anyhow::Result<u64>>>,
+) {
+    if wal_removal_task.is_some() {
+        // WAL removal is already in progress
+        return;
+    }
+
+    // If enabled, we use LSN of the most lagging walsender as a WAL removal horizon.
+    // This allows to get better read speed for pageservers that are lagging behind,
+    // at the cost of keeping more WAL on disk.
+    let replication_horizon_lsn = if conf.walsenders_keep_horizon {
+        walsenders.laggard_lsn()
+    } else {
+        None
+    };
+
+    let removal_horizon_lsn = calc_horizon_lsn(state, replication_horizon_lsn);
+    let removal_horizon_segno = removal_horizon_lsn
+        .segment_number(wal_seg_size)
+        .saturating_sub(1);
+
+    if removal_horizon_segno > last_removed_segno {
+        // we need to remove WAL
+        let remover = crate::wal_storage::Storage::remove_up_to(
+            &tli.read_shared_state().await.sk.wal_store,
+            removal_horizon_segno,
+        );
+        *wal_removal_task = Some(tokio::spawn(async move {
+            remover.await?;
+            Ok(removal_horizon_segno)
+        }));
+    }
+}
+
+/// Update the state after WAL removal task finished.
+fn update_wal_removal_end(
+    res: Result<anyhow::Result<u64>, JoinError>,
+    tli: &Arc<Timeline>,
+    last_removed_segno: &mut u64,
+) {
+    let new_last_removed_segno = match res {
+        Ok(Ok(segno)) => segno,
+        Err(e) => {
+            warn!("WAL removal task failed: {:?}", e);
+            return;
+        }
+        Ok(Err(e)) => {
+            warn!("WAL removal task failed: {:?}", e);
+            return;
+        }
+    };
+
+    *last_removed_segno = new_last_removed_segno;
+    // update the state in Arc
+    tli.last_removed_segno
+        .store(new_last_removed_segno, std::sync::atomic::Ordering::Relaxed);
+}
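// Illustrative sketch (not part of the patch): the segment arithmetic behind
// update_wal_removal above. An LSN falls into segment lsn / wal_seg_size; subtracting
// one (saturating) keeps the segment that still contains the horizon, so only strictly
// older segments become eligible for removal.
fn removal_horizon_segno(horizon_lsn: u64, wal_seg_size: u64) -> u64 {
    (horizon_lsn / wal_seg_size).saturating_sub(1)
}

#[test]
fn removal_horizon_examples() {
    let seg: u64 = 16 * 1024 * 1024; // 16 MiB WAL segments
    // horizon still inside segment 0: nothing can be removed yet
    assert_eq!(removal_horizon_segno(0x10, seg), 0);
    // horizon inside segment 3: segments up to and including segno 2 may be removed
    assert_eq!(removal_horizon_segno(3 * seg + 1, seg), 2);
}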
diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs
index 8d37bd6371..45e08ede3c 100644
--- a/safekeeper/src/timelines_global_map.rs
+++ b/safekeeper/src/timelines_global_map.rs
@@ -3,7 +3,7 @@
 //! all from the disk on startup and keeping them in memory.

 use crate::safekeeper::ServerInfo;
-use crate::timeline::{Timeline, TimelineError};
+use crate::timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError};
 use crate::timelines_set::TimelinesSet;
 use crate::SafeKeeperConf;
 use anyhow::{bail, Context, Result};
@@ -127,7 +127,7 @@ impl GlobalTimelines {
             state.get_dependencies()
         };

-        let timelines_dir = conf.tenant_dir(&tenant_id);
+        let timelines_dir = get_tenant_dir(&conf, &tenant_id);
         for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
             .with_context(|| format!("failed to list timelines dir {}", timelines_dir))?
         {
@@ -348,11 +348,7 @@ impl GlobalTimelines {
             }
             Err(_) => {
                 // Timeline is not memory, but it may still exist on disk in broken state.
-                let dir_path = TIMELINES_STATE
-                    .lock()
-                    .unwrap()
-                    .get_conf()
-                    .timeline_dir(ttid);
+                let dir_path = get_timeline_dir(TIMELINES_STATE.lock().unwrap().get_conf(), ttid);
                 let dir_existed = delete_dir(dir_path)?;

                 Ok(TimelineDeleteForceResult {
@@ -401,13 +397,10 @@ impl GlobalTimelines {
         // Note that we could concurrently create new timelines while we were deleting them,
         // so the directory may be not empty. In this case timelines will have bad state
         // and timeline background jobs can panic.
-        delete_dir(
-            TIMELINES_STATE
-                .lock()
-                .unwrap()
-                .get_conf()
-                .tenant_dir(tenant_id),
-        )?;
+        delete_dir(get_tenant_dir(
+            TIMELINES_STATE.lock().unwrap().get_conf(),
+            tenant_id,
+        ))?;

         // FIXME: we temporarily disabled removing timelines from the map, see `delete_force`
         // let tlis_after_delete = Self::get_all_for_tenant(*tenant_id);
diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs
index 84680557f9..58591aecfa 100644
--- a/safekeeper/src/wal_backup.rs
+++ b/safekeeper/src/wal_backup.rs
@@ -30,9 +30,9 @@ use tracing::*;
 use utils::{id::TenantTimelineId, lsn::Lsn};

 use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS, WAL_BACKUP_TASKS};
-use crate::timeline::{PeerInfo, Timeline};
+use crate::timeline::{FullAccessTimeline, PeerInfo, Timeline};
 use crate::timeline_manager::StateSnapshot;
-use crate::{GlobalTimelines, SafeKeeperConf, WAL_BACKUP_RUNTIME};
+use crate::{SafeKeeperConf, WAL_BACKUP_RUNTIME};

 use once_cell::sync::OnceCell;

@@ -63,13 +63,13 @@ pub fn is_wal_backup_required(
 /// is running, kill it.
 pub async fn update_task(
     conf: &SafeKeeperConf,
-    ttid: TenantTimelineId,
+    tli: &Arc<Timeline>,
     need_backup: bool,
     state: &StateSnapshot,
     entry: &mut Option<WalBackupTaskHandle>,
 ) {
     let (offloader, election_dbg_str) =
-        determine_offloader(&state.peers, state.backup_lsn, ttid, conf);
+        determine_offloader(&state.peers, state.backup_lsn, tli.ttid, conf);
     let elected_me = Some(conf.my_id) == offloader;

     let should_task_run = need_backup && elected_me;
@@ -80,15 +80,8 @@ pub async fn update_task(
             info!("elected for backup: {}", election_dbg_str);

             let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
-            let timeline_dir = conf.timeline_dir(&ttid);
-            let async_task = backup_task_main(
-                ttid,
-                timeline_dir,
-                conf.workdir.clone(),
-                conf.backup_parallel_jobs,
-                shutdown_rx,
-            );
+            let async_task = backup_task_main(tli.clone(), conf.backup_parallel_jobs, shutdown_rx);

             let handle = if conf.current_thread_runtime {
                 tokio::spawn(async_task)
@@ -198,39 +191,32 @@ pub fn init_remote_storage(conf: &SafeKeeperConf) {
 }

 struct WalBackupTask {
-    timeline: Arc<Timeline>,
+    timeline: FullAccessTimeline,
     timeline_dir: Utf8PathBuf,
-    workspace_dir: Utf8PathBuf,
     wal_seg_size: usize,
     parallel_jobs: usize,
     commit_lsn_watch_rx: watch::Receiver<Lsn>,
 }

 /// Offload single timeline.
-#[instrument(name = "WAL backup", skip_all, fields(ttid = %ttid))] -async fn backup_task_main( - ttid: TenantTimelineId, - timeline_dir: Utf8PathBuf, - workspace_dir: Utf8PathBuf, - parallel_jobs: usize, - mut shutdown_rx: Receiver<()>, -) { +#[instrument(name = "WAL backup", skip_all, fields(ttid = %tli.ttid))] +async fn backup_task_main(tli: Arc, parallel_jobs: usize, mut shutdown_rx: Receiver<()>) { let _guard = WAL_BACKUP_TASKS.guard(); + let tli = match tli.full_access_guard().await { + Ok(tli) => tli, + Err(e) => { + error!("backup error: {}", e); + return; + } + }; info!("started"); - let res = GlobalTimelines::get(ttid); - if let Err(e) = res { - error!("backup error: {}", e); - return; - } - let tli = res.unwrap(); let mut wb = WalBackupTask { wal_seg_size: tli.get_wal_seg_size().await, commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(), + timeline_dir: tli.get_timeline_dir(), timeline: tli, - timeline_dir, - workspace_dir, parallel_jobs, }; @@ -297,7 +283,6 @@ impl WalBackupTask { commit_lsn, self.wal_seg_size, &self.timeline_dir, - &self.workspace_dir, self.parallel_jobs, ) .await @@ -319,18 +304,18 @@ impl WalBackupTask { } async fn backup_lsn_range( - timeline: &Arc, + timeline: &FullAccessTimeline, backup_lsn: &mut Lsn, end_lsn: Lsn, wal_seg_size: usize, timeline_dir: &Utf8Path, - workspace_dir: &Utf8Path, parallel_jobs: usize, ) -> Result<()> { if parallel_jobs < 1 { anyhow::bail!("parallel_jobs must be >= 1"); } + let remote_timeline_path = remote_timeline_path(&timeline.ttid)?; let start_lsn = *backup_lsn; let segments = get_segments(start_lsn, end_lsn, wal_seg_size); @@ -343,7 +328,11 @@ async fn backup_lsn_range( loop { let added_task = match iter.next() { Some(s) => { - uploads.push_back(backup_single_segment(s, timeline_dir, workspace_dir)); + uploads.push_back(backup_single_segment( + s, + timeline_dir, + &remote_timeline_path, + )); true } None => false, @@ -381,18 +370,10 @@ async fn backup_lsn_range( async fn backup_single_segment( seg: &Segment, timeline_dir: &Utf8Path, - workspace_dir: &Utf8Path, + remote_timeline_path: &RemotePath, ) -> Result { let segment_file_path = seg.file_path(timeline_dir)?; - let remote_segment_path = segment_file_path - .strip_prefix(workspace_dir) - .context("Failed to strip workspace dir prefix") - .and_then(RemotePath::new) - .with_context(|| { - format!( - "Failed to resolve remote part of path {segment_file_path:?} for base {workspace_dir:?}", - ) - })?; + let remote_segment_path = seg.remote_path(remote_timeline_path); let res = backup_object(&segment_file_path, &remote_segment_path, seg.size()).await; if res.is_ok() { @@ -430,6 +411,10 @@ impl Segment { Ok(timeline_dir.join(self.object_name())) } + pub fn remote_path(self, remote_timeline_path: &RemotePath) -> RemotePath { + remote_timeline_path.join(self.object_name()) + } + pub fn size(self) -> usize { (u64::from(self.end_lsn) - u64::from(self.start_lsn)) as usize } @@ -530,8 +515,7 @@ pub async fn read_object( /// when called. pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> { let storage = get_configured_remote_storage(); - let ttid_path = Utf8Path::new(&ttid.tenant_id.to_string()).join(ttid.timeline_id.to_string()); - let remote_path = RemotePath::new(&ttid_path)?; + let remote_path = remote_timeline_path(ttid)?; // see DEFAULT_MAX_KEYS_PER_LIST_RESPONSE // const Option unwrap is not stable, otherwise it would be const. 
@@ -530,8 +515,7 @@ pub async fn read_object(
 /// when called.
 pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> {
     let storage = get_configured_remote_storage();
-    let ttid_path = Utf8Path::new(&ttid.tenant_id.to_string()).join(ttid.timeline_id.to_string());
-    let remote_path = RemotePath::new(&ttid_path)?;
+    let remote_path = remote_timeline_path(ttid)?;

     // see DEFAULT_MAX_KEYS_PER_LIST_RESPONSE
     // const Option unwrap is not stable, otherwise it would be const.
@@ -613,15 +597,17 @@ pub async fn copy_s3_segments(
         .as_ref()
         .unwrap();

-    let relative_dst_path =
-        Utf8Path::new(&dst_ttid.tenant_id.to_string()).join(dst_ttid.timeline_id.to_string());
-
-    let remote_path = RemotePath::new(&relative_dst_path)?;
+    let remote_dst_path = remote_timeline_path(dst_ttid)?;

     let cancel = CancellationToken::new();

     let files = storage
-        .list(Some(&remote_path), ListingMode::NoDelimiter, None, &cancel)
+        .list(
+            Some(&remote_dst_path),
+            ListingMode::NoDelimiter,
+            None,
+            &cancel,
+        )
         .await?
         .keys;

@@ -635,9 +621,6 @@ pub async fn copy_s3_segments(
         uploaded_segments
     );

-    let relative_src_path =
-        Utf8Path::new(&src_ttid.tenant_id.to_string()).join(src_ttid.timeline_id.to_string());
-
     for segno in from_segment..to_segment {
         if segno % SEGMENTS_PROGRESS_REPORT_INTERVAL == 0 {
             info!("copied all segments from {} until {}", from_segment, segno);
@@ -649,8 +632,8 @@ pub async fn copy_s3_segments(
         }
         debug!("copying segment {}", segment_name);

-        let from = RemotePath::new(&relative_src_path.join(&segment_name))?;
-        let to = RemotePath::new(&relative_dst_path.join(&segment_name))?;
+        let from = remote_timeline_path(src_ttid)?.join(&segment_name);
+        let to = remote_dst_path.join(&segment_name);

         storage.copy_object(&from, &to, &cancel).await?;
     }
@@ -661,3 +644,8 @@ pub async fn copy_s3_segments(
     );
     Ok(())
 }
+
+/// Get S3 (remote_storage) prefix path used for timeline files.
+pub fn remote_timeline_path(ttid: &TenantTimelineId) -> Result<RemotePath> {
+    RemotePath::new(&Utf8Path::new(&ttid.tenant_id.to_string()).join(ttid.timeline_id.to_string()))
+}
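// Illustrative sketch (not part of the patch): the object layout produced by
// remote_timeline_path and the join calls above, using plain strings instead of the
// real TenantTimelineId / RemotePath types. The IDs are made up for the example.
fn remote_timeline_prefix(tenant_id: &str, timeline_id: &str) -> String {
    format!("{tenant_id}/{timeline_id}")
}

#[test]
fn remote_prefix_example() {
    let prefix = remote_timeline_prefix("aaaabbbb", "ccccdddd");
    assert_eq!(prefix, "aaaabbbb/ccccdddd");
    // a WAL segment object then lives at "<tenant>/<timeline>/<segment file name>"
    assert_eq!(
        format!("{prefix}/000000010000000000000001"),
        "aaaabbbb/ccccdddd/000000010000000000000001"
    );
}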
diff --git a/safekeeper/src/wal_backup_partial.rs b/safekeeper/src/wal_backup_partial.rs
index 29e944bff3..a320be3bad 100644
--- a/safekeeper/src/wal_backup_partial.rs
+++ b/safekeeper/src/wal_backup_partial.rs
@@ -18,8 +18,6 @@
 //! This way control file stores information about all potentially existing
 //! remote partial segments and can clean them up after uploading a newer version.

-use std::sync::Arc;
-
 use camino::Utf8PathBuf;
 use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
 use rand::Rng;
@@ -32,8 +30,9 @@ use utils::lsn::Lsn;
 use crate::{
     metrics::{PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS},
     safekeeper::Term,
-    timeline::Timeline,
-    wal_backup, SafeKeeperConf,
+    timeline::FullAccessTimeline,
+    wal_backup::{self, remote_timeline_path},
+    SafeKeeperConf,
 };

 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
@@ -83,10 +82,10 @@ impl State {

 struct PartialBackup {
     wal_seg_size: usize,
-    tli: Arc<Timeline>,
+    tli: FullAccessTimeline,
     conf: SafeKeeperConf,
     local_prefix: Utf8PathBuf,
-    remote_prefix: Utf8PathBuf,
+    remote_timeline_path: RemotePath,
     state: State,
 }

@@ -153,7 +152,7 @@ impl PartialBackup {
         let backup_bytes = flush_lsn.segment_offset(self.wal_seg_size);

         let local_path = self.local_prefix.join(self.local_segment_name(segno));
-        let remote_path = RemotePath::new(self.remote_prefix.join(&prepared.name).as_ref())?;
+        let remote_path = self.remote_timeline_path.join(&prepared.name);

         // Upload first `backup_bytes` bytes of the segment to the remote storage.
         wal_backup::backup_partial_segment(&local_path, &remote_path, backup_bytes).await?;
@@ -253,7 +252,7 @@ impl PartialBackup {
         info!("deleting objects: {:?}", segments_to_delete);
         let mut objects_to_delete = vec![];
         for seg in segments_to_delete.iter() {
-            let remote_path = RemotePath::new(self.remote_prefix.join(seg).as_ref())?;
+            let remote_path = self.remote_timeline_path.join(seg);
             objects_to_delete.push(remote_path);
         }

@@ -273,7 +272,7 @@ impl PartialBackup {
 }

 #[instrument(name = "Partial backup", skip_all, fields(ttid = %tli.ttid))]
-pub async fn main_task(tli: Arc<Timeline>, conf: SafeKeeperConf) {
+pub async fn main_task(tli: FullAccessTimeline, conf: SafeKeeperConf) {
     debug!("started");
     let await_duration = conf.partial_backup_timeout;

@@ -289,11 +288,11 @@ pub async fn main_task(tli: FullAccessTimeline, conf: SafeKeeperConf) {
     let mut flush_lsn_rx = tli.get_term_flush_lsn_watch_rx();
     let wal_seg_size = tli.get_wal_seg_size().await;

-    let local_prefix = tli.timeline_dir.clone();
-    let remote_prefix = match tli.timeline_dir.strip_prefix(&conf.workdir) {
-        Ok(path) => path.to_owned(),
+    let local_prefix = tli.get_timeline_dir();
+    let remote_timeline_path = match remote_timeline_path(&tli.ttid) {
+        Ok(path) => path,
         Err(e) => {
-            error!("failed to strip workspace dir prefix: {:?}", e);
+            error!("failed to create remote path: {:?}", e);
             return;
         }
     };
@@ -304,7 +303,7 @@ pub async fn main_task(tli: FullAccessTimeline, conf: SafeKeeperConf) {
         state: persistent_state.partial_backup,
         conf,
         local_prefix,
-        remote_prefix,
+        remote_timeline_path,
     };

     debug!("state: {:?}", backup.state);
diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs
index 6bc8c7c3f9..45e27e1951 100644
--- a/safekeeper/src/wal_storage.rs
+++ b/safekeeper/src/wal_storage.rs
@@ -25,7 +25,7 @@ use utils::crashsafe::durable_rename;

 use crate::metrics::{time_io_closure, WalStorageMetrics, REMOVED_WAL_SEGMENTS};
 use crate::state::TimelinePersistentState;
-use crate::wal_backup::read_object;
+use crate::wal_backup::{read_object, remote_timeline_path};
 use crate::SafeKeeperConf;
 use postgres_ffi::waldecoder::WalStreamDecoder;
 use postgres_ffi::XLogFileName;
@@ -536,7 +536,7 @@ async fn remove_segments_from_disk(
 }

 pub struct WalReader {
-    workdir: Utf8PathBuf,
+    remote_path: RemotePath,
     timeline_dir: Utf8PathBuf,
     wal_seg_size: usize,
     pos: Lsn,
@@ -558,7 +558,7 @@ pub struct WalReader {
 impl WalReader {
     pub fn new(
-        workdir: Utf8PathBuf,
+        ttid: &TenantTimelineId,
         timeline_dir: Utf8PathBuf,
         state: &TimelinePersistentState,
         start_pos: Lsn,
@@ -586,7 +586,7 @@ impl WalReader {
         }

         Ok(Self {
-            workdir,
+            remote_path: remote_timeline_path(ttid)?,
             timeline_dir,
             wal_seg_size: state.server.wal_seg_size as usize,
             pos: start_pos,
@@ -684,7 +684,7 @@ impl WalReader {
         let xlogoff = self.pos.segment_offset(self.wal_seg_size);
         let segno = self.pos.segment_number(self.wal_seg_size);
         let wal_file_name = XLogFileName(PG_TLI, segno, self.wal_seg_size);
-        let wal_file_path = self.timeline_dir.join(wal_file_name);
+        let wal_file_path = self.timeline_dir.join(&wal_file_name);

         // Try to open local file, if we may have WAL locally
         if self.pos >= self.local_start_lsn {
@@ -712,16 +712,7 @@ impl WalReader {

         // Try to open remote file, if remote reads are enabled
         if self.enable_remote_read {
-            let remote_wal_file_path = wal_file_path
-                .strip_prefix(&self.workdir)
-                .context("Failed to strip workdir prefix")
-                .and_then(RemotePath::new)
-                .with_context(|| {
-                    format!(
-                        "Failed to resolve remote part of path {:?} for base {:?}",
-                        wal_file_path, self.workdir,
-                    )
-                })?;
+            let remote_wal_file_path = self.remote_path.join(&wal_file_name);
             return read_object(&remote_wal_file_path, xlogoff as u64).await;
         }
diff --git a/test_runner/fixtures/common_types.py b/test_runner/fixtures/common_types.py
index e9be765669..147264762c 100644
--- a/test_runner/fixtures/common_types.py
+++ b/test_runner/fixtures/common_types.py
@@ -72,6 +72,18 @@ class Lsn:
     def segment_lsn(self, seg_sz: int = DEFAULT_WAL_SEG_SIZE) -> "Lsn":
         return Lsn(self.lsn_int - (self.lsn_int % seg_sz))

+    def segno(self, seg_sz: int = DEFAULT_WAL_SEG_SIZE) -> int:
+        return self.lsn_int // seg_sz
+
+    def segment_name(self, seg_sz: int = DEFAULT_WAL_SEG_SIZE) -> str:
+        segno = self.segno(seg_sz)
+        # The filename format is 00000001XXXXXXXX000000YY, where XXXXXXXXYY is segno in hex.
+        # XXXXXXXX is the higher 8 hex digits of segno
+        high_bits = segno >> 8
+        # YY is the lower 2 hex digits of segno
+        low_bits = segno & 0xFF
+        return f"00000001{high_bits:08X}000000{low_bits:02X}"
+

 @dataclass(frozen=True)
 class Key:
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index b8ef63faa9..0004745bf0 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -973,6 +973,9 @@ class NeonEnvBuilder:
         for pageserver in self.env.pageservers:
             pageserver.assert_no_errors()

+        for safekeeper in self.env.safekeepers:
+            safekeeper.assert_no_errors()
+
         self.env.storage_controller.assert_no_errors()

         try:
@@ -3813,6 +3816,9 @@ class Safekeeper(LogUtils):
         self.running = False
         return self

+    def assert_no_errors(self):
+        assert not self.log_contains("manager task finished prematurely")
+
     def append_logical_message(
         self, tenant_id: TenantId, timeline_id: TimelineId, request: Dict[str, Any]
     ) -> Dict[str, Any]:
@@ -3898,6 +3904,15 @@ class Safekeeper(LogUtils):
         """

         cli = self.http_client()
+        target_segment_file = lsn.segment_name()
+
+        def are_segments_removed():
+            segments = self.list_segments(tenant_id, timeline_id)
+            log.info(
+                f"waiting for all segments before {target_segment_file} to be removed from sk {self.id}, current segments: {segments}"
+            )
+            assert all(target_segment_file <= s for s in segments)
+
         def are_lsns_advanced():
             stat = cli.timeline_status(tenant_id, timeline_id)
             log.info(
@@ -3909,6 +3924,7 @@ class Safekeeper(LogUtils):
         # pageserver to this safekeeper
         wait_until(30, 1, are_lsns_advanced)
         cli.checkpoint(tenant_id, timeline_id)
+        wait_until(30, 1, are_segments_removed)

     def wait_until_paused(self, failpoint: str):
         msg = f"at failpoint {failpoint}"
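// Illustrative sketch (not part of the patch): the same segment-name arithmetic the new
// Python Lsn.segment_name() helper above implements, written out in Rust for the default
// 16 MiB segment size (timeline id 1, segno split into high/low hex parts).
fn segment_name(segno: u64) -> String {
    format!("00000001{:08X}000000{:02X}", segno >> 8, segno & 0xFF)
}

#[test]
fn segment_name_examples() {
    assert_eq!(segment_name(1), "000000010000000000000001");
    assert_eq!(segment_name(0x105), "000000010000000100000005");
}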