Add FullAccessTimeline guard in safekeepers (#7887)

This is preparation for
https://github.com/neondatabase/neon/issues/6337.

The idea is to add FullAccessTimeline, which will act as a guard for
tasks that require access to WAL files. Eviction will be blocked by these
tasks, and WAL won't be deleted from disk while there is at least one
active FullAccessTimeline.

To get a FullAccessTimeline, tasks call `tli.full_access_guard().await?`.
Once eviction is implemented, this function will be responsible for
downloading missing WAL files and waiting until the download finishes.
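As a rough sketch of the intended call pattern inside the safekeeper crate, a task that needs WAL files on disk would look roughly like the snippet below. `read_some_wal` is a hypothetical example (not part of this diff), and the final `read` call assumes `WalReader`'s async read API:

```rust
use utils::{id::TenantTimelineId, lsn::Lsn};

use crate::GlobalTimelines;

// Hypothetical task that needs WAL files on disk.
async fn read_some_wal(ttid: TenantTimelineId, start_lsn: Lsn) -> anyhow::Result<()> {
    // Look up the in-memory timeline object.
    let tli = GlobalTimelines::get(ttid)?;

    // Acquire the guard. Today this only checks for cancellation; once
    // eviction lands, it will also download missing WAL and block eviction
    // for as long as the guard is alive.
    let tli = tli.full_access_guard().await?;

    // While FullAccessTimeline is held, WAL files stay on disk.
    let mut wal_reader = tli.get_walreader(start_lsn).await?;
    let mut buf = [0u8; 8192];
    let _bytes_read = wal_reader.read(&mut buf).await?; // assumed async read API
    Ok(())
}
```

The HTTP `timeline_digest` and `timeline_files` handlers, the WAL receiver, the WAL senders and the recovery task in this diff all switch to this pattern.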

This commit also contains other small refactorings:
- Separate `get_tenant_dir` and `get_timeline_dir` functions for
building local paths. This makes it easier to find usages and identify
tasks that require access to the local filesystem.
- `timeline_manager` is now responsible for spawning all background
tasks.
- The WAL removal task is now spawned immediately after the horizon is
updated (a condensed sketch of this decision follows the list).
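For illustration, the decision the manager now makes before spawning WAL removal can be condensed as below. `wal_removal_needed` is a hypothetical helper that merges `calc_horizon_lsn` (remove_wal.rs) and the check in `update_wal_removal` (timeline_manager.rs) from this diff:

```rust
use std::cmp::min;

use utils::lsn::Lsn;

use crate::timeline_manager::StateSnapshot;

// Returns the segment number to remove up to, or None if there is nothing to do.
fn wal_removal_needed(
    state: &StateSnapshot,
    extra_horizon_lsn: Option<Lsn>, // e.g. LSN of the most lagging walsender, if enabled
    wal_seg_size: usize,
    last_removed_segno: u64,
) -> Option<u64> {
    // Horizon: oldest LSN still needed by the pageserver (remote_consistent_lsn),
    // peers (peer_horizon_lsn) and s3 offloading (backup_lsn), computed from
    // persisted control file values.
    let mut horizon_lsn = min(
        state.cfile_remote_consistent_lsn,
        state.cfile_peer_horizon_lsn,
    );
    horizon_lsn = min(horizon_lsn, state.cfile_backup_lsn);
    if let Some(extra) = extra_horizon_lsn {
        horizon_lsn = min(horizon_lsn, extra);
    }

    // All segments strictly below the horizon segment can be removed.
    let removal_horizon_segno = horizon_lsn.segment_number(wal_seg_size).saturating_sub(1);
    (removal_horizon_segno > last_removed_segno).then_some(removal_horizon_segno)
}
```

When the returned segment number advances past `last_removed_segno`, the manager spawns a background task that calls `remove_up_to` and then records the result in `Timeline::last_removed_segno`.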
Author: Arthur Petukhovsky
Date: 2024-05-31 14:19:45 +01:00 (committed by GitHub)
Parent: 5a394fde56
Commit: 16b2e74037
23 changed files with 726 additions and 576 deletions

View File

@@ -29,13 +29,12 @@ use safekeeper::defaults::{
DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES,
DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR,
};
use safekeeper::remove_wal;
use safekeeper::http;
use safekeeper::wal_service;
use safekeeper::GlobalTimelines;
use safekeeper::SafeKeeperConf;
use safekeeper::{broker, WAL_SERVICE_RUNTIME};
use safekeeper::{control_file, BROKER_RUNTIME};
use safekeeper::{http, WAL_REMOVER_RUNTIME};
use safekeeper::{wal_backup, HTTP_RUNTIME};
use storage_broker::DEFAULT_ENDPOINT;
use utils::auth::{JwtAuth, Scope, SwappableJwtAuth};
@@ -441,14 +440,6 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
.map(|res| ("broker main".to_owned(), res));
tasks_handles.push(Box::pin(broker_task_handle));
let conf_ = conf.clone();
let wal_remover_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| WAL_REMOVER_RUNTIME.handle())
.spawn(remove_wal::task_main(conf_))
.map(|res| ("WAL remover".to_owned(), res));
tasks_handles.push(Box::pin(wal_remover_handle));
set_build_info_metric(GIT_VERSION, BUILD_TAG);
// TODO: update tokio-stream, convert to real async Stream with

View File

@@ -2,7 +2,7 @@
use anyhow::{bail, ensure, Context, Result};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use camino::Utf8PathBuf;
use camino::{Utf8Path, Utf8PathBuf};
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use utils::crashsafe::durable_rename;
@@ -12,9 +12,9 @@ use std::ops::Deref;
use std::path::Path;
use std::time::Instant;
use crate::control_file_upgrade::upgrade_control_file;
use crate::metrics::PERSIST_CONTROL_FILE_SECONDS;
use crate::state::TimelinePersistentState;
use crate::{control_file_upgrade::upgrade_control_file, timeline::get_timeline_dir};
use utils::{bin_ser::LeSer, id::TenantTimelineId};
use crate::SafeKeeperConf;
@@ -43,7 +43,7 @@ pub trait Storage: Deref<Target = TimelinePersistentState> {
pub struct FileStorage {
// save timeline dir to avoid reconstructing it every time
timeline_dir: Utf8PathBuf,
conf: SafeKeeperConf,
no_sync: bool,
/// Last state persisted to disk.
state: TimelinePersistentState,
@@ -54,13 +54,12 @@ pub struct FileStorage {
impl FileStorage {
/// Initialize storage by loading state from disk.
pub fn restore_new(ttid: &TenantTimelineId, conf: &SafeKeeperConf) -> Result<FileStorage> {
let timeline_dir = conf.timeline_dir(ttid);
let state = Self::load_control_file_conf(conf, ttid)?;
let timeline_dir = get_timeline_dir(conf, ttid);
let state = Self::load_control_file_from_dir(&timeline_dir)?;
Ok(FileStorage {
timeline_dir,
conf: conf.clone(),
no_sync: conf.no_sync,
state,
last_persist_at: Instant::now(),
})
@@ -74,7 +73,7 @@ impl FileStorage {
) -> Result<FileStorage> {
let store = FileStorage {
timeline_dir,
conf: conf.clone(),
no_sync: conf.no_sync,
state,
last_persist_at: Instant::now(),
};
@@ -102,12 +101,9 @@ impl FileStorage {
upgrade_control_file(buf, version)
}
/// Load control file for given ttid at path specified by conf.
pub fn load_control_file_conf(
conf: &SafeKeeperConf,
ttid: &TenantTimelineId,
) -> Result<TimelinePersistentState> {
let path = conf.timeline_dir(ttid).join(CONTROL_FILE_NAME);
/// Load control file from given directory.
pub fn load_control_file_from_dir(timeline_dir: &Utf8Path) -> Result<TimelinePersistentState> {
let path = timeline_dir.join(CONTROL_FILE_NAME);
Self::load_control_file(path)
}
@@ -203,7 +199,7 @@ impl Storage for FileStorage {
})?;
let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);
durable_rename(&control_partial_path, &control_path, !self.conf.no_sync).await?;
durable_rename(&control_partial_path, &control_path, !self.no_sync).await?;
// update internal state
self.state = s.clone();
@@ -233,12 +229,13 @@ mod test {
conf: &SafeKeeperConf,
ttid: &TenantTimelineId,
) -> Result<(FileStorage, TimelinePersistentState)> {
fs::create_dir_all(conf.timeline_dir(ttid))
let timeline_dir = get_timeline_dir(conf, ttid);
fs::create_dir_all(&timeline_dir)
.await
.expect("failed to create timeline dir");
Ok((
FileStorage::restore_new(ttid, conf)?,
FileStorage::load_control_file_conf(conf, ttid)?,
FileStorage::load_control_file_from_dir(&timeline_dir)?,
))
}
@@ -246,11 +243,11 @@ mod test {
conf: &SafeKeeperConf,
ttid: &TenantTimelineId,
) -> Result<(FileStorage, TimelinePersistentState)> {
fs::create_dir_all(conf.timeline_dir(ttid))
let timeline_dir = get_timeline_dir(conf, ttid);
fs::create_dir_all(&timeline_dir)
.await
.expect("failed to create timeline dir");
let state = TimelinePersistentState::empty();
let timeline_dir = conf.timeline_dir(ttid);
let storage = FileStorage::create_new(timeline_dir, conf, state.clone())?;
Ok((storage, state))
}
@@ -291,7 +288,7 @@ mod test {
.await
.expect("failed to persist state");
}
let control_path = conf.timeline_dir(&ttid).join(CONTROL_FILE_NAME);
let control_path = get_timeline_dir(&conf, &ttid).join(CONTROL_FILE_NAME);
let mut data = fs::read(&control_path).await.unwrap();
data[0] += 1; // change the first byte of the file to fail checksum validation
fs::write(&control_path, &data)

View File

@@ -15,10 +15,10 @@ use crate::{
control_file::{FileStorage, Storage},
pull_timeline::{create_temp_timeline_dir, load_temp_timeline, validate_temp_timeline},
state::TimelinePersistentState,
timeline::{Timeline, TimelineError},
timeline::{FullAccessTimeline, Timeline, TimelineError},
wal_backup::copy_s3_segments,
wal_storage::{wal_file_paths, WalReader},
GlobalTimelines, SafeKeeperConf,
GlobalTimelines,
};
// we don't want to have more than 10 segments on disk after copy, because they take space
@@ -46,12 +46,14 @@ pub async fn handle_request(request: Request) -> Result<()> {
}
}
let source_tli = request.source.full_access_guard().await?;
let conf = &GlobalTimelines::get_global_config();
let ttid = request.destination_ttid;
let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
let (mem_state, state) = request.source.get_state().await;
let (mem_state, state) = source_tli.get_state().await;
let start_lsn = state.timeline_start_lsn;
if start_lsn == Lsn::INVALID {
bail!("timeline is not initialized");
@@ -60,7 +62,7 @@ pub async fn handle_request(request: Request) -> Result<()> {
{
let commit_lsn = mem_state.commit_lsn;
let flush_lsn = request.source.get_flush_lsn().await;
let flush_lsn = source_tli.get_flush_lsn().await;
info!(
"collected info about source timeline: start_lsn={}, backup_lsn={}, commit_lsn={}, flush_lsn={}",
@@ -127,10 +129,8 @@ pub async fn handle_request(request: Request) -> Result<()> {
.await?;
copy_disk_segments(
conf,
&state,
&source_tli,
wal_seg_size,
&request.source.ttid,
new_backup_lsn,
request.until_lsn,
&tli_dir_path,
@@ -159,21 +159,13 @@ pub async fn handle_request(request: Request) -> Result<()> {
}
async fn copy_disk_segments(
conf: &SafeKeeperConf,
persisted_state: &TimelinePersistentState,
tli: &FullAccessTimeline,
wal_seg_size: usize,
source_ttid: &TenantTimelineId,
start_lsn: Lsn,
end_lsn: Lsn,
tli_dir_path: &Utf8PathBuf,
) -> Result<()> {
let mut wal_reader = WalReader::new(
conf.workdir.clone(),
conf.timeline_dir(source_ttid),
persisted_state,
start_lsn,
true,
)?;
let mut wal_reader = tli.get_walreader(start_lsn).await?;
let mut buf = [0u8; MAX_SEND_SIZE];

View File

@@ -10,6 +10,7 @@ use std::sync::Arc;
use anyhow::bail;
use anyhow::Result;
use camino::Utf8Path;
use camino::Utf8PathBuf;
use chrono::{DateTime, Utc};
use postgres_ffi::XLogSegNo;
use postgres_ffi::MAX_SEND_SIZE;
@@ -26,7 +27,8 @@ use crate::safekeeper::TermHistory;
use crate::send_wal::WalSenderState;
use crate::state::TimelineMemState;
use crate::state::TimelinePersistentState;
use crate::wal_storage::WalReader;
use crate::timeline::get_timeline_dir;
use crate::timeline::FullAccessTimeline;
use crate::GlobalTimelines;
use crate::SafeKeeperConf;
@@ -68,6 +70,7 @@ pub struct Response {
pub struct TimelineDumpSer {
pub tli: Arc<crate::timeline::Timeline>,
pub args: Args,
pub timeline_dir: Utf8PathBuf,
pub runtime: Arc<tokio::runtime::Runtime>,
}
@@ -85,14 +88,20 @@ impl Serialize for TimelineDumpSer {
where
S: serde::Serializer,
{
let dump = self
.runtime
.block_on(build_from_tli_dump(self.tli.clone(), self.args.clone()));
let dump = self.runtime.block_on(build_from_tli_dump(
&self.tli,
&self.args,
&self.timeline_dir,
));
dump.serialize(serializer)
}
}
async fn build_from_tli_dump(timeline: Arc<crate::timeline::Timeline>, args: Args) -> Timeline {
async fn build_from_tli_dump(
timeline: &Arc<crate::timeline::Timeline>,
args: &Args,
timeline_dir: &Utf8Path,
) -> Timeline {
let control_file = if args.dump_control_file {
let mut state = timeline.get_state().await.1;
if !args.dump_term_history {
@@ -112,7 +121,8 @@ async fn build_from_tli_dump(timeline: Arc<crate::timeline::Timeline>, args: Arg
let disk_content = if args.dump_disk_content {
// build_disk_content can fail, but we don't want to fail the whole
// request because of that.
build_disk_content(&timeline.timeline_dir).ok()
// Note: timeline can be in offloaded state, this is not a problem.
build_disk_content(timeline_dir).ok()
} else {
None
};
@@ -186,6 +196,7 @@ pub struct FileInfo {
pub async fn build(args: Args) -> Result<Response> {
let start_time = Utc::now();
let timelines_count = GlobalTimelines::timelines_count();
let config = GlobalTimelines::get_global_config();
let ptrs_snapshot = if args.tenant_id.is_some() && args.timeline_id.is_some() {
// If both tenant_id and timeline_id are specified, we can just get the
@@ -223,12 +234,11 @@ pub async fn build(args: Args) -> Result<Response> {
timelines.push(TimelineDumpSer {
tli,
args: args.clone(),
timeline_dir: get_timeline_dir(&config, &ttid),
runtime: runtime.clone(),
});
}
let config = GlobalTimelines::get_global_config();
Ok(Response {
start_time,
finish_time: Utc::now(),
@@ -316,27 +326,19 @@ pub struct TimelineDigest {
}
pub async fn calculate_digest(
tli: &Arc<crate::timeline::Timeline>,
tli: &FullAccessTimeline,
request: TimelineDigestRequest,
) -> Result<TimelineDigest> {
if request.from_lsn > request.until_lsn {
bail!("from_lsn is greater than until_lsn");
}
let conf = GlobalTimelines::get_global_config();
let (_, persisted_state) = tli.get_state().await;
if persisted_state.timeline_start_lsn > request.from_lsn {
bail!("requested LSN is before the start of the timeline");
}
let mut wal_reader = WalReader::new(
conf.workdir.clone(),
tli.timeline_dir.clone(),
&persisted_state,
request.from_lsn,
true,
)?;
let mut wal_reader = tli.get_walreader(request.from_lsn).await?;
let mut hasher = Sha256::new();
let mut buf = [0u8; MAX_SEND_SIZE];

View File

@@ -249,6 +249,10 @@ async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body
};
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
let tli = tli
.full_access_guard()
.await
.map_err(ApiError::InternalServerError)?;
let response = debug_dump::calculate_digest(&tli, request)
.await
@@ -268,8 +272,12 @@ async fn timeline_files_handler(request: Request<Body>) -> Result<Response<Body>
let filename: String = parse_request_param(&request, "filename")?;
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
let tli = tli
.full_access_guard()
.await
.map_err(ApiError::InternalServerError)?;
let filepath = tli.timeline_dir.join(filename);
let filepath = tli.get_timeline_dir().join(filename);
let mut file = File::open(&filepath)
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
@@ -287,7 +295,7 @@ async fn timeline_files_handler(request: Request<Body>) -> Result<Response<Body>
.map_err(|e| ApiError::InternalServerError(e.into()))
}
/// Force persist control file and remove old WAL.
/// Force persist control file.
async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
@@ -297,13 +305,13 @@ async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<
);
let tli = GlobalTimelines::get(ttid)?;
tli.maybe_persist_control_file(true)
tli.write_shared_state()
.await
.sk
.state
.flush()
.await
.map_err(ApiError::InternalServerError)?;
tli.remove_old_wal()
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}

View File

@@ -6,8 +6,6 @@
//! modifications in tests.
//!
use std::sync::Arc;
use anyhow::Context;
use bytes::Bytes;
use postgres_backend::QueryError;
@@ -23,7 +21,7 @@ use crate::safekeeper::{
};
use crate::safekeeper::{Term, TermHistory, TermLsn};
use crate::state::TimelinePersistentState;
use crate::timeline::Timeline;
use crate::timeline::FullAccessTimeline;
use crate::GlobalTimelines;
use postgres_backend::PostgresBackend;
use postgres_ffi::encode_logical_message;
@@ -104,8 +102,8 @@ pub async fn handle_json_ctrl<IO: AsyncRead + AsyncWrite + Unpin>(
async fn prepare_safekeeper(
ttid: TenantTimelineId,
pg_version: u32,
) -> anyhow::Result<Arc<Timeline>> {
GlobalTimelines::create(
) -> anyhow::Result<FullAccessTimeline> {
let tli = GlobalTimelines::create(
ttid,
ServerInfo {
pg_version,
@@ -115,10 +113,16 @@ async fn prepare_safekeeper(
Lsn::INVALID,
Lsn::INVALID,
)
.await
.await?;
tli.full_access_guard().await
}
async fn send_proposer_elected(tli: &Arc<Timeline>, term: Term, lsn: Lsn) -> anyhow::Result<()> {
async fn send_proposer_elected(
tli: &FullAccessTimeline,
term: Term,
lsn: Lsn,
) -> anyhow::Result<()> {
// add new term to existing history
let history = tli.get_state().await.1.acceptor_state.term_history;
let history = history.up_to(lsn.checked_sub(1u64).unwrap());
@@ -147,7 +151,7 @@ pub struct InsertedWAL {
/// Extend local WAL with new LogicalMessage record. To do that,
/// create AppendRequest with new WAL and pass it to safekeeper.
pub async fn append_logical_message(
tli: &Arc<Timeline>,
tli: &FullAccessTimeline,
msg: &AppendLogicalMessage,
) -> anyhow::Result<InsertedWAL> {
let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message);

View File

@@ -7,10 +7,7 @@ use tokio::runtime::Runtime;
use std::time::Duration;
use storage_broker::Uri;
use utils::{
auth::SwappableJwtAuth,
id::{NodeId, TenantId, TenantTimelineId},
};
use utils::{auth::SwappableJwtAuth, id::NodeId};
mod auth;
pub mod broker;
@@ -89,15 +86,6 @@ pub struct SafeKeeperConf {
}
impl SafeKeeperConf {
pub fn tenant_dir(&self, tenant_id: &TenantId) -> Utf8PathBuf {
self.workdir.join(tenant_id.to_string())
}
pub fn timeline_dir(&self, ttid: &TenantTimelineId) -> Utf8PathBuf {
self.tenant_dir(&ttid.tenant_id)
.join(ttid.timeline_id.to_string())
}
pub fn is_wal_backup_enabled(&self) -> bool {
self.remote_storage.is_some() && self.wal_backup_enabled
}

View File

@@ -17,7 +17,7 @@ use utils::{
use crate::{
control_file, debug_dump,
http::routes::TimelineStatus,
timeline::{Timeline, TimelineError},
timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError},
wal_storage::{self, Storage},
GlobalTimelines, SafeKeeperConf,
};
@@ -283,13 +283,13 @@ pub async fn load_temp_timeline(
}
// Move timeline dir to the correct location
let timeline_path = conf.timeline_dir(&ttid);
let timeline_path = get_timeline_dir(conf, &ttid);
info!(
"moving timeline {} from {} to {}",
ttid, tmp_path, timeline_path
);
tokio::fs::create_dir_all(conf.tenant_dir(&ttid.tenant_id)).await?;
tokio::fs::create_dir_all(get_tenant_dir(conf, &ttid.tenant_id)).await?;
tokio::fs::rename(tmp_path, &timeline_path).await?;
let tli = GlobalTimelines::load_timeline(&guard, ttid)

View File

@@ -6,7 +6,7 @@ use crate::handler::SafekeeperPostgresHandler;
use crate::safekeeper::AcceptorProposerMessage;
use crate::safekeeper::ProposerAcceptorMessage;
use crate::safekeeper::ServerInfo;
use crate::timeline::Timeline;
use crate::timeline::FullAccessTimeline;
use crate::wal_service::ConnectionId;
use crate::GlobalTimelines;
use anyhow::{anyhow, Context};
@@ -213,7 +213,7 @@ impl SafekeeperPostgresHandler {
&mut self,
pgb: &mut PostgresBackend<IO>,
) -> Result<(), QueryError> {
let mut tli: Option<Arc<Timeline>> = None;
let mut tli: Option<FullAccessTimeline> = None;
if let Err(end) = self.handle_start_wal_push_guts(pgb, &mut tli).await {
// Log the result and probably send it to the client, closing the stream.
let handle_end_fut = pgb.handle_copy_stream_end(end);
@@ -233,7 +233,7 @@ impl SafekeeperPostgresHandler {
pub async fn handle_start_wal_push_guts<IO: AsyncRead + AsyncWrite + Unpin>(
&mut self,
pgb: &mut PostgresBackend<IO>,
tli: &mut Option<Arc<Timeline>>,
tli: &mut Option<FullAccessTimeline>,
) -> Result<(), CopyStreamHandlerEnd> {
// Notify the libpq client that it's allowed to send `CopyData` messages
pgb.write_message(&BeMessage::CopyBothResponse).await?;
@@ -323,7 +323,7 @@ struct NetworkReader<'a, IO> {
impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
async fn read_first_message(
&mut self,
) -> Result<(Arc<Timeline>, ProposerAcceptorMessage), CopyStreamHandlerEnd> {
) -> Result<(FullAccessTimeline, ProposerAcceptorMessage), CopyStreamHandlerEnd> {
// Receive information about server to create timeline, if not yet.
let next_msg = read_message(self.pgb_reader).await?;
let tli = match next_msg {
@@ -337,7 +337,10 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
system_id: greeting.system_id,
wal_seg_size: greeting.wal_seg_size,
};
GlobalTimelines::create(self.ttid, server_info, Lsn::INVALID, Lsn::INVALID).await?
let tli =
GlobalTimelines::create(self.ttid, server_info, Lsn::INVALID, Lsn::INVALID)
.await?;
tli.full_access_guard().await?
}
_ => {
return Err(CopyStreamHandlerEnd::Other(anyhow::anyhow!(
@@ -353,7 +356,7 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
msg_tx: Sender<ProposerAcceptorMessage>,
msg_rx: Receiver<ProposerAcceptorMessage>,
reply_tx: Sender<AcceptorProposerMessage>,
tli: Arc<Timeline>,
tli: FullAccessTimeline,
next_msg: ProposerAcceptorMessage,
) -> Result<(), CopyStreamHandlerEnd> {
*self.acceptor_handle = Some(WalAcceptor::spawn(
@@ -448,7 +451,7 @@ const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
/// replies to reply_tx; reading from socket and writing to disk in parallel is
/// beneficial for performance, this struct provides writing to disk part.
pub struct WalAcceptor {
tli: Arc<Timeline>,
tli: FullAccessTimeline,
msg_rx: Receiver<ProposerAcceptorMessage>,
reply_tx: Sender<AcceptorProposerMessage>,
conn_id: Option<ConnectionId>,
@@ -461,7 +464,7 @@ impl WalAcceptor {
///
/// conn_id None means WalAcceptor is used by recovery initiated at this safekeeper.
pub fn spawn(
tli: Arc<Timeline>,
tli: FullAccessTimeline,
msg_rx: Receiver<ProposerAcceptorMessage>,
reply_tx: Sender<AcceptorProposerMessage>,
conn_id: Option<ConnectionId>,

View File

@@ -2,7 +2,7 @@
//! provide it, i.e. safekeeper lags too much.
use std::time::SystemTime;
use std::{fmt, pin::pin, sync::Arc};
use std::{fmt, pin::pin};
use anyhow::{bail, Context};
use futures::StreamExt;
@@ -21,6 +21,7 @@ use utils::{id::NodeId, lsn::Lsn, postgres_client::wal_stream_connection_config}
use crate::receive_wal::{WalAcceptor, REPLY_QUEUE_SIZE};
use crate::safekeeper::{AppendRequest, AppendRequestHeader};
use crate::timeline::FullAccessTimeline;
use crate::{
http::routes::TimelineStatus,
receive_wal::MSG_QUEUE_SIZE,
@@ -28,14 +29,14 @@ use crate::{
AcceptorProposerMessage, ProposerAcceptorMessage, ProposerElected, Term, TermHistory,
TermLsn, VoteRequest,
},
timeline::{PeerInfo, Timeline},
timeline::PeerInfo,
SafeKeeperConf,
};
/// Entrypoint for per timeline task which always runs, checking whether
/// recovery for this safekeeper is needed and starting it if so.
#[instrument(name = "recovery task", skip_all, fields(ttid = %tli.ttid))]
pub async fn recovery_main(tli: Arc<Timeline>, conf: SafeKeeperConf) {
pub async fn recovery_main(tli: FullAccessTimeline, conf: SafeKeeperConf) {
info!("started");
let cancel = tli.cancel.clone();
@@ -47,6 +48,87 @@ pub async fn recovery_main(tli: Arc<Timeline>, conf: SafeKeeperConf) {
}
}
/// Should we start fetching WAL from a peer safekeeper, and if yes, from
/// which? Answer is yes, i.e. .donors is not empty if 1) there is something
/// to fetch, and we can do that without running elections; 2) there is no
/// actively streaming compute, as we don't want to compete with it.
///
/// If donor(s) are chosen, their last_log_term is guaranteed to be equal
/// to its last_log_term, so we are sure such a leader was once elected.
///
/// All possible donors are returned so that we could keep connection to the
/// current one if it is good even if it slightly lags behind.
///
/// Note that term conditions above might be not met, but safekeepers are
/// still not aligned on last flush_lsn. Generally in this case until
/// elections are run it is not possible to say which safekeeper should
/// recover from which one -- history which would be committed is different
/// depending on assembled quorum (e.g. classic picture 8 from Raft paper).
/// Thus we don't try to predict it here.
async fn recovery_needed(
tli: &FullAccessTimeline,
heartbeat_timeout: Duration,
) -> RecoveryNeededInfo {
let ss = tli.read_shared_state().await;
let term = ss.sk.state.acceptor_state.term;
let last_log_term = ss.sk.get_last_log_term();
let flush_lsn = ss.sk.flush_lsn();
// note that peers contain myself, but that's ok -- we are interested only in peers which are strictly ahead of us.
let mut peers = ss.get_peers(heartbeat_timeout);
// Sort by <last log term, lsn> pairs.
peers.sort_by(|p1, p2| {
let tl1 = TermLsn {
term: p1.last_log_term,
lsn: p1.flush_lsn,
};
let tl2 = TermLsn {
term: p2.last_log_term,
lsn: p2.flush_lsn,
};
tl2.cmp(&tl1) // desc
});
let num_streaming_computes = tli.get_walreceivers().get_num_streaming();
let donors = if num_streaming_computes > 0 {
vec![] // If there is a streaming compute, don't try to recover to not intervene.
} else {
peers
.iter()
.filter_map(|candidate| {
// Are we interested in this candidate?
let candidate_tl = TermLsn {
term: candidate.last_log_term,
lsn: candidate.flush_lsn,
};
let my_tl = TermLsn {
term: last_log_term,
lsn: flush_lsn,
};
if my_tl < candidate_tl {
// Yes, we are interested. Can we pull from it without
// (re)running elections? It is possible if 1) his term
// is equal to his last_log_term so we could act on
// behalf of leader of this term (we must be sure he was
// ever elected) and 2) our term is not higher, or we'll refuse data.
if candidate.term == candidate.last_log_term && candidate.term >= term {
Some(Donor::from(candidate))
} else {
None
}
} else {
None
}
})
.collect()
};
RecoveryNeededInfo {
term,
last_log_term,
flush_lsn,
peers,
num_streaming_computes,
donors,
}
}
/// Result of Timeline::recovery_needed, contains donor(s) if recovery needed and
/// fields to explain the choice.
#[derive(Debug)]
@@ -113,10 +195,10 @@ impl From<&PeerInfo> for Donor {
const CHECK_INTERVAL_MS: u64 = 2000;
/// Check regularly whether we need to start recovery.
async fn recovery_main_loop(tli: Arc<Timeline>, conf: SafeKeeperConf) {
async fn recovery_main_loop(tli: FullAccessTimeline, conf: SafeKeeperConf) {
let check_duration = Duration::from_millis(CHECK_INTERVAL_MS);
loop {
let recovery_needed_info = tli.recovery_needed(conf.heartbeat_timeout).await;
let recovery_needed_info = recovery_needed(&tli, conf.heartbeat_timeout).await;
match recovery_needed_info.donors.first() {
Some(donor) => {
info!(
@@ -146,7 +228,7 @@ async fn recovery_main_loop(tli: Arc<Timeline>, conf: SafeKeeperConf) {
/// Recover from the specified donor. Returns message explaining normal finish
/// reason or error.
async fn recover(
tli: Arc<Timeline>,
tli: FullAccessTimeline,
donor: &Donor,
conf: &SafeKeeperConf,
) -> anyhow::Result<String> {
@@ -232,7 +314,7 @@ async fn recover(
// Pull WAL from donor, assuming handshake is already done.
async fn recovery_stream(
tli: Arc<Timeline>,
tli: FullAccessTimeline,
donor: &Donor,
start_streaming_at: Lsn,
conf: &SafeKeeperConf,
@@ -316,7 +398,7 @@ async fn network_io(
physical_stream: ReplicationStream,
msg_tx: Sender<ProposerAcceptorMessage>,
donor: Donor,
tli: Arc<Timeline>,
tli: FullAccessTimeline,
conf: SafeKeeperConf,
) -> anyhow::Result<Option<String>> {
let mut physical_stream = pin!(physical_stream);
@@ -365,7 +447,7 @@ async fn network_io(
}
ReplicationMessage::PrimaryKeepAlive(_) => {
// keepalive means nothing is being streamed for a while. Check whether we need to stop.
let recovery_needed_info = tli.recovery_needed(conf.heartbeat_timeout).await;
let recovery_needed_info = recovery_needed(&tli, conf.heartbeat_timeout).await;
// do current donors still contain one we currently connected to?
if !recovery_needed_info
.donors

View File

@@ -1,41 +1,25 @@
//! Thread removing old WAL.
use utils::lsn::Lsn;
use std::time::Duration;
use crate::timeline_manager::StateSnapshot;
use tokio::time::sleep;
use tracing::*;
/// Get oldest LSN we still need to keep. We hold WAL till it is consumed
/// by all of 1) pageserver (remote_consistent_lsn) 2) peers 3) s3
/// offloading.
/// While it is safe to use inmem values for determining horizon,
/// we use persistent to make possible normal states less surprising.
/// All segments covering LSNs before horizon_lsn can be removed.
pub fn calc_horizon_lsn(state: &StateSnapshot, extra_horizon_lsn: Option<Lsn>) -> Lsn {
use std::cmp::min;
use crate::{GlobalTimelines, SafeKeeperConf};
pub async fn task_main(_conf: SafeKeeperConf) -> anyhow::Result<()> {
let wal_removal_interval = Duration::from_millis(5000);
loop {
let now = tokio::time::Instant::now();
let tlis = GlobalTimelines::get_all();
for tli in &tlis {
let ttid = tli.ttid;
async {
if let Err(e) = tli.maybe_persist_control_file(false).await {
warn!("failed to persist control file: {e}");
}
if let Err(e) = tli.remove_old_wal().await {
error!("failed to remove WAL: {}", e);
}
}
.instrument(info_span!("WAL removal", ttid = %ttid))
.await;
}
let elapsed = now.elapsed();
let total_timelines = tlis.len();
if elapsed > wal_removal_interval {
info!(
"WAL removal is too long, processed {} timelines in {:?}",
total_timelines, elapsed
);
}
sleep(wal_removal_interval).await;
let mut horizon_lsn = min(
state.cfile_remote_consistent_lsn,
state.cfile_peer_horizon_lsn,
);
// we don't want to remove WAL that is not yet offloaded to s3
horizon_lsn = min(horizon_lsn, state.cfile_backup_lsn);
if let Some(extra_horizon_lsn) = extra_horizon_lsn {
horizon_lsn = min(horizon_lsn, extra_horizon_lsn);
}
horizon_lsn
}

View File

@@ -10,7 +10,6 @@ use std::cmp::max;
use std::cmp::min;
use std::fmt;
use std::io::Read;
use std::time::Duration;
use storage_broker::proto::SafekeeperTimelineInfo;
use tracing::*;
@@ -828,24 +827,6 @@ where
Ok(())
}
/// Persist control file if there is something to save and enough time
/// passed after the last save.
pub async fn maybe_persist_inmem_control_file(&mut self, force: bool) -> Result<bool> {
const CF_SAVE_INTERVAL: Duration = Duration::from_secs(300);
if !force && self.state.pers.last_persist_at().elapsed() < CF_SAVE_INTERVAL {
return Ok(false);
}
let need_persist = self.state.inmem.commit_lsn > self.state.commit_lsn
|| self.state.inmem.backup_lsn > self.state.backup_lsn
|| self.state.inmem.peer_horizon_lsn > self.state.peer_horizon_lsn
|| self.state.inmem.remote_consistent_lsn > self.state.remote_consistent_lsn;
if need_persist {
self.state.flush().await?;
trace!("saved control file: {CF_SAVE_INTERVAL:?} passed");
}
Ok(need_persist)
}
/// Handle request to append WAL.
#[allow(clippy::comparison_chain)]
async fn handle_append_request(

View File

@@ -5,7 +5,7 @@ use crate::handler::SafekeeperPostgresHandler;
use crate::metrics::RECEIVED_PS_FEEDBACKS;
use crate::receive_wal::WalReceivers;
use crate::safekeeper::{Term, TermLsn};
use crate::timeline::Timeline;
use crate::timeline::FullAccessTimeline;
use crate::wal_service::ConnectionId;
use crate::wal_storage::WalReader;
use crate::GlobalTimelines;
@@ -387,8 +387,10 @@ impl SafekeeperPostgresHandler {
term: Option<Term>,
) -> Result<(), QueryError> {
let tli = GlobalTimelines::get(self.ttid).map_err(|e| QueryError::Other(e.into()))?;
let full_access = tli.full_access_guard().await?;
if let Err(end) = self
.handle_start_replication_guts(pgb, start_pos, term, tli.clone())
.handle_start_replication_guts(pgb, start_pos, term, full_access)
.await
{
let info = tli.get_safekeeper_info(&self.conf).await;
@@ -405,7 +407,7 @@ impl SafekeeperPostgresHandler {
pgb: &mut PostgresBackend<IO>,
start_pos: Lsn,
term: Option<Term>,
tli: Arc<Timeline>,
tli: FullAccessTimeline,
) -> Result<(), CopyStreamHandlerEnd> {
let appname = self.appname.clone();
@@ -448,14 +450,7 @@ impl SafekeeperPostgresHandler {
// switch to copy
pgb.write_message(&BeMessage::CopyBothResponse).await?;
let (_, persisted_state) = tli.get_state().await;
let wal_reader = WalReader::new(
self.conf.workdir.clone(),
self.conf.timeline_dir(&tli.ttid),
&persisted_state,
start_pos,
self.conf.is_wal_backup_enabled(),
)?;
let wal_reader = tli.get_walreader(start_pos).await?;
// Split to concurrently receive and send data; replies are generally
// not synchronized with sends, so this avoids deadlocks.
@@ -532,7 +527,7 @@ impl EndWatch {
/// A half driving sending WAL.
struct WalSender<'a, IO> {
pgb: &'a mut PostgresBackend<IO>,
tli: Arc<Timeline>,
tli: FullAccessTimeline,
appname: Option<String>,
// Position since which we are sending next chunk.
start_pos: Lsn,
@@ -741,7 +736,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
struct ReplyReader<IO> {
reader: PostgresBackendReader<IO>,
ws_guard: Arc<WalSenderGuard>,
tli: Arc<Timeline>,
tli: FullAccessTimeline,
}
impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {

View File

@@ -3,14 +3,14 @@
use anyhow::{anyhow, bail, Result};
use camino::Utf8PathBuf;
use postgres_ffi::XLogSegNo;
use serde::{Deserialize, Serialize};
use tokio::fs;
use tokio_util::sync::CancellationToken;
use utils::id::TenantId;
use std::cmp::max;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
@@ -26,7 +26,6 @@ use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use crate::receive_wal::WalReceivers;
use crate::recovery::{recovery_main, Donor, RecoveryNeededInfo};
use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, ServerInfo, Term, TermLsn,
INVALID_TERM,
@@ -38,8 +37,8 @@ use crate::wal_backup::{self};
use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
use crate::metrics::FullTimelineInfo;
use crate::wal_storage::Storage as wal_storage_iface;
use crate::{debug_dump, timeline_manager, wal_backup_partial, wal_storage};
use crate::wal_storage::{Storage as wal_storage_iface, WalReader};
use crate::{debug_dump, timeline_manager, wal_storage};
use crate::{GlobalTimelines, SafeKeeperConf};
/// Things safekeeper should know about timeline state on peers.
@@ -169,7 +168,6 @@ pub struct SharedState {
pub(crate) sk: SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>,
/// In memory list containing state of peers sent in latest messages from them.
pub(crate) peers_info: PeersInfo,
pub(crate) last_removed_segno: XLogSegNo,
}
impl SharedState {
@@ -197,33 +195,33 @@ impl SharedState {
// We don't want to write anything to disk, because we may have existing timeline there.
// These functions should not change anything on disk.
let timeline_dir = conf.timeline_dir(ttid);
let control_store = control_file::FileStorage::create_new(timeline_dir, conf, state)?;
let timeline_dir = get_timeline_dir(conf, ttid);
let control_store =
control_file::FileStorage::create_new(timeline_dir.clone(), conf, state)?;
let wal_store =
wal_storage::PhysicalStorage::new(ttid, conf.timeline_dir(ttid), conf, &control_store)?;
wal_storage::PhysicalStorage::new(ttid, timeline_dir, conf, &control_store)?;
let sk = SafeKeeper::new(control_store, wal_store, conf.my_id)?;
Ok(Self {
sk,
peers_info: PeersInfo(vec![]),
last_removed_segno: 0,
})
}
/// Restore SharedState from control file. If file doesn't exist, bails out.
fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result<Self> {
let timeline_dir = get_timeline_dir(conf, ttid);
let control_store = control_file::FileStorage::restore_new(ttid, conf)?;
if control_store.server.wal_seg_size == 0 {
bail!(TimelineError::UninitializedWalSegSize(*ttid));
}
let wal_store =
wal_storage::PhysicalStorage::new(ttid, conf.timeline_dir(ttid), conf, &control_store)?;
wal_storage::PhysicalStorage::new(ttid, timeline_dir, conf, &control_store)?;
Ok(Self {
sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?,
peers_info: PeersInfo(vec![]),
last_removed_segno: 0,
})
}
@@ -275,24 +273,6 @@ impl SharedState {
.cloned()
.collect()
}
/// Get oldest segno we still need to keep. We hold WAL till it is consumed
/// by all of 1) pageserver (remote_consistent_lsn) 2) peers 3) s3
/// offloading.
/// While it is safe to use inmem values for determining horizon,
/// we use persistent to make possible normal states less surprising.
fn get_horizon_segno(&self, extra_horizon_lsn: Option<Lsn>) -> XLogSegNo {
let state = &self.sk.state;
use std::cmp::min;
let mut horizon_lsn = min(state.remote_consistent_lsn, state.peer_horizon_lsn);
// we don't want to remove WAL that is not yet offloaded to s3
horizon_lsn = min(horizon_lsn, state.backup_lsn);
if let Some(extra_horizon_lsn) = extra_horizon_lsn {
horizon_lsn = min(horizon_lsn, extra_horizon_lsn);
}
horizon_lsn.segment_number(state.server.wal_seg_size as usize)
}
}
#[derive(Debug, thiserror::Error)]
@@ -349,22 +329,15 @@ pub struct Timeline {
mutex: RwLock<SharedState>,
walsenders: Arc<WalSenders>,
walreceivers: Arc<WalReceivers>,
timeline_dir: Utf8PathBuf,
/// Delete/cancel will trigger this, background tasks should drop out as soon as it fires
pub(crate) cancel: CancellationToken,
/// Directory where timeline state is stored.
pub timeline_dir: Utf8PathBuf,
/// Should we keep WAL on disk for active replication connections.
/// Especially useful for sharding, when different shards process WAL
/// with different speed.
// TODO: add `Arc<SafeKeeperConf>` here instead of adding each field separately.
walsenders_keep_horizon: bool,
// timeline_manager controlled state
pub(crate) broker_active: AtomicBool,
pub(crate) wal_backup_active: AtomicBool,
pub(crate) last_removed_segno: AtomicU64,
}
impl Timeline {
@@ -394,10 +367,10 @@ impl Timeline {
walsenders: WalSenders::new(walreceivers.clone()),
walreceivers,
cancel: CancellationToken::default(),
timeline_dir: conf.timeline_dir(&ttid),
walsenders_keep_horizon: conf.walsenders_keep_horizon,
timeline_dir: get_timeline_dir(conf, &ttid),
broker_active: AtomicBool::new(false),
wal_backup_active: AtomicBool::new(false),
last_removed_segno: AtomicU64::new(0),
})
}
@@ -430,10 +403,10 @@ impl Timeline {
walsenders: WalSenders::new(walreceivers.clone()),
walreceivers,
cancel: CancellationToken::default(),
timeline_dir: conf.timeline_dir(&ttid),
walsenders_keep_horizon: conf.walsenders_keep_horizon,
timeline_dir: get_timeline_dir(conf, &ttid),
broker_active: AtomicBool::new(false),
wal_backup_active: AtomicBool::new(false),
last_removed_segno: AtomicU64::new(0),
})
}
@@ -494,15 +467,6 @@ impl Timeline {
conf.clone(),
broker_active_set,
));
// Start recovery task which always runs on the timeline.
if conf.peer_recovery_enabled {
tokio::spawn(recovery_main(self.clone(), conf.clone()));
}
// TODO: migrate to timeline_manager
if conf.is_wal_backup_enabled() && conf.partial_backup_enabled {
tokio::spawn(wal_backup_partial::main_task(self.clone(), conf.clone()));
}
}
/// Delete timeline from disk completely, by removing timeline directory.
@@ -555,36 +519,6 @@ impl Timeline {
self.mutex.read().await
}
/// Returns true if walsender should stop sending WAL to pageserver. We
/// terminate it if remote_consistent_lsn reached commit_lsn and there is no
/// computes. While there might be nothing to stream already, we learn about
/// remote_consistent_lsn update through replication feedback, and we want
/// to stop pushing to the broker if pageserver is fully caughtup.
pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
if self.is_cancelled() {
return true;
}
let shared_state = self.read_shared_state().await;
if self.walreceivers.get_num() == 0 {
return shared_state.sk.state.inmem.commit_lsn == Lsn(0) || // no data at all yet
reported_remote_consistent_lsn >= shared_state.sk.state.inmem.commit_lsn;
}
false
}
/// Ensure that current term is t, erroring otherwise, and lock the state.
pub async fn acquire_term(&self, t: Term) -> Result<ReadGuardSharedState> {
let ss = self.read_shared_state().await;
if ss.sk.state.acceptor_state.term != t {
bail!(
"failed to acquire term {}, current term {}",
t,
ss.sk.state.acceptor_state.term
);
}
Ok(ss)
}
/// Returns commit_lsn watch channel.
pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver<Lsn> {
self.commit_lsn_watch_rx.clone()
@@ -600,28 +534,6 @@ impl Timeline {
self.shared_state_version_rx.clone()
}
/// Pass arrived message to the safekeeper.
pub async fn process_msg(
self: &Arc<Self>,
msg: &ProposerAcceptorMessage,
) -> Result<Option<AcceptorProposerMessage>> {
if self.is_cancelled() {
bail!(TimelineError::Cancelled(self.ttid));
}
let mut rmsg: Option<AcceptorProposerMessage>;
{
let mut shared_state = self.write_shared_state().await;
rmsg = shared_state.sk.process_msg(msg).await?;
// if this is AppendResponse, fill in proper hot standby feedback.
if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
resp.hs_feedback = self.walsenders.get_hotstandby().hs_feedback;
}
}
Ok(rmsg)
}
/// Returns wal_seg_size.
pub async fn get_wal_seg_size(&self) -> usize {
self.read_shared_state().await.get_wal_seg_size()
@@ -672,97 +584,11 @@ impl Timeline {
Ok(())
}
/// Update in memory remote consistent lsn.
pub async fn update_remote_consistent_lsn(self: &Arc<Self>, candidate: Lsn) {
let mut shared_state = self.write_shared_state().await;
shared_state.sk.state.inmem.remote_consistent_lsn =
max(shared_state.sk.state.inmem.remote_consistent_lsn, candidate);
}
pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
let shared_state = self.read_shared_state().await;
shared_state.get_peers(conf.heartbeat_timeout)
}
/// Should we start fetching WAL from a peer safekeeper, and if yes, from
/// which? Answer is yes, i.e. .donors is not empty if 1) there is something
/// to fetch, and we can do that without running elections; 2) there is no
/// actively streaming compute, as we don't want to compete with it.
///
/// If donor(s) are choosen, theirs last_log_term is guaranteed to be equal
/// to its last_log_term so we are sure such a leader ever had been elected.
///
/// All possible donors are returned so that we could keep connection to the
/// current one if it is good even if it slightly lags behind.
///
/// Note that term conditions above might be not met, but safekeepers are
/// still not aligned on last flush_lsn. Generally in this case until
/// elections are run it is not possible to say which safekeeper should
/// recover from which one -- history which would be committed is different
/// depending on assembled quorum (e.g. classic picture 8 from Raft paper).
/// Thus we don't try to predict it here.
pub async fn recovery_needed(&self, heartbeat_timeout: Duration) -> RecoveryNeededInfo {
let ss = self.read_shared_state().await;
let term = ss.sk.state.acceptor_state.term;
let last_log_term = ss.sk.get_last_log_term();
let flush_lsn = ss.sk.flush_lsn();
// note that peers contain myself, but that's ok -- we are interested only in peers which are strictly ahead of us.
let mut peers = ss.get_peers(heartbeat_timeout);
// Sort by <last log term, lsn> pairs.
peers.sort_by(|p1, p2| {
let tl1 = TermLsn {
term: p1.last_log_term,
lsn: p1.flush_lsn,
};
let tl2 = TermLsn {
term: p2.last_log_term,
lsn: p2.flush_lsn,
};
tl2.cmp(&tl1) // desc
});
let num_streaming_computes = self.walreceivers.get_num_streaming();
let donors = if num_streaming_computes > 0 {
vec![] // If there is a streaming compute, don't try to recover to not intervene.
} else {
peers
.iter()
.filter_map(|candidate| {
// Are we interested in this candidate?
let candidate_tl = TermLsn {
term: candidate.last_log_term,
lsn: candidate.flush_lsn,
};
let my_tl = TermLsn {
term: last_log_term,
lsn: flush_lsn,
};
if my_tl < candidate_tl {
// Yes, we are interested. Can we pull from it without
// (re)running elections? It is possible if 1) his term
// is equal to his last_log_term so we could act on
// behalf of leader of this term (we must be sure he was
// ever elected) and 2) our term is not higher, or we'll refuse data.
if candidate.term == candidate.last_log_term && candidate.term >= term {
Some(Donor::from(candidate))
} else {
None
}
} else {
None
}
})
.collect()
};
RecoveryNeededInfo {
term,
last_log_term,
flush_lsn,
peers,
num_streaming_computes,
donors,
}
}
pub fn get_walsenders(&self) -> &Arc<WalSenders> {
&self.walsenders
}
@@ -776,58 +602,6 @@ impl Timeline {
self.read_shared_state().await.sk.wal_store.flush_lsn()
}
/// Delete WAL segments from disk that are no longer needed. This is determined
/// based on pageserver's remote_consistent_lsn and local backup_lsn/peer_lsn.
pub async fn remove_old_wal(self: &Arc<Self>) -> Result<()> {
if self.is_cancelled() {
bail!(TimelineError::Cancelled(self.ttid));
}
// If enabled, we use LSN of the most lagging walsender as a WAL removal horizon.
// This allows to get better read speed for pageservers that are lagging behind,
// at the cost of keeping more WAL on disk.
let replication_horizon_lsn = if self.walsenders_keep_horizon {
self.walsenders.laggard_lsn()
} else {
None
};
let horizon_segno: XLogSegNo;
let remover = {
let shared_state = self.read_shared_state().await;
horizon_segno = shared_state.get_horizon_segno(replication_horizon_lsn);
if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
return Ok(()); // nothing to do
}
// release the lock before removing
shared_state.sk.wal_store.remove_up_to(horizon_segno - 1)
};
// delete old WAL files
remover.await?;
// update last_removed_segno
let mut shared_state = self.write_shared_state().await;
if shared_state.last_removed_segno != horizon_segno {
shared_state.last_removed_segno = horizon_segno;
} else {
shared_state.skip_update = true;
}
Ok(())
}
/// Persist control file if there is something to save and enough time
/// passed after the last save. This helps to keep remote_consistent_lsn up
/// to date so that storage nodes restart doesn't cause many pageserver ->
/// safekeeper reconnections.
pub async fn maybe_persist_control_file(self: &Arc<Self>, force: bool) -> Result<()> {
let mut guard = self.write_shared_state().await;
let changed = guard.sk.maybe_persist_inmem_control_file(force).await?;
guard.skip_update = !changed;
Ok(())
}
/// Gather timeline data for metrics.
pub async fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
if self.is_cancelled() {
@@ -843,7 +617,7 @@ impl Timeline {
wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
timeline_is_active: self.broker_active.load(Ordering::Relaxed),
num_computes: self.walreceivers.get_num() as u32,
last_removed_segno: state.last_removed_segno,
last_removed_segno: self.last_removed_segno.load(Ordering::Relaxed),
epoch_start_lsn: state.sk.term_start_lsn,
mem_state: state.sk.state.inmem.clone(),
persisted_state: state.sk.state.clone(),
@@ -866,7 +640,7 @@ impl Timeline {
wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
active: self.broker_active.load(Ordering::Relaxed),
num_computes: self.walreceivers.get_num() as u32,
last_removed_segno: state.last_removed_segno,
last_removed_segno: self.last_removed_segno.load(Ordering::Relaxed),
epoch_start_lsn: state.sk.term_start_lsn,
mem_state: state.sk.state.inmem.clone(),
write_lsn,
@@ -889,6 +663,110 @@ impl Timeline {
state.sk.state.finish_change(&persistent_state).await?;
Ok(res)
}
/// Get the timeline guard for reading/writing WAL files.
/// TODO: if WAL files are not present on disk (evicted), they will be
/// downloaded from S3. Also there will logic for preventing eviction
/// while someone is holding FullAccessTimeline guard.
pub async fn full_access_guard(self: &Arc<Self>) -> Result<FullAccessTimeline> {
if self.is_cancelled() {
bail!(TimelineError::Cancelled(self.ttid));
}
Ok(FullAccessTimeline { tli: self.clone() })
}
}
/// This is a guard that allows to read/write disk timeline state.
/// All tasks that are using the disk should use this guard.
#[derive(Clone)]
pub struct FullAccessTimeline {
pub tli: Arc<Timeline>,
}
impl Deref for FullAccessTimeline {
type Target = Arc<Timeline>;
fn deref(&self) -> &Self::Target {
&self.tli
}
}
impl FullAccessTimeline {
/// Returns true if walsender should stop sending WAL to pageserver. We
/// terminate it if remote_consistent_lsn reached commit_lsn and there is no
/// computes. While there might be nothing to stream already, we learn about
/// remote_consistent_lsn update through replication feedback, and we want
/// to stop pushing to the broker if pageserver is fully caughtup.
pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
if self.is_cancelled() {
return true;
}
let shared_state = self.read_shared_state().await;
if self.walreceivers.get_num() == 0 {
return shared_state.sk.state.inmem.commit_lsn == Lsn(0) || // no data at all yet
reported_remote_consistent_lsn >= shared_state.sk.state.inmem.commit_lsn;
}
false
}
/// Ensure that current term is t, erroring otherwise, and lock the state.
pub async fn acquire_term(&self, t: Term) -> Result<ReadGuardSharedState> {
let ss = self.read_shared_state().await;
if ss.sk.state.acceptor_state.term != t {
bail!(
"failed to acquire term {}, current term {}",
t,
ss.sk.state.acceptor_state.term
);
}
Ok(ss)
}
/// Pass arrived message to the safekeeper.
pub async fn process_msg(
&self,
msg: &ProposerAcceptorMessage,
) -> Result<Option<AcceptorProposerMessage>> {
if self.is_cancelled() {
bail!(TimelineError::Cancelled(self.ttid));
}
let mut rmsg: Option<AcceptorProposerMessage>;
{
let mut shared_state = self.write_shared_state().await;
rmsg = shared_state.sk.process_msg(msg).await?;
// if this is AppendResponse, fill in proper hot standby feedback.
if let Some(AcceptorProposerMessage::AppendResponse(ref mut resp)) = rmsg {
resp.hs_feedback = self.walsenders.get_hotstandby().hs_feedback;
}
}
Ok(rmsg)
}
pub async fn get_walreader(&self, start_lsn: Lsn) -> Result<WalReader> {
let (_, persisted_state) = self.get_state().await;
let enable_remote_read = GlobalTimelines::get_global_config().is_wal_backup_enabled();
WalReader::new(
&self.ttid,
self.timeline_dir.clone(),
&persisted_state,
start_lsn,
enable_remote_read,
)
}
pub fn get_timeline_dir(&self) -> Utf8PathBuf {
self.timeline_dir.clone()
}
/// Update in memory remote consistent lsn.
pub async fn update_remote_consistent_lsn(&self, candidate: Lsn) {
let mut shared_state = self.write_shared_state().await;
shared_state.sk.state.inmem.remote_consistent_lsn =
max(shared_state.sk.state.inmem.remote_consistent_lsn, candidate);
}
}
/// Deletes directory and it's contents. Returns false if directory does not exist.
@@ -899,3 +777,16 @@ async fn delete_dir(path: &Utf8PathBuf) -> Result<bool> {
Err(e) => Err(e.into()),
}
}
/// Get a path to the tenant directory. If you just need to get a timeline directory,
/// use FullAccessTimeline::get_timeline_dir instead.
pub(crate) fn get_tenant_dir(conf: &SafeKeeperConf, tenant_id: &TenantId) -> Utf8PathBuf {
conf.workdir.join(tenant_id.to_string())
}
/// Get a path to the timeline directory. If you need to read WAL files from disk,
/// use FullAccessTimeline::get_timeline_dir instead. This function does not check
/// timeline eviction status and WAL files might not be present on disk.
pub(crate) fn get_timeline_dir(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Utf8PathBuf {
get_tenant_dir(conf, &ttid.tenant_id).join(ttid.timeline_id.to_string())
}

View File

@@ -3,23 +3,42 @@
//! It watches for changes in the timeline state and decides when to spawn or kill background tasks.
//! It also can manage some reactive state, like should the timeline be active for broker pushes or not.
use std::{sync::Arc, time::Duration};
use std::{
sync::Arc,
time::{Duration, Instant},
};
use postgres_ffi::XLogSegNo;
use tokio::task::{JoinError, JoinHandle};
use tracing::{info, instrument, warn};
use utils::lsn::Lsn;
use crate::{
control_file::Storage,
metrics::{MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL},
recovery::recovery_main,
remove_wal::calc_horizon_lsn,
send_wal::WalSenders,
timeline::{PeerInfo, ReadGuardSharedState, Timeline},
timelines_set::TimelinesSet,
timelines_set::{TimelineSetGuard, TimelinesSet},
wal_backup::{self, WalBackupTaskHandle},
SafeKeeperConf,
wal_backup_partial, SafeKeeperConf,
};
pub struct StateSnapshot {
// inmem values
pub commit_lsn: Lsn,
pub backup_lsn: Lsn,
pub remote_consistent_lsn: Lsn,
// persistent control file values
pub cfile_peer_horizon_lsn: Lsn,
pub cfile_remote_consistent_lsn: Lsn,
pub cfile_backup_lsn: Lsn,
// misc
pub cfile_last_persist_at: Instant,
pub inmem_flush_pending: bool,
pub peers: Vec<PeerInfo>,
}
@@ -30,17 +49,34 @@ impl StateSnapshot {
commit_lsn: read_guard.sk.state.inmem.commit_lsn,
backup_lsn: read_guard.sk.state.inmem.backup_lsn,
remote_consistent_lsn: read_guard.sk.state.inmem.remote_consistent_lsn,
cfile_peer_horizon_lsn: read_guard.sk.state.peer_horizon_lsn,
cfile_remote_consistent_lsn: read_guard.sk.state.remote_consistent_lsn,
cfile_backup_lsn: read_guard.sk.state.backup_lsn,
cfile_last_persist_at: read_guard.sk.state.pers.last_persist_at(),
inmem_flush_pending: Self::has_unflushed_inmem_state(&read_guard),
peers: read_guard.get_peers(heartbeat_timeout),
}
}
fn has_unflushed_inmem_state(read_guard: &ReadGuardSharedState) -> bool {
let state = &read_guard.sk.state;
state.inmem.commit_lsn > state.commit_lsn
|| state.inmem.backup_lsn > state.backup_lsn
|| state.inmem.peer_horizon_lsn > state.peer_horizon_lsn
|| state.inmem.remote_consistent_lsn > state.remote_consistent_lsn
}
}
/// Control how often the manager task should wake up to check updates.
/// There is no need to check for updates more often than this.
const REFRESH_INTERVAL: Duration = Duration::from_millis(300);
/// How often to save the control file if the is no other activity.
const CF_SAVE_INTERVAL: Duration = Duration::from_secs(300);
/// This task gets spawned alongside each timeline and is responsible for managing the timeline's
/// background tasks.
/// Be careful, this task is not respawned on panic, so it should not panic.
#[instrument(name = "manager", skip_all, fields(ttid = %tli.ttid))]
pub async fn main_task(
tli: Arc<Timeline>,
@@ -55,20 +91,50 @@ pub async fn main_task(
}
};
// sets whether timeline is active for broker pushes or not
let mut tli_broker_active = broker_active_set.guard(tli.clone());
let ttid = tli.ttid;
// configuration & dependencies
let wal_seg_size = tli.get_wal_seg_size().await;
let heartbeat_timeout = conf.heartbeat_timeout;
let mut state_version_rx = tli.get_state_version_rx();
let walsenders = tli.get_walsenders();
let walreceivers = tli.get_walreceivers();
// current state
let mut state_version_rx = tli.get_state_version_rx();
let mut num_computes_rx = walreceivers.get_num_rx();
let mut tli_broker_active = broker_active_set.guard(tli.clone());
let mut last_removed_segno = 0 as XLogSegNo;
// list of background tasks
let mut backup_task: Option<WalBackupTaskHandle> = None;
let mut recovery_task: Option<JoinHandle<()>> = None;
let mut partial_backup_task: Option<JoinHandle<()>> = None;
let mut wal_removal_task: Option<JoinHandle<anyhow::Result<u64>>> = None;
// Start recovery task which always runs on the timeline.
if conf.peer_recovery_enabled {
match tli.full_access_guard().await {
Ok(tli) => {
recovery_task = Some(tokio::spawn(recovery_main(tli, conf.clone())));
}
Err(e) => {
warn!("failed to start recovery task: {:?}", e);
}
}
}
// Start partial backup task which always runs on the timeline.
if conf.is_wal_backup_enabled() && conf.partial_backup_enabled {
match tli.full_access_guard().await {
Ok(tli) => {
partial_backup_task = Some(tokio::spawn(wal_backup_partial::main_task(
tli,
conf.clone(),
)));
}
Err(e) => {
warn!("failed to start partial backup task: {:?}", e);
}
}
}
let last_state = 'outer: loop {
MANAGER_ITERATIONS_TOTAL.inc();
@@ -76,47 +142,36 @@ pub async fn main_task(
let state_snapshot = StateSnapshot::new(tli.read_shared_state().await, heartbeat_timeout);
let num_computes = *num_computes_rx.borrow();
let is_wal_backup_required =
wal_backup::is_wal_backup_required(wal_seg_size, num_computes, &state_snapshot);
let is_wal_backup_required = update_backup(
&conf,
&tli,
wal_seg_size,
num_computes,
&state_snapshot,
&mut backup_task,
)
.await;
if conf.is_wal_backup_enabled() {
wal_backup::update_task(
&conf,
ttid,
is_wal_backup_required,
&state_snapshot,
&mut backup_task,
)
.await;
}
let _is_active = update_is_active(
is_wal_backup_required,
num_computes,
&state_snapshot,
&mut tli_broker_active,
&tli,
);
let is_active = is_wal_backup_required
|| num_computes > 0
|| state_snapshot.remote_consistent_lsn < state_snapshot.commit_lsn;
let next_cfile_save = update_control_file_save(&state_snapshot, &tli).await;
// update the broker timeline set
if tli_broker_active.set(is_active) {
// write log if state has changed
info!(
"timeline active={} now, remote_consistent_lsn={}, commit_lsn={}",
is_active, state_snapshot.remote_consistent_lsn, state_snapshot.commit_lsn,
);
MANAGER_ACTIVE_CHANGES.inc();
if !is_active {
// TODO: maybe use tokio::spawn?
if let Err(e) = tli.maybe_persist_control_file(false).await {
warn!("control file save in update_status failed: {:?}", e);
}
}
}
// update the state in Arc<Timeline>
tli.wal_backup_active
.store(backup_task.is_some(), std::sync::atomic::Ordering::Relaxed);
tli.broker_active
.store(is_active, std::sync::atomic::Ordering::Relaxed);
update_wal_removal(
&conf,
walsenders,
&tli,
wal_seg_size,
&state_snapshot,
last_removed_segno,
&mut wal_removal_task,
)
.await;
// wait until something changes. tx channels are stored under Arc, so they will not be
// dropped until the manager task is finished.
@@ -135,11 +190,189 @@ pub async fn main_task(
_ = num_computes_rx.changed() => {
// number of connected computes was updated
}
_ = async {
if let Some(timeout) = next_cfile_save {
tokio::time::sleep_until(timeout).await
} else {
futures::future::pending().await
}
} => {
// it's time to save the control file
}
res = async {
if let Some(task) = &mut wal_removal_task {
task.await
} else {
futures::future::pending().await
}
} => {
// WAL removal task finished
wal_removal_task = None;
update_wal_removal_end(res, &tli, &mut last_removed_segno);
}
}
};
// shutdown background tasks
if conf.is_wal_backup_enabled() {
wal_backup::update_task(&conf, ttid, false, &last_state, &mut backup_task).await;
wal_backup::update_task(&conf, &tli, false, &last_state, &mut backup_task).await;
}
if let Some(recovery_task) = recovery_task {
if let Err(e) = recovery_task.await {
warn!("recovery task failed: {:?}", e);
}
}
if let Some(partial_backup_task) = partial_backup_task {
if let Err(e) = partial_backup_task.await {
warn!("partial backup task failed: {:?}", e);
}
}
if let Some(wal_removal_task) = wal_removal_task {
let res = wal_removal_task.await;
update_wal_removal_end(res, &tli, &mut last_removed_segno);
}
}
/// Spawns/kills backup task and returns true if backup is required.
async fn update_backup(
conf: &SafeKeeperConf,
tli: &Arc<Timeline>,
wal_seg_size: usize,
num_computes: usize,
state: &StateSnapshot,
backup_task: &mut Option<WalBackupTaskHandle>,
) -> bool {
let is_wal_backup_required =
wal_backup::is_wal_backup_required(wal_seg_size, num_computes, state);
if conf.is_wal_backup_enabled() {
wal_backup::update_task(conf, tli, is_wal_backup_required, state, backup_task).await;
}
// update the state in Arc<Timeline>
tli.wal_backup_active
.store(backup_task.is_some(), std::sync::atomic::Ordering::Relaxed);
is_wal_backup_required
}
/// Updates the is_active flag and returns its value.
fn update_is_active(
is_wal_backup_required: bool,
num_computes: usize,
state: &StateSnapshot,
tli_broker_active: &mut TimelineSetGuard,
tli: &Arc<Timeline>,
) -> bool {
let is_active = is_wal_backup_required
|| num_computes > 0
|| state.remote_consistent_lsn < state.commit_lsn;
// update the broker timeline set
if tli_broker_active.set(is_active) {
// write log if state has changed
info!(
"timeline active={} now, remote_consistent_lsn={}, commit_lsn={}",
is_active, state.remote_consistent_lsn, state.commit_lsn,
);
MANAGER_ACTIVE_CHANGES.inc();
}
// update the state in Arc<Timeline>
tli.broker_active
.store(is_active, std::sync::atomic::Ordering::Relaxed);
is_active
}
/// Saves the control file if needed. Returns an Instant if the control file should be persisted later.
async fn update_control_file_save(
state: &StateSnapshot,
tli: &Arc<Timeline>,
) -> Option<tokio::time::Instant> {
if !state.inmem_flush_pending {
return None;
}
if state.cfile_last_persist_at.elapsed() > CF_SAVE_INTERVAL {
let mut write_guard = tli.write_shared_state().await;
// this could be offloaded to a background task, since it blocks the manager task,
// but flush() should be fast enough not to be a problem for now
if let Err(e) = write_guard.sk.state.flush().await {
warn!("failed to save control file: {:?}", e);
}
None
} else {
// wait until CF_SAVE_INTERVAL has elapsed since the last save
Some((state.cfile_last_persist_at + CF_SAVE_INTERVAL).into())
}
}
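// Editor's illustration (not part of this commit): the deadline contract of
// `update_control_file_save`, stripped of the actual flush. The 300s interval
// is assumed purely for this example; the real CF_SAVE_INTERVAL is defined
// elsewhere in this module.
fn next_cfile_deadline_example(
    flush_pending: bool,
    last_persist_at: std::time::Instant,
) -> Option<tokio::time::Instant> {
    const EXAMPLE_INTERVAL: std::time::Duration = std::time::Duration::from_secs(300);
    if !flush_pending {
        return None; // nothing to persist, the manager needs no wakeup for this
    }
    if last_persist_at.elapsed() > EXAMPLE_INTERVAL {
        None // the real function persists the control file right here
    } else {
        // e.g. last persisted 100s ago -> the select loop sleeps ~200s more
        Some((last_persist_at + EXAMPLE_INTERVAL).into())
    }
}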
/// Spawns WAL removal task if needed.
async fn update_wal_removal(
conf: &SafeKeeperConf,
walsenders: &Arc<WalSenders>,
tli: &Arc<Timeline>,
wal_seg_size: usize,
state: &StateSnapshot,
last_removed_segno: u64,
wal_removal_task: &mut Option<JoinHandle<anyhow::Result<u64>>>,
) {
if wal_removal_task.is_some() {
// WAL removal is already in progress
return;
}
// If enabled, we use the LSN of the most lagging walsender as the WAL removal horizon.
// This gives lagging pageservers better read speed, at the cost of keeping
// more WAL on disk.
let replication_horizon_lsn = if conf.walsenders_keep_horizon {
walsenders.laggard_lsn()
} else {
None
};
let removal_horizon_lsn = calc_horizon_lsn(state, replication_horizon_lsn);
let removal_horizon_segno = removal_horizon_lsn
.segment_number(wal_seg_size)
.saturating_sub(1);
if removal_horizon_segno > last_removed_segno {
// we need to remove WAL
let remover = crate::wal_storage::Storage::remove_up_to(
&tli.read_shared_state().await.sk.wal_store,
removal_horizon_segno,
);
*wal_removal_task = Some(tokio::spawn(async move {
remover.await?;
Ok(removal_horizon_segno)
}));
}
}
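// Editor's illustration (not part of this commit): the horizon arithmetic used
// by `update_wal_removal` above, with plain integers standing in for
// `Lsn::segment_number`. The concrete numbers are made up for the example.
fn removal_horizon_segno_example(removal_horizon_lsn: u64, wal_seg_size: u64) -> u64 {
    // The segment containing the horizon LSN must be kept, so only segments
    // strictly before it are eligible; saturating_sub avoids underflow at LSN 0.
    (removal_horizon_lsn / wal_seg_size).saturating_sub(1)
}
#[test]
fn removal_horizon_arithmetic() {
    // With 16 MiB segments, LSN 0x0500_0000 lies at the start of segment 5,
    // so segments up to and including 4 may be removed.
    assert_eq!(removal_horizon_segno_example(0x0500_0000, 16 * 1024 * 1024), 4);
}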
/// Updates the state after the WAL removal task has finished.
fn update_wal_removal_end(
res: Result<anyhow::Result<u64>, JoinError>,
tli: &Arc<Timeline>,
last_removed_segno: &mut u64,
) {
let new_last_removed_segno = match res {
Ok(Ok(segno)) => segno,
Err(e) => {
warn!("WAL removal task failed: {:?}", e);
return;
}
Ok(Err(e)) => {
warn!("WAL removal task failed: {:?}", e);
return;
}
};
*last_removed_segno = new_last_removed_segno;
// update the state in Arc<Timeline>
tli.last_removed_segno
.store(new_last_removed_segno, std::sync::atomic::Ordering::Relaxed);
}

View File

@@ -3,7 +3,7 @@
//! all from the disk on startup and keeping them in memory.
use crate::safekeeper::ServerInfo;
use crate::timeline::{Timeline, TimelineError};
use crate::timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError};
use crate::timelines_set::TimelinesSet;
use crate::SafeKeeperConf;
use anyhow::{bail, Context, Result};
@@ -127,7 +127,7 @@ impl GlobalTimelines {
state.get_dependencies()
};
let timelines_dir = conf.tenant_dir(&tenant_id);
let timelines_dir = get_tenant_dir(&conf, &tenant_id);
for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
.with_context(|| format!("failed to list timelines dir {}", timelines_dir))?
{
@@ -348,11 +348,7 @@ impl GlobalTimelines {
}
Err(_) => {
// Timeline is not in memory, but it may still exist on disk in a broken state.
let dir_path = TIMELINES_STATE
.lock()
.unwrap()
.get_conf()
.timeline_dir(ttid);
let dir_path = get_timeline_dir(TIMELINES_STATE.lock().unwrap().get_conf(), ttid);
let dir_existed = delete_dir(dir_path)?;
Ok(TimelineDeleteForceResult {
@@ -401,13 +397,10 @@ impl GlobalTimelines {
// Note that new timelines could have been created concurrently while we were deleting,
// so the directory may not be empty. In this case timelines will have bad state
// and timeline background jobs can panic.
delete_dir(
TIMELINES_STATE
.lock()
.unwrap()
.get_conf()
.tenant_dir(tenant_id),
)?;
delete_dir(get_tenant_dir(
TIMELINES_STATE.lock().unwrap().get_conf(),
tenant_id,
))?;
// FIXME: we temporarily disabled removing timelines from the map, see `delete_force`
// let tlis_after_delete = Self::get_all_for_tenant(*tenant_id);

View File

@@ -30,9 +30,9 @@ use tracing::*;
use utils::{id::TenantTimelineId, lsn::Lsn};
use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS, WAL_BACKUP_TASKS};
use crate::timeline::{PeerInfo, Timeline};
use crate::timeline::{FullAccessTimeline, PeerInfo, Timeline};
use crate::timeline_manager::StateSnapshot;
use crate::{GlobalTimelines, SafeKeeperConf, WAL_BACKUP_RUNTIME};
use crate::{SafeKeeperConf, WAL_BACKUP_RUNTIME};
use once_cell::sync::OnceCell;
@@ -63,13 +63,13 @@ pub fn is_wal_backup_required(
/// is running, kill it.
pub async fn update_task(
conf: &SafeKeeperConf,
ttid: TenantTimelineId,
tli: &Arc<Timeline>,
need_backup: bool,
state: &StateSnapshot,
entry: &mut Option<WalBackupTaskHandle>,
) {
let (offloader, election_dbg_str) =
determine_offloader(&state.peers, state.backup_lsn, ttid, conf);
determine_offloader(&state.peers, state.backup_lsn, tli.ttid, conf);
let elected_me = Some(conf.my_id) == offloader;
let should_task_run = need_backup && elected_me;
@@ -80,15 +80,8 @@ pub async fn update_task(
info!("elected for backup: {}", election_dbg_str);
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
let timeline_dir = conf.timeline_dir(&ttid);
let async_task = backup_task_main(
ttid,
timeline_dir,
conf.workdir.clone(),
conf.backup_parallel_jobs,
shutdown_rx,
);
let async_task = backup_task_main(tli.clone(), conf.backup_parallel_jobs, shutdown_rx);
let handle = if conf.current_thread_runtime {
tokio::spawn(async_task)
@@ -198,39 +191,32 @@ pub fn init_remote_storage(conf: &SafeKeeperConf) {
}
struct WalBackupTask {
timeline: Arc<Timeline>,
timeline: FullAccessTimeline,
timeline_dir: Utf8PathBuf,
workspace_dir: Utf8PathBuf,
wal_seg_size: usize,
parallel_jobs: usize,
commit_lsn_watch_rx: watch::Receiver<Lsn>,
}
/// Offload single timeline.
#[instrument(name = "WAL backup", skip_all, fields(ttid = %ttid))]
async fn backup_task_main(
ttid: TenantTimelineId,
timeline_dir: Utf8PathBuf,
workspace_dir: Utf8PathBuf,
parallel_jobs: usize,
mut shutdown_rx: Receiver<()>,
) {
#[instrument(name = "WAL backup", skip_all, fields(ttid = %tli.ttid))]
async fn backup_task_main(tli: Arc<Timeline>, parallel_jobs: usize, mut shutdown_rx: Receiver<()>) {
let _guard = WAL_BACKUP_TASKS.guard();
let tli = match tli.full_access_guard().await {
Ok(tli) => tli,
Err(e) => {
error!("backup error: {}", e);
return;
}
};
info!("started");
let res = GlobalTimelines::get(ttid);
if let Err(e) = res {
error!("backup error: {}", e);
return;
}
let tli = res.unwrap();
let mut wb = WalBackupTask {
wal_seg_size: tli.get_wal_seg_size().await,
commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(),
timeline_dir: tli.get_timeline_dir(),
timeline: tli,
timeline_dir,
workspace_dir,
parallel_jobs,
};
@@ -297,7 +283,6 @@ impl WalBackupTask {
commit_lsn,
self.wal_seg_size,
&self.timeline_dir,
&self.workspace_dir,
self.parallel_jobs,
)
.await
@@ -319,18 +304,18 @@ impl WalBackupTask {
}
async fn backup_lsn_range(
timeline: &Arc<Timeline>,
timeline: &FullAccessTimeline,
backup_lsn: &mut Lsn,
end_lsn: Lsn,
wal_seg_size: usize,
timeline_dir: &Utf8Path,
workspace_dir: &Utf8Path,
parallel_jobs: usize,
) -> Result<()> {
if parallel_jobs < 1 {
anyhow::bail!("parallel_jobs must be >= 1");
}
let remote_timeline_path = remote_timeline_path(&timeline.ttid)?;
let start_lsn = *backup_lsn;
let segments = get_segments(start_lsn, end_lsn, wal_seg_size);
@@ -343,7 +328,11 @@ async fn backup_lsn_range(
loop {
let added_task = match iter.next() {
Some(s) => {
uploads.push_back(backup_single_segment(s, timeline_dir, workspace_dir));
uploads.push_back(backup_single_segment(
s,
timeline_dir,
&remote_timeline_path,
));
true
}
None => false,
@@ -381,18 +370,10 @@ async fn backup_lsn_range(
async fn backup_single_segment(
seg: &Segment,
timeline_dir: &Utf8Path,
workspace_dir: &Utf8Path,
remote_timeline_path: &RemotePath,
) -> Result<Segment> {
let segment_file_path = seg.file_path(timeline_dir)?;
let remote_segment_path = segment_file_path
.strip_prefix(workspace_dir)
.context("Failed to strip workspace dir prefix")
.and_then(RemotePath::new)
.with_context(|| {
format!(
"Failed to resolve remote part of path {segment_file_path:?} for base {workspace_dir:?}",
)
})?;
let remote_segment_path = seg.remote_path(remote_timeline_path);
let res = backup_object(&segment_file_path, &remote_segment_path, seg.size()).await;
if res.is_ok() {
@@ -430,6 +411,10 @@ impl Segment {
Ok(timeline_dir.join(self.object_name()))
}
pub fn remote_path(self, remote_timeline_path: &RemotePath) -> RemotePath {
remote_timeline_path.join(self.object_name())
}
pub fn size(self) -> usize {
(u64::from(self.end_lsn) - u64::from(self.start_lsn)) as usize
}
@@ -530,8 +515,7 @@ pub async fn read_object(
/// when called.
pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> {
let storage = get_configured_remote_storage();
let ttid_path = Utf8Path::new(&ttid.tenant_id.to_string()).join(ttid.timeline_id.to_string());
let remote_path = RemotePath::new(&ttid_path)?;
let remote_path = remote_timeline_path(ttid)?;
// see DEFAULT_MAX_KEYS_PER_LIST_RESPONSE
// const Option unwrap is not stable, otherwise it would be const.
@@ -613,15 +597,17 @@ pub async fn copy_s3_segments(
.as_ref()
.unwrap();
let relative_dst_path =
Utf8Path::new(&dst_ttid.tenant_id.to_string()).join(dst_ttid.timeline_id.to_string());
let remote_path = RemotePath::new(&relative_dst_path)?;
let remote_dst_path = remote_timeline_path(dst_ttid)?;
let cancel = CancellationToken::new();
let files = storage
.list(Some(&remote_path), ListingMode::NoDelimiter, None, &cancel)
.list(
Some(&remote_dst_path),
ListingMode::NoDelimiter,
None,
&cancel,
)
.await?
.keys;
@@ -635,9 +621,6 @@ pub async fn copy_s3_segments(
uploaded_segments
);
let relative_src_path =
Utf8Path::new(&src_ttid.tenant_id.to_string()).join(src_ttid.timeline_id.to_string());
for segno in from_segment..to_segment {
if segno % SEGMENTS_PROGRESS_REPORT_INTERVAL == 0 {
info!("copied all segments from {} until {}", from_segment, segno);
@@ -649,8 +632,8 @@ pub async fn copy_s3_segments(
}
debug!("copying segment {}", segment_name);
let from = RemotePath::new(&relative_src_path.join(&segment_name))?;
let to = RemotePath::new(&relative_dst_path.join(&segment_name))?;
let from = remote_timeline_path(src_ttid)?.join(&segment_name);
let to = remote_dst_path.join(&segment_name);
storage.copy_object(&from, &to, &cancel).await?;
}
@@ -661,3 +644,8 @@ pub async fn copy_s3_segments(
);
Ok(())
}
/// Get S3 (remote_storage) prefix path used for timeline files.
pub fn remote_timeline_path(ttid: &TenantTimelineId) -> Result<RemotePath> {
RemotePath::new(&Utf8Path::new(&ttid.tenant_id.to_string()).join(ttid.timeline_id.to_string()))
}
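// Editor's illustration (not part of this commit): with this helper, every
// remote object of a timeline is keyed under "<tenant_id>/<timeline_id>" by
// joining object names onto the prefix, instead of stripping the local workdir
// prefix from filesystem paths. The segment names below are made up, and
// `TenantTimelineId::generate()` is assumed to be available as in tests.
#[allow(dead_code)]
fn remote_layout_example() -> Result<()> {
    let ttid = TenantTimelineId::generate();
    let prefix = remote_timeline_path(&ttid)?; // "<tenant_id>/<timeline_id>"
    // Full segments (see Segment::remote_path above) and partial segments
    // (see wal_backup_partial below) both live directly under that prefix.
    let _full_segment = prefix.join("000000010000000000000005");
    let _partial_segment = prefix.join("000000010000000000000005_sk1.partial");
    Ok(())
}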

View File

@@ -18,8 +18,6 @@
//! This way the control file stores information about all potentially existing
//! remote partial segments and can clean them up after uploading a newer version.
use std::sync::Arc;
use camino::Utf8PathBuf;
use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
use rand::Rng;
@@ -32,8 +30,9 @@ use utils::lsn::Lsn;
use crate::{
metrics::{PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS},
safekeeper::Term,
timeline::Timeline,
wal_backup, SafeKeeperConf,
timeline::FullAccessTimeline,
wal_backup::{self, remote_timeline_path},
SafeKeeperConf,
};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
@@ -83,10 +82,10 @@ impl State {
struct PartialBackup {
wal_seg_size: usize,
tli: Arc<Timeline>,
tli: FullAccessTimeline,
conf: SafeKeeperConf,
local_prefix: Utf8PathBuf,
remote_prefix: Utf8PathBuf,
remote_timeline_path: RemotePath,
state: State,
}
@@ -153,7 +152,7 @@ impl PartialBackup {
let backup_bytes = flush_lsn.segment_offset(self.wal_seg_size);
let local_path = self.local_prefix.join(self.local_segment_name(segno));
let remote_path = RemotePath::new(self.remote_prefix.join(&prepared.name).as_ref())?;
let remote_path = self.remote_timeline_path.join(&prepared.name);
// Upload first `backup_bytes` bytes of the segment to the remote storage.
wal_backup::backup_partial_segment(&local_path, &remote_path, backup_bytes).await?;
@@ -253,7 +252,7 @@ impl PartialBackup {
info!("deleting objects: {:?}", segments_to_delete);
let mut objects_to_delete = vec![];
for seg in segments_to_delete.iter() {
let remote_path = RemotePath::new(self.remote_prefix.join(seg).as_ref())?;
let remote_path = self.remote_timeline_path.join(seg);
objects_to_delete.push(remote_path);
}
@@ -273,7 +272,7 @@ impl PartialBackup {
}
#[instrument(name = "Partial backup", skip_all, fields(ttid = %tli.ttid))]
pub async fn main_task(tli: Arc<Timeline>, conf: SafeKeeperConf) {
pub async fn main_task(tli: FullAccessTimeline, conf: SafeKeeperConf) {
debug!("started");
let await_duration = conf.partial_backup_timeout;
@@ -289,11 +288,11 @@ pub async fn main_task(tli: Arc<Timeline>, conf: SafeKeeperConf) {
let mut flush_lsn_rx = tli.get_term_flush_lsn_watch_rx();
let wal_seg_size = tli.get_wal_seg_size().await;
let local_prefix = tli.timeline_dir.clone();
let remote_prefix = match tli.timeline_dir.strip_prefix(&conf.workdir) {
Ok(path) => path.to_owned(),
let local_prefix = tli.get_timeline_dir();
let remote_timeline_path = match remote_timeline_path(&tli.ttid) {
Ok(path) => path,
Err(e) => {
error!("failed to strip workspace dir prefix: {:?}", e);
error!("failed to create remote path: {:?}", e);
return;
}
};
@@ -304,7 +303,7 @@ pub async fn main_task(tli: Arc<Timeline>, conf: SafeKeeperConf) {
state: persistent_state.partial_backup,
conf,
local_prefix,
remote_prefix,
remote_timeline_path,
};
debug!("state: {:?}", backup.state);

View File

@@ -25,7 +25,7 @@ use utils::crashsafe::durable_rename;
use crate::metrics::{time_io_closure, WalStorageMetrics, REMOVED_WAL_SEGMENTS};
use crate::state::TimelinePersistentState;
use crate::wal_backup::read_object;
use crate::wal_backup::{read_object, remote_timeline_path};
use crate::SafeKeeperConf;
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_ffi::XLogFileName;
@@ -536,7 +536,7 @@ async fn remove_segments_from_disk(
}
pub struct WalReader {
workdir: Utf8PathBuf,
remote_path: RemotePath,
timeline_dir: Utf8PathBuf,
wal_seg_size: usize,
pos: Lsn,
@@ -558,7 +558,7 @@ pub struct WalReader {
impl WalReader {
pub fn new(
workdir: Utf8PathBuf,
ttid: &TenantTimelineId,
timeline_dir: Utf8PathBuf,
state: &TimelinePersistentState,
start_pos: Lsn,
@@ -586,7 +586,7 @@ impl WalReader {
}
Ok(Self {
workdir,
remote_path: remote_timeline_path(ttid)?,
timeline_dir,
wal_seg_size: state.server.wal_seg_size as usize,
pos: start_pos,
@@ -684,7 +684,7 @@ impl WalReader {
let xlogoff = self.pos.segment_offset(self.wal_seg_size);
let segno = self.pos.segment_number(self.wal_seg_size);
let wal_file_name = XLogFileName(PG_TLI, segno, self.wal_seg_size);
let wal_file_path = self.timeline_dir.join(wal_file_name);
let wal_file_path = self.timeline_dir.join(&wal_file_name);
// Try to open local file, if we may have WAL locally
if self.pos >= self.local_start_lsn {
@@ -712,16 +712,7 @@ impl WalReader {
// Try to open remote file, if remote reads are enabled
if self.enable_remote_read {
let remote_wal_file_path = wal_file_path
.strip_prefix(&self.workdir)
.context("Failed to strip workdir prefix")
.and_then(RemotePath::new)
.with_context(|| {
format!(
"Failed to resolve remote part of path {:?} for base {:?}",
wal_file_path, self.workdir,
)
})?;
let remote_wal_file_path = self.remote_path.join(&wal_file_name);
return read_object(&remote_wal_file_path, xlogoff as u64).await;
}