diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs
index f9ba4c763b..288c13a056 100644
--- a/safekeeper/src/safekeeper.rs
+++ b/safekeeper/src/safekeeper.rs
@@ -960,15 +960,19 @@ where
     /// Get oldest segno we still need to keep. We hold WAL till it is consumed
     /// by all of 1) pageserver (remote_consistent_lsn) 2) peers 3) s3
     /// offloading.
-    /// While it is safe to use inmem values for determining horizon,
-    /// we use persistent to make possible normal states less surprising.
-    pub fn get_horizon_segno(&self, wal_backup_enabled: bool) -> XLogSegNo {
-        let mut horizon_lsn = min(
-            self.state.remote_consistent_lsn,
-            self.state.peer_horizon_lsn,
-        );
+    ///
+    /// Use inmem values. It rarely might create situation when we try accessing
+    /// removed WAL segment (e.g. offload already offloaded and removed locally
+    /// WAL segment), but this avoids out of space deadlock when removing WAL
+    /// requires control file update on disc.
+    pub fn get_horizon_segno(
+        &self,
+        wal_backup_enabled: bool,
+        remote_consistent_lsn: Lsn,
+    ) -> XLogSegNo {
+        let mut horizon_lsn = min(remote_consistent_lsn, self.inmem.peer_horizon_lsn);
         if wal_backup_enabled {
-            horizon_lsn = min(horizon_lsn, self.state.backup_lsn);
+            horizon_lsn = min(horizon_lsn, self.inmem.backup_lsn);
         }
         horizon_lsn.segment_number(self.state.server.wal_seg_size as usize)
     }
diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index 64ca6967df..a801cb433b 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -654,7 +654,10 @@ impl Timeline {
        let remover: Box<dyn Fn(XLogSegNo) -> Result<(), anyhow::Error>>;
        {
            let shared_state = self.write_shared_state();
-            horizon_segno = shared_state.sk.get_horizon_segno(wal_backup_enabled);
+            horizon_segno = shared_state.sk.get_horizon_segno(
+                wal_backup_enabled,
+                self.walsenders.get_remote_consistent_lsn(),
+            );
            remover = shared_state.sk.wal_store.remove_up_to();
            if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
                return Ok(());
diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs
index 163ac99be8..39e3412b37 100644
--- a/safekeeper/src/wal_backup.rs
+++ b/safekeeper/src/wal_backup.rs
@@ -1,10 +1,11 @@
-use anyhow::{Context, Result};
+use anyhow::{bail, Context, Result};
 
 use tokio::task::JoinHandle;
 use utils::id::NodeId;
 
 use std::cmp::min;
 use std::collections::HashMap;
+use std::io::ErrorKind;
 use std::path::{Path, PathBuf};
 use std::pin::Pin;
 use std::sync::Arc;
@@ -452,15 +453,31 @@ async fn backup_object(source_file: &Path, target_file: &RemotePath, size: usize
         .as_ref()
         .unwrap();
 
-    let file = tokio::io::BufReader::new(File::open(&source_file).await.with_context(|| {
-        format!(
-            "Failed to open file {} for wal backup",
-            source_file.display()
-        )
-    })?);
+    let local_file = match File::open(&source_file).await {
+        Ok(file) => file,
+        // If segment is not found locally, check whether it is already in s3.
+        Err(error) => {
+            match error.kind() {
+                ErrorKind::NotFound => match storage.download(target_file).await {
+                    Ok(_) => {
+                        info!("segment {:?} found in remote storage", target_file);
+                        return Ok(());
+                    }
+                    Err(e) => {
+                        bail!("segment {:?} doesn't exist locally and could not be found remotely: {:#}", source_file, e);
+                    }
+                },
+                _ => {
+                    return Err(error.into());
+                }
+            }
+        }
+    };
+
+    let local_file = tokio::io::BufReader::new(local_file);
 
     storage
-        .upload_storage_object(Box::new(file), size, target_file)
+        .upload_storage_object(Box::new(local_file), size, target_file)
         .await
 }
 