Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-09 06:22:57 +00:00
refactor: make timeline activation infallible (#4319)
Timeline::activate() was only fallible because `launch_wal_receiver` was, and `launch_wal_receiver` was fallible only because of some preliminary checks in `WalReceiver::start`. It turns out these checks can be shifted to the type system by delaying creation of the `WalReceiver` struct to the point where we activate the timeline.

The changes in this PR were enabled by my previous refactoring that funneled the broker_client from pageserver startup to the activate() call sites.

Patch series:
- #4316
- #4317
- #4318
- #4319
committed by GitHub
parent ae805b985d
commit 057cceb559
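
The core idea of the refactor, as a minimal standalone sketch (the types below are illustrative stand-ins, not the pageserver's real `Timeline`, `WalReceiver`, or `BrokerClientChannel`): holding the receiver as `Mutex<Option<WalReceiver>>` and constructing it only at activation makes "started at most once" a property of the data rather than a runtime precondition, so `activate()` no longer needs to return a `Result`.

use std::sync::Mutex;

// Stand-ins for the real pageserver types; all names here are illustrative.
struct BrokerClient;
struct WalReceiver;

impl WalReceiver {
    // Starting the receiver consumes the broker client handed in by the
    // caller and cannot fail, so there is nothing for activate() to propagate.
    fn start(_broker_client: BrokerClient) -> Self {
        WalReceiver
    }
}

struct Timeline {
    // `None` until activation: the Option encodes "not yet started" in the
    // type system, replacing the old runtime checks in WalReceiver::start().
    walreceiver: Mutex<Option<WalReceiver>>,
}

impl Timeline {
    // After the refactor: no `anyhow::Result` in the signature, because
    // nothing on this path can fail anymore.
    fn activate(&self, broker_client: BrokerClient) {
        let mut guard = self.walreceiver.lock().unwrap();
        assert!(guard.is_none(), "WAL receiver may only be launched once");
        *guard = Some(WalReceiver::start(broker_client));
    }
}

fn main() {
    let tl = Timeline {
        walreceiver: Mutex::new(None),
    };
    tl.activate(BrokerClient); // infallible at every call site: no `?`
}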
@@ -266,7 +266,7 @@ impl UninitializedTimeline<'_> {
         // updated it for the layers that we created during the import.
         let mut timelines = self.owning_tenant.timelines.lock().unwrap();
         let tl = self.initialize_with_lock(ctx, &mut timelines, false)?;
-        tl.activate(broker_client, ctx)?;
+        tl.activate(broker_client, ctx);
         Ok(tl)
     }
@@ -1333,7 +1333,7 @@ impl Tenant {
             }
         };
 
-        loaded_timeline.activate(broker_client, ctx)?;
+        loaded_timeline.activate(broker_client, ctx);
 
         if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
             // Wait for the upload of the 'index_part.json` file to finish, so that when we return
@@ -1481,7 +1481,10 @@ impl Tenant {
 
         // Stop the walreceiver first.
         debug!("waiting for wal receiver to shutdown");
-        timeline.walreceiver.stop().await;
+        let maybe_started_walreceiver = { timeline.walreceiver.lock().unwrap().take() };
+        if let Some(walreceiver) = maybe_started_walreceiver {
+            walreceiver.stop().await;
+        }
         debug!("wal receiver shutdown confirmed");
 
         // Prevent new uploads from starting.
@@ -1678,30 +1681,10 @@ impl Tenant {
         tasks::start_background_loops(self);
 
         let mut activated_timelines = 0;
-        let mut timelines_broken_during_activation = 0;
 
         for timeline in not_broken_timelines {
-            match timeline
-                .activate(broker_client.clone(), ctx)
-                .context("timeline activation for activating tenant")
-            {
-                Ok(()) => {
-                    activated_timelines += 1;
-                }
-                Err(e) => {
-                    error!(
-                        "Failed to activate timeline {}: {:#}",
-                        timeline.timeline_id, e
-                    );
-                    timeline.set_state(TimelineState::Broken);
-                    *current_state = TenantState::broken_from_reason(format!(
-                        "failed to activate timeline {}: {}",
-                        timeline.timeline_id, e
-                    ));
-
-                    timelines_broken_during_activation += 1;
-                }
-            }
+            timeline.activate(broker_client.clone(), ctx);
+            activated_timelines += 1;
         }
 
         let elapsed = self.loading_started_at.elapsed();
@@ -1713,7 +1696,6 @@ impl Tenant {
             since_creation_millis = elapsed.as_millis(),
             tenant_id = %self.tenant_id,
             activated_timelines,
-            timelines_broken_during_activation,
             total_timelines,
             post_state = <&'static str>::from(&*current_state),
             "activation attempt finished"
@@ -226,7 +226,7 @@ pub struct Timeline {
     /// or None if WAL receiver has not received anything for this timeline
    /// yet.
     pub last_received_wal: Mutex<Option<WalReceiverInfo>>,
-    pub walreceiver: WalReceiver,
+    pub walreceiver: Mutex<Option<WalReceiver>>,
 
     /// Relation size cache
     pub rel_size_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
@@ -621,17 +621,27 @@ impl Timeline {
             .await
         {
             Ok(()) => Ok(()),
-            seqwait_error => {
+            Err(e) => {
                 // don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
                 drop(_timer);
-                let walreceiver_status = self.walreceiver.status().await;
-                seqwait_error.with_context(|| format!(
-                    "Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}, {}",
-                    lsn,
-                    self.get_last_record_lsn(),
-                    self.get_disk_consistent_lsn(),
-                    walreceiver_status.map(|status| status.to_human_readable_string())
-                        .unwrap_or_else(|| "WalReceiver status: Not active".to_string()),
-                ))
+                let walreceiver_status = {
+                    match &*self.walreceiver.lock().unwrap() {
+                        None => "stopping or stopped".to_string(),
+                        Some(walreceiver) => match walreceiver.status() {
+                            Some(status) => status.to_human_readable_string(),
+                            None => "Not active".to_string(),
+                        },
+                    }
+                };
+                Err(anyhow::Error::new(e).context({
+                    format!(
+                        "Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}, WalReceiver status: {}",
+                        lsn,
+                        self.get_last_record_lsn(),
+                        self.get_disk_consistent_lsn(),
+                        walreceiver_status,
+                    )
+                }))
             }
         }
     }
@@ -906,15 +916,10 @@ impl Timeline {
         Ok(())
     }
 
-    pub fn activate(
-        self: &Arc<Self>,
-        broker_client: BrokerClientChannel,
-        ctx: &RequestContext,
-    ) -> anyhow::Result<()> {
-        self.launch_wal_receiver(ctx, broker_client)?;
+    pub fn activate(self: &Arc<Self>, broker_client: BrokerClientChannel, ctx: &RequestContext) {
+        self.launch_wal_receiver(ctx, broker_client);
         self.set_state(TimelineState::Active);
         self.launch_eviction_task();
-        Ok(())
     }
 
     pub fn set_state(&self, new_state: TimelineState) {
@@ -1323,15 +1328,7 @@ impl Timeline {
         let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
 
         let tenant_conf_guard = tenant_conf.read().unwrap();
-        let wal_connect_timeout = tenant_conf_guard
-            .walreceiver_connect_timeout
-            .unwrap_or(conf.default_tenant_conf.walreceiver_connect_timeout);
-        let lagging_wal_timeout = tenant_conf_guard
-            .lagging_wal_timeout
-            .unwrap_or(conf.default_tenant_conf.lagging_wal_timeout);
-        let max_lsn_wal_lag = tenant_conf_guard
-            .max_lsn_wal_lag
-            .unwrap_or(conf.default_tenant_conf.max_lsn_wal_lag);
 
         let evictions_low_residence_duration_metric_threshold =
             Self::get_evictions_low_residence_duration_metric_threshold(
                 &tenant_conf_guard,
@@ -1340,18 +1337,6 @@ impl Timeline {
         drop(tenant_conf_guard);
 
         Arc::new_cyclic(|myself| {
-            let walreceiver = WalReceiver::new(
-                TenantTimelineId::new(tenant_id, timeline_id),
-                Weak::clone(myself),
-                WalReceiverConf {
-                    wal_connect_timeout,
-                    lagging_wal_timeout,
-                    max_lsn_wal_lag,
-                    auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
-                    availability_zone: conf.availability_zone.clone(),
-                },
-            );
-
             let mut result = Timeline {
                 conf,
                 tenant_conf,
@@ -1363,7 +1348,7 @@ impl Timeline {
                 wanted_image_layers: Mutex::new(None),
 
                 walredo_mgr,
-                walreceiver,
+                walreceiver: Mutex::new(None),
 
                 remote_client: remote_client.map(Arc::new),
@@ -1483,17 +1468,49 @@ impl Timeline {
         *flush_loop_state = FlushLoopState::Running;
     }
 
-    pub(super) fn launch_wal_receiver(
-        &self,
+    /// Creates and starts the wal receiver.
+    ///
+    /// This function is expected to be called at most once per Timeline's lifecycle
+    /// when the timeline is activated.
+    fn launch_wal_receiver(
+        self: &Arc<Self>,
         ctx: &RequestContext,
         broker_client: BrokerClientChannel,
-    ) -> anyhow::Result<()> {
+    ) {
         info!(
             "launching WAL receiver for timeline {} of tenant {}",
             self.timeline_id, self.tenant_id
         );
-        self.walreceiver.start(ctx, broker_client)?;
-        Ok(())
+
+        let tenant_conf_guard = self.tenant_conf.read().unwrap();
+        let wal_connect_timeout = tenant_conf_guard
+            .walreceiver_connect_timeout
+            .unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout);
+        let lagging_wal_timeout = tenant_conf_guard
+            .lagging_wal_timeout
+            .unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout);
+        let max_lsn_wal_lag = tenant_conf_guard
+            .max_lsn_wal_lag
+            .unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
+        drop(tenant_conf_guard);
+
+        let mut guard = self.walreceiver.lock().unwrap();
+        assert!(
+            guard.is_none(),
+            "multiple launches / re-launches of WAL receiver are not supported"
+        );
+        *guard = Some(WalReceiver::start(
+            Arc::clone(self),
+            WalReceiverConf {
+                wal_connect_timeout,
+                lagging_wal_timeout,
+                max_lsn_wal_lag,
+                auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
+                availability_zone: self.conf.availability_zone.clone(),
+            },
+            broker_client,
+            ctx,
+        ));
     }
 
     ///
@@ -29,16 +29,14 @@ use crate::tenant::timeline::walreceiver::connection_manager::{
     connection_manager_loop_step, ConnectionManagerState,
 };
 
-use anyhow::Context;
 use std::future::Future;
 use std::num::NonZeroU64;
 use std::ops::ControlFlow;
-use std::sync::atomic::{self, AtomicBool};
-use std::sync::{Arc, Weak};
+use std::sync::Arc;
 use std::time::Duration;
 use storage_broker::BrokerClientChannel;
 use tokio::select;
-use tokio::sync::{watch, RwLock};
+use tokio::sync::watch;
 use tokio_util::sync::CancellationToken;
 use tracing::*;
@@ -62,46 +60,23 @@ pub struct WalReceiverConf {
 
 pub struct WalReceiver {
     timeline: TenantTimelineId,
-    timeline_ref: Weak<Timeline>,
-    conf: WalReceiverConf,
-    started: AtomicBool,
-    manager_status: Arc<RwLock<Option<ConnectionManagerStatus>>>,
+    manager_status: Arc<std::sync::RwLock<Option<ConnectionManagerStatus>>>,
 }
 
 impl WalReceiver {
-    pub fn new(
-        timeline: TenantTimelineId,
-        timeline_ref: Weak<Timeline>,
-        conf: WalReceiverConf,
-    ) -> Self {
-        Self {
-            timeline,
-            timeline_ref,
-            conf,
-            started: AtomicBool::new(false),
-            manager_status: Arc::new(RwLock::new(None)),
-        }
-    }
-
     pub fn start(
-        &self,
-        ctx: &RequestContext,
+        timeline: Arc<Timeline>,
+        conf: WalReceiverConf,
         mut broker_client: BrokerClientChannel,
-    ) -> anyhow::Result<()> {
-        if self.started.load(atomic::Ordering::Acquire) {
-            anyhow::bail!("Wal receiver is already started");
-        }
-
-        let timeline = self.timeline_ref.upgrade().with_context(|| {
-            format!("walreceiver start on a dropped timeline {}", self.timeline)
-        })?;
-
+        ctx: &RequestContext,
+    ) -> Self {
         let tenant_id = timeline.tenant_id;
         let timeline_id = timeline.timeline_id;
         let walreceiver_ctx =
             ctx.detached_child(TaskKind::WalReceiverManager, DownloadBehavior::Error);
-        let wal_receiver_conf = self.conf.clone();
-        let loop_status = Arc::clone(&self.manager_status);
-
+        let loop_status = Arc::new(std::sync::RwLock::new(None));
+        let manager_status = Arc::clone(&loop_status);
         task_mgr::spawn(
             WALRECEIVER_RUNTIME.handle(),
             TaskKind::WalReceiverManager,
@@ -113,7 +88,7 @@ impl WalReceiver {
                 info!("WAL receiver manager started, connecting to broker");
                 let mut connection_manager_state = ConnectionManagerState::new(
                     timeline,
-                    wal_receiver_conf,
+                    conf,
                 );
                 loop {
                     select! {
@@ -137,29 +112,29 @@ impl WalReceiver {
                 }
 
                 connection_manager_state.shutdown().await;
-                *loop_status.write().await = None;
+                *loop_status.write().unwrap() = None;
                 Ok(())
             }
             .instrument(info_span!(parent: None, "wal_connection_manager", tenant = %tenant_id, timeline = %timeline_id))
         );
 
-        self.started.store(true, atomic::Ordering::Release);
-
-        Ok(())
+        Self {
+            timeline: TenantTimelineId::new(tenant_id, timeline_id),
+            manager_status,
+        }
     }
 
-    pub async fn stop(&self) {
+    pub async fn stop(self) {
         task_mgr::shutdown_tasks(
             Some(TaskKind::WalReceiverManager),
             Some(self.timeline.tenant_id),
            Some(self.timeline.timeline_id),
         )
         .await;
-        self.started.store(false, atomic::Ordering::Release);
     }
 
-    pub(super) async fn status(&self) -> Option<ConnectionManagerStatus> {
-        self.manager_status.read().await.clone()
+    pub(super) fn status(&self) -> Option<ConnectionManagerStatus> {
+        self.manager_status.read().unwrap().clone()
     }
 }
@@ -29,7 +29,6 @@ use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
 use storage_broker::BrokerClientChannel;
 use storage_broker::Streaming;
 use tokio::select;
-use tokio::sync::RwLock;
 use tracing::*;
 
 use crate::{exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS};
@@ -48,7 +47,7 @@ pub(super) async fn connection_manager_loop_step(
     broker_client: &mut BrokerClientChannel,
     connection_manager_state: &mut ConnectionManagerState,
     ctx: &RequestContext,
-    manager_status: &RwLock<Option<ConnectionManagerStatus>>,
+    manager_status: &std::sync::RwLock<Option<ConnectionManagerStatus>>,
 ) -> ControlFlow<(), ()> {
     match connection_manager_state
         .timeline
@@ -195,7 +194,7 @@ pub(super) async fn connection_manager_loop_step(
                 .change_connection(new_candidate, ctx)
                 .await
         }
-        *manager_status.write().await = Some(connection_manager_state.manager_status());
+        *manager_status.write().unwrap() = Some(connection_manager_state.manager_status());
     }
 }