Start and stop per-timeline recovery task.

Slightly refactors init: load_tenant_timelines is now async so it can properly
initialize each timeline, but to keep the global map lock synchronous we simply
re-acquire it for each timeline.
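
To illustrate the locking pattern this relies on, here is a minimal standalone
sketch (the map type, field names, and helpers are invented for the example,
not the safekeeper's actual types): a std::sync::Mutex guard must not be held
across an .await point, so the lock is taken twice, briefly, instead of once
for the whole load.

    use std::collections::HashMap;
    use std::sync::Mutex;

    // Hypothetical stand-in for the global timelines map guarded by a *sync* Mutex.
    type TimelineMap = Mutex<HashMap<u64, String>>;

    // A std::sync::MutexGuard must not be held across .await, so instead of keeping
    // the map locked for the whole async load we take the lock twice, briefly:
    // once to read what the load needs, once to publish the result.
    async fn load_one(map: &TimelineMap, id: u64) {
        // First short critical section: copy out the data needed for loading.
        let prev = { map.lock().unwrap().get(&id).cloned() };

        // Async work runs with the lock released.
        tokio::task::yield_now().await;
        let loaded = format!("timeline {id}, previously {prev:?}");

        // Second short critical section: insert the loaded timeline.
        map.lock().unwrap().insert(id, loaded);
    }

    #[tokio::main]
    async fn main() {
        let map = TimelineMap::default();
        for id in 0..3 {
            load_one(&map, id).await;
        }
        println!("{:?}", map.lock().unwrap());
    }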

Recovery task itself is just a stub here.
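
The start/stop mechanism is the substantive part of this change. Below is a
minimal standalone sketch of the same pattern used by the new task
(tokio::sync::watch as the cancellation signal plus select!); the task body is
a placeholder rather than real recovery work.

    use std::time::Duration;

    use tokio::select;
    use tokio::sync::watch;
    use tokio::time::sleep;

    // Placeholder for the real per-timeline work; the actual recovery loop is a
    // stub in this commit as well.
    async fn main_loop() {
        loop {
            sleep(Duration::from_millis(2000)).await;
        }
    }

    // The task runs until the watch channel reports cancellation, mirroring how
    // recovery_main selects between recovery_main_loop and cancellation_rx.
    async fn task(mut cancellation_rx: watch::Receiver<bool>) {
        println!("started");
        select! {
            _ = main_loop() => unreachable!(),
            _ = cancellation_rx.changed() => println!("stopped"),
        }
    }

    #[tokio::main]
    async fn main() {
        let (cancel_tx, cancel_rx) = watch::channel(false);
        let handle = tokio::spawn(task(cancel_rx));

        // Later, cancelling the timeline flips the watch value and stops the task.
        sleep(Duration::from_millis(100)).await;
        cancel_tx.send(true).unwrap();
        handle.await.unwrap();
    }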

part of
https://github.com/neondatabase/neon/pull/4875
Author: Arseny Sher
Date: 2023-08-21 13:17:12 +03:00
Committed by: Arseny Sher
Parent: 39e3fbbeb0
Commit: 87f7d6bce3
6 changed files with 145 additions and 64 deletions

View File

@@ -341,21 +341,35 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
     let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100);
 
-    // Load all timelines from disk to memory.
-    GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx)?;
-
     // Keep handles to main tasks to die if any of them disappears.
     let mut tasks_handles: FuturesUnordered<BoxFuture<(String, JoinTaskRes)>> =
         FuturesUnordered::new();
 
+    // Start wal backup launcher before loading timelines as we'll notify it
+    // through the channel about timelines which need offloading, not draining
+    // the channel would cause deadlock.
+    let current_thread_rt = conf
+        .current_thread_runtime
+        .then(|| Handle::try_current().expect("no runtime in main"));
+    let conf_ = conf.clone();
+    let wal_backup_handle = current_thread_rt
+        .as_ref()
+        .unwrap_or_else(|| WAL_BACKUP_RUNTIME.handle())
+        .spawn(wal_backup::wal_backup_launcher_task_main(
+            conf_,
+            wal_backup_launcher_rx,
+        ))
+        .map(|res| ("WAL backup launcher".to_owned(), res));
+    tasks_handles.push(Box::pin(wal_backup_handle));
+
+    // Load all timelines from disk to memory.
+    GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx).await?;
+
     let conf_ = conf.clone();
     // Run everything in current thread rt, if asked.
     if conf.current_thread_runtime {
         info!("running in current thread runtime");
     }
-    let current_thread_rt = conf
-        .current_thread_runtime
-        .then(|| Handle::try_current().expect("no runtime in main"));
 
     let wal_service_handle = current_thread_rt
         .as_ref()

@@ -408,17 +422,6 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
         .map(|res| ("WAL remover".to_owned(), res));
     tasks_handles.push(Box::pin(wal_remover_handle));
 
-    let conf_ = conf.clone();
-    let wal_backup_handle = current_thread_rt
-        .as_ref()
-        .unwrap_or_else(|| WAL_BACKUP_RUNTIME.handle())
-        .spawn(wal_backup::wal_backup_launcher_task_main(
-            conf_,
-            wal_backup_launcher_rx,
-        ))
-        .map(|res| ("WAL backup launcher".to_owned(), res));
-    tasks_handles.push(Box::pin(wal_backup_handle));
-
     set_build_info_metric(GIT_VERSION);
 
     // TODO: update tokio-stream, convert to real async Stream with
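
The ordering comment in the hunk above (launcher spawned before
GlobalTimelines::init) is about the bounded mpsc channel: init sends one
notification per timeline, so with nothing draining the channel it would block
once the buffer fills. A minimal standalone sketch of that failure mode and the
fix, with illustrative names and numbers:

    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        // Bounded channel: at most 100 pending notifications.
        let (tx, mut rx) = mpsc::channel::<u32>(100);

        // Consumer ("wal backup launcher") started first, so the channel is
        // drained while the producer below is still sending.
        let consumer = tokio::spawn(async move {
            while let Some(ttid) = rx.recv().await {
                println!("offload check for timeline {ttid}");
            }
        });

        // Producer ("GlobalTimelines::init") may send more messages than the
        // channel capacity; without the consumer running, this loop would stall
        // forever once 100 messages are buffered.
        for ttid in 0..1000 {
            tx.send(ttid).await.unwrap();
        }
        drop(tx); // close the channel so the consumer loop ends
        consumer.await.unwrap();
    }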

View File

@@ -19,6 +19,7 @@ pub mod json_ctrl;
 pub mod metrics;
 pub mod pull_timeline;
 pub mod receive_wal;
+pub mod recovery;
 pub mod remove_wal;
 pub mod safekeeper;
 pub mod send_wal;

View File

@@ -227,7 +227,9 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response>
     tokio::fs::create_dir_all(conf.tenant_dir(&ttid.tenant_id)).await?;
     tokio::fs::rename(tli_dir_path, &timeline_path).await?;
 
-    let tli = GlobalTimelines::load_timeline(ttid).context("Failed to load timeline after copy")?;
+    let tli = GlobalTimelines::load_timeline(ttid)
+        .await
+        .context("Failed to load timeline after copy")?;
 
     info!(
         "Loaded timeline {}, flush_lsn={}",

View File

@@ -0,0 +1,40 @@
+//! This module implements pulling WAL from peer safekeepers if compute can't
+//! provide it, i.e. safekeeper lags too much.
+
+use std::sync::Arc;
+
+use tokio::{select, time::sleep, time::Duration};
+use tracing::{info, instrument};
+
+use crate::{timeline::Timeline, SafeKeeperConf};
+
+/// Entrypoint for per timeline task which always runs, checking whether
+/// recovery for this safekeeper is needed and starting it if so.
+#[instrument(name = "recovery task", skip_all, fields(ttid = %tli.ttid))]
+pub async fn recovery_main(tli: Arc<Timeline>, _conf: SafeKeeperConf) {
+    info!("started");
+    let mut cancellation_rx = match tli.get_cancellation_rx() {
+        Ok(rx) => rx,
+        Err(_) => {
+            info!("timeline canceled during task start");
+            return;
+        }
+    };
+
+    select! {
+        _ = recovery_main_loop(tli) => { unreachable!() }
+        _ = cancellation_rx.changed() => {
+            info!("stopped");
+        }
+    }
+}
+
+const CHECK_INTERVAL_MS: u64 = 2000;
+
+/// Check regularly whether we need to start recovery.
+async fn recovery_main_loop(_tli: Arc<Timeline>) {
+    let check_duration = Duration::from_millis(CHECK_INTERVAL_MS);
+    loop {
+        sleep(check_duration).await;
+    }
+}

View File

@@ -27,6 +27,7 @@ use storage_broker::proto::SafekeeperTimelineInfo;
 use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
 
 use crate::receive_wal::WalReceivers;
+use crate::recovery::recovery_main;
 use crate::safekeeper::{
     AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
     SafekeeperMemState, ServerInfo, Term,

@@ -327,13 +328,13 @@ pub struct Timeline {
 impl Timeline {
     /// Load existing timeline from disk.
     pub fn load_timeline(
-        conf: SafeKeeperConf,
+        conf: &SafeKeeperConf,
         ttid: TenantTimelineId,
         wal_backup_launcher_tx: Sender<TenantTimelineId>,
     ) -> Result<Timeline> {
         let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
 
-        let shared_state = SharedState::restore(&conf, &ttid)?;
+        let shared_state = SharedState::restore(conf, &ttid)?;
         let rcl = shared_state.sk.state.remote_consistent_lsn;
         let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
             watch::channel(shared_state.sk.state.commit_lsn);

@@ -355,7 +356,7 @@ impl Timeline {
     /// Create a new timeline, which is not yet persisted to disk.
     pub fn create_empty(
-        conf: SafeKeeperConf,
+        conf: &SafeKeeperConf,
         ttid: TenantTimelineId,
         wal_backup_launcher_tx: Sender<TenantTimelineId>,
         server_info: ServerInfo,

@@ -371,7 +372,7 @@ impl Timeline {
             wal_backup_launcher_tx,
             commit_lsn_watch_tx,
             commit_lsn_watch_rx,
-            mutex: Mutex::new(SharedState::create_new(&conf, &ttid, state)?),
+            mutex: Mutex::new(SharedState::create_new(conf, &ttid, state)?),
             walsenders: WalSenders::new(Lsn(0)),
             walreceivers: WalReceivers::new(),
             cancellation_rx,

@@ -380,12 +381,16 @@ impl Timeline {
         })
     }
 
-    /// Initialize fresh timeline on disk and start background tasks. If bootstrap
+    /// Initialize fresh timeline on disk and start background tasks. If init
     /// fails, timeline is cancelled and cannot be used anymore.
     ///
-    /// Bootstrap is transactional, so if it fails, created files will be deleted,
+    /// Init is transactional, so if it fails, created files will be deleted,
     /// and state on disk should remain unchanged.
-    pub async fn bootstrap(&self, shared_state: &mut MutexGuard<'_, SharedState>) -> Result<()> {
+    pub async fn init_new(
+        self: &Arc<Timeline>,
+        shared_state: &mut MutexGuard<'_, SharedState>,
+        conf: &SafeKeeperConf,
+    ) -> Result<()> {
         match fs::metadata(&self.timeline_dir).await {
             Ok(_) => {
                 // Timeline directory exists on disk, we should leave state unchanged

@@ -401,7 +406,7 @@ impl Timeline {
         // Create timeline directory.
         fs::create_dir_all(&self.timeline_dir).await?;
 
-        // Write timeline to disk and TODO: start background tasks.
+        // Write timeline to disk and start background tasks.
         if let Err(e) = shared_state.sk.persist().await {
             // Bootstrap failed, cancel timeline and remove timeline directory.
             self.cancel(shared_state);

@@ -415,12 +420,16 @@ impl Timeline {
             return Err(e);
         }
 
+        self.bootstrap(conf);
+
+        // TODO: add more initialization steps here
+        self.update_status(shared_state);
         Ok(())
     }
 
+    /// Bootstrap new or existing timeline starting background stasks.
+    pub fn bootstrap(self: &Arc<Timeline>, conf: &SafeKeeperConf) {
+        // Start recovery task which always runs on the timeline.
+        tokio::spawn(recovery_main(self.clone(), conf.clone()));
+    }
+
     /// Delete timeline from disk completely, by removing timeline directory. Background
     /// timeline activities will stop eventually.
     pub async fn delete_from_disk(

@@ -454,6 +463,16 @@ impl Timeline {
         *self.cancellation_rx.borrow()
     }
 
+    /// Returns watch channel which gets value when timeline is cancelled. It is
+    /// guaranteed to have not cancelled value observed (errors otherwise).
+    pub fn get_cancellation_rx(&self) -> Result<watch::Receiver<bool>> {
+        let rx = self.cancellation_rx.clone();
+        if *rx.borrow() {
+            bail!(TimelineError::Cancelled(self.ttid));
+        }
+        Ok(rx)
+    }
+
     /// Take a writing mutual exclusive lock on timeline shared_state.
     pub async fn write_shared_state(&self) -> MutexGuard<SharedState> {
         self.mutex.lock().await

View File

@@ -11,7 +11,7 @@ use serde::Serialize;
 use std::collections::HashMap;
 use std::path::PathBuf;
 use std::str::FromStr;
-use std::sync::{Arc, Mutex, MutexGuard};
+use std::sync::{Arc, Mutex};
 use tokio::sync::mpsc::Sender;
 use tracing::*;
 use utils::id::{TenantId, TenantTimelineId, TimelineId};

@@ -71,19 +71,23 @@ pub struct GlobalTimelines;
 impl GlobalTimelines {
     /// Inject dependencies needed for the timeline constructors and load all timelines to memory.
-    pub fn init(
+    pub async fn init(
         conf: SafeKeeperConf,
         wal_backup_launcher_tx: Sender<TenantTimelineId>,
     ) -> Result<()> {
-        let mut state = TIMELINES_STATE.lock().unwrap();
-        assert!(state.wal_backup_launcher_tx.is_none());
-        state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx);
-        state.conf = Some(conf);
+        // clippy isn't smart enough to understand that drop(state) releases the
+        // lock, so use explicit block
+        let tenants_dir = {
+            let mut state = TIMELINES_STATE.lock().unwrap();
+            assert!(state.wal_backup_launcher_tx.is_none());
+            state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx);
+            state.conf = Some(conf);
 
-        // Iterate through all directories and load tenants for all directories
-        // named as a valid tenant_id.
+            // Iterate through all directories and load tenants for all directories
+            // named as a valid tenant_id.
+            state.get_conf().workdir.clone()
+        };
         let mut tenant_count = 0;
-        let tenants_dir = state.get_conf().workdir.clone();
         for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
             .with_context(|| format!("failed to list tenants dir {}", tenants_dir.display()))?
         {

@@ -93,7 +97,7 @@ impl GlobalTimelines {
                         TenantId::from_str(tenants_dir_entry.file_name().to_str().unwrap_or(""))
                     {
                         tenant_count += 1;
-                        GlobalTimelines::load_tenant_timelines(&mut state, tenant_id)?;
+                        GlobalTimelines::load_tenant_timelines(tenant_id).await?;
                     }
                 }
                 Err(e) => error!(

@@ -108,7 +112,7 @@ impl GlobalTimelines {
         info!(
             "found {} tenants directories, successfully loaded {} timelines",
             tenant_count,
-            state.timelines.len()
+            TIMELINES_STATE.lock().unwrap().timelines.len()
         );
         Ok(())
     }

@@ -116,17 +120,21 @@ impl GlobalTimelines {
     /// Loads all timelines for the given tenant to memory. Returns fs::read_dir
     /// errors if any.
     ///
-    /// Note: This function (and all reading/loading below) is sync because
-    /// timelines are loaded while holding GlobalTimelinesState lock. Which is
-    /// fine as this is called only from single threaded main runtime on boot,
-    /// but clippy complains anyway, and suppressing that isn't trivial as async
-    /// is the keyword, ha. That only other user is pull_timeline.rs for which
-    /// being blocked is not that bad, and we can do spawn_blocking.
-    fn load_tenant_timelines(
-        state: &mut MutexGuard<'_, GlobalTimelinesState>,
-        tenant_id: TenantId,
-    ) -> Result<()> {
-        let timelines_dir = state.get_conf().tenant_dir(&tenant_id);
+    /// It is async for update_status_notify sake. Since TIMELINES_STATE lock is
+    /// sync and there is no important reason to make it async (it is always
+    /// held for a short while) we just lock and unlock it for each timeline --
+    /// this function is called during init when nothing else is running, so
+    /// this is fine.
+    async fn load_tenant_timelines(tenant_id: TenantId) -> Result<()> {
+        let (conf, wal_backup_launcher_tx) = {
+            let state = TIMELINES_STATE.lock().unwrap();
+            (
+                state.get_conf().clone(),
+                state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
+            )
+        };
+
+        let timelines_dir = conf.tenant_dir(&tenant_id);
         for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
             .with_context(|| format!("failed to list timelines dir {}", timelines_dir.display()))?
         {

@@ -136,13 +144,16 @@ impl GlobalTimelines {
                         TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
                     {
                         let ttid = TenantTimelineId::new(tenant_id, timeline_id);
-                        match Timeline::load_timeline(
-                            state.get_conf().clone(),
-                            ttid,
-                            state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
-                        ) {
+                        match Timeline::load_timeline(&conf, ttid, wal_backup_launcher_tx.clone()) {
                            Ok(timeline) => {
-                                state.timelines.insert(ttid, Arc::new(timeline));
+                                let tli = Arc::new(timeline);
+                                TIMELINES_STATE
+                                    .lock()
+                                    .unwrap()
+                                    .timelines
+                                    .insert(ttid, tli.clone());
+                                tli.bootstrap(&conf);
+                                tli.update_status_notify().await.unwrap();
                             }
                             // If we can't load a timeline, it's most likely because of a corrupted
                             // directory. We will log an error and won't allow to delete/recreate

@@ -168,18 +179,22 @@ impl GlobalTimelines {
     }
 
     /// Load timeline from disk to the memory.
-    pub fn load_timeline(ttid: TenantTimelineId) -> Result<Arc<Timeline>> {
+    pub async fn load_timeline(ttid: TenantTimelineId) -> Result<Arc<Timeline>> {
         let (conf, wal_backup_launcher_tx) = TIMELINES_STATE.lock().unwrap().get_dependencies();
 
-        match Timeline::load_timeline(conf, ttid, wal_backup_launcher_tx) {
+        match Timeline::load_timeline(&conf, ttid, wal_backup_launcher_tx) {
             Ok(timeline) => {
                 let tli = Arc::new(timeline);
                 // TODO: prevent concurrent timeline creation/loading
                 TIMELINES_STATE
                     .lock()
                     .unwrap()
                     .timelines
                     .insert(ttid, tli.clone());
+                tli.bootstrap(&conf);
                 Ok(tli)
             }
             // If we can't load a timeline, it's bad. Caller will figure it out.

@@ -217,7 +232,7 @@ impl GlobalTimelines {
         info!("creating new timeline {}", ttid);
         let timeline = Arc::new(Timeline::create_empty(
-            conf,
+            &conf,
             ttid,
             wal_backup_launcher_tx,
             server_info,

@@ -240,23 +255,24 @@ impl GlobalTimelines {
             // Write the new timeline to the disk and start background workers.
             // Bootstrap is transactional, so if it fails, the timeline will be deleted,
             // and the state on disk should remain unchanged.
-            if let Err(e) = timeline.bootstrap(&mut shared_state).await {
-                // Note: the most likely reason for bootstrap failure is that the timeline
+            if let Err(e) = timeline.init_new(&mut shared_state, &conf).await {
+                // Note: the most likely reason for init failure is that the timeline
                 // directory already exists on disk. This happens when timeline is corrupted
                 // and wasn't loaded from disk on startup because of that. We want to preserve
                 // the timeline directory in this case, for further inspection.
 
                 // TODO: this is an unusual error, perhaps we should send it to sentry
                 // TODO: compute will try to create timeline every second, we should add backoff
-                error!("failed to bootstrap timeline {}: {}", ttid, e);
+                error!("failed to init new timeline {}: {}", ttid, e);
 
-                // Timeline failed to bootstrap, it cannot be used. Remove it from the map.
+                // Timeline failed to init, it cannot be used. Remove it from the map.
                 TIMELINES_STATE.lock().unwrap().timelines.remove(&ttid);
                 return Err(e);
             }
             // We are done with bootstrap, release the lock, return the timeline.
             // {} block forces release before .await
         }
+        timeline.update_status_notify().await?;
         timeline.wal_backup_launcher_tx.send(timeline.ttid).await?;
         Ok(timeline)
     }