Compare commits

...

2 Commits

Author SHA1 Message Date
Anna Khanova
136ed19387 Test 2024-03-27 13:42:33 +01:00
Christian Schwarz
cdf12ed008 fix(walreceiver): Timeline::shutdown can leave a dangling handle_walreceiver_connection tokio task (#7235)
# Problem

As pointed out through doc-comments in this PR, `drop_old_connection` is
not cancellation-safe.

This means we can leave a `handle_walreceiver_connection` tokio task
dangling during Timeline shutdown.

More details described in the corresponding issue #7062.

# Solution

Don't cancel the `connection_manager_loop_step` future by dropping it from
the `tokio::select!()` in the task_mgr task.
Instead, transform the code to use a `CancellationToken` ---
specifically, `task_mgr::shutdown_token()` --- and make the code responsive
to it.

`drop_old_connection()` is still not cancellation-safe. It also
doesn't take a cancellation token, because there is no point inside the
function where it could return early if cancellation were requested.

We rely on `handle_walreceiver_connection` being sensitive to the
`TaskHandle`'s cancellation token (argument name: `cancellation`).
Currently, it checks `cancellation` on each WAL message. It is
probably also sensitive to `Timeline::cancel` because ultimately all
that `handle_walreceiver_connection` does is interact with the
`Timeline`.

In summary, the above means that the following code (which is found in
`Timeline::shutdown`) now might **take longer**, but actually ensures
that all `handle_walreceiver_connection` tasks are finished:

```rust
task_mgr::shutdown_tasks(
    Some(TaskKind::WalReceiverManager),
    Some(self.tenant_shard_id),
    Some(self.timeline_id)
)
```

# Refs

refs #7062
2024-03-27 12:04:31 +01:00
3 changed files with 72 additions and 43 deletions

View File

@@ -33,11 +33,9 @@ use crate::tenant::timeline::walreceiver::connection_manager::{
use pageserver_api::shard::TenantShardId; use pageserver_api::shard::TenantShardId;
use std::future::Future; use std::future::Future;
use std::num::NonZeroU64; use std::num::NonZeroU64;
use std::ops::ControlFlow;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use storage_broker::BrokerClientChannel; use storage_broker::BrokerClientChannel;
use tokio::select;
use tokio::sync::watch; use tokio::sync::watch;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::*; use tracing::*;
@@ -91,31 +89,27 @@ impl WalReceiver {
async move { async move {
debug_assert_current_span_has_tenant_and_timeline_id(); debug_assert_current_span_has_tenant_and_timeline_id();
debug!("WAL receiver manager started, connecting to broker"); debug!("WAL receiver manager started, connecting to broker");
let cancel = task_mgr::shutdown_token();
let mut connection_manager_state = ConnectionManagerState::new( let mut connection_manager_state = ConnectionManagerState::new(
timeline, timeline,
conf, conf,
); );
loop { while !cancel.is_cancelled() {
select! { let loop_step_result = connection_manager_loop_step(
_ = task_mgr::shutdown_watcher() => { &mut broker_client,
trace!("WAL receiver shutdown requested, shutting down"); &mut connection_manager_state,
&walreceiver_ctx,
&cancel,
&loop_status,
).await;
match loop_step_result {
Ok(()) => continue,
Err(_cancelled) => {
trace!("Connection manager loop ended, shutting down");
break; break;
}, }
loop_step_result = connection_manager_loop_step(
&mut broker_client,
&mut connection_manager_state,
&walreceiver_ctx,
&loop_status,
) => match loop_step_result {
ControlFlow::Continue(()) => continue,
ControlFlow::Break(()) => {
trace!("Connection manager loop ended, shutting down");
break;
}
},
} }
} }
connection_manager_state.shutdown().await; connection_manager_state.shutdown().await;
*loop_status.write().unwrap() = None; *loop_status.write().unwrap() = None;
Ok(()) Ok(())
@@ -197,6 +191,9 @@ impl<E: Clone> TaskHandle<E> {
} }
} }
/// # Cancel-Safety
///
/// Cancellation-safe.
async fn next_task_event(&mut self) -> TaskEvent<E> { async fn next_task_event(&mut self) -> TaskEvent<E> {
match self.events_receiver.changed().await { match self.events_receiver.changed().await {
Ok(()) => TaskEvent::Update((self.events_receiver.borrow()).clone()), Ok(()) => TaskEvent::Update((self.events_receiver.borrow()).clone()),

View File

@@ -17,7 +17,7 @@ use crate::metrics::{
WALRECEIVER_ACTIVE_MANAGERS, WALRECEIVER_BROKER_UPDATES, WALRECEIVER_CANDIDATES_ADDED, WALRECEIVER_ACTIVE_MANAGERS, WALRECEIVER_BROKER_UPDATES, WALRECEIVER_CANDIDATES_ADDED,
WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES, WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES,
}; };
use crate::task_mgr::{shutdown_token, TaskKind}; use crate::task_mgr::TaskKind;
use crate::tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline}; use crate::tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline};
use anyhow::Context; use anyhow::Context;
use chrono::{NaiveDateTime, Utc}; use chrono::{NaiveDateTime, Utc};
@@ -27,7 +27,7 @@ use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::SubscribeSafekeeperInfoRequest; use storage_broker::proto::SubscribeSafekeeperInfoRequest;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId; use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use storage_broker::{BrokerClientChannel, Code, Streaming}; use storage_broker::{BrokerClientChannel, Code, Streaming};
use tokio::select; use tokio_util::sync::CancellationToken;
use tracing::*; use tracing::*;
use postgres_connection::PgConnectionConfig; use postgres_connection::PgConnectionConfig;
@@ -45,27 +45,33 @@ use super::{
TaskEvent, TaskHandle, TaskEvent, TaskHandle,
}; };
pub(crate) struct Cancelled;
/// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker. /// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker.
/// Based on the updates, decides whether to start, keep or stop a WAL receiver task. /// Based on the updates, decides whether to start, keep or stop a WAL receiver task.
/// If storage broker subscription is cancelled, exits. /// If storage broker subscription is cancelled, exits.
///
/// # Cancel-Safety
///
/// Not cancellation-safe. Use `cancel` token to request cancellation.
pub(super) async fn connection_manager_loop_step( pub(super) async fn connection_manager_loop_step(
broker_client: &mut BrokerClientChannel, broker_client: &mut BrokerClientChannel,
connection_manager_state: &mut ConnectionManagerState, connection_manager_state: &mut ConnectionManagerState,
ctx: &RequestContext, ctx: &RequestContext,
cancel: &CancellationToken,
manager_status: &std::sync::RwLock<Option<ConnectionManagerStatus>>, manager_status: &std::sync::RwLock<Option<ConnectionManagerStatus>>,
) -> ControlFlow<(), ()> { ) -> Result<(), Cancelled> {
match connection_manager_state match tokio::select! {
.timeline _ = cancel.cancelled() => { return Err(Cancelled); },
.wait_to_become_active(ctx) st = connection_manager_state.timeline.wait_to_become_active(ctx) => { st }
.await } {
{
Ok(()) => {} Ok(()) => {}
Err(new_state) => { Err(new_state) => {
debug!( debug!(
?new_state, ?new_state,
"state changed, stopping wal connection manager loop" "state changed, stopping wal connection manager loop"
); );
return ControlFlow::Break(()); return Err(Cancelled);
} }
} }
@@ -86,7 +92,7 @@ pub(super) async fn connection_manager_loop_step(
// Subscribe to the broker updates. Stream shares underlying TCP connection // Subscribe to the broker updates. Stream shares underlying TCP connection
// with other streams on this client (other connection managers). When // with other streams on this client (other connection managers). When
// object goes out of scope, stream finishes in drop() automatically. // object goes out of scope, stream finishes in drop() automatically.
let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id).await; let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id, cancel).await?;
debug!("Subscribed for broker timeline updates"); debug!("Subscribed for broker timeline updates");
loop { loop {
@@ -94,6 +100,7 @@ pub(super) async fn connection_manager_loop_step(
// These things are happening concurrently: // These things are happening concurrently:
// //
// - cancellation request
// - keep receiving WAL on the current connection // - keep receiving WAL on the current connection
// - if the shared state says we need to change connection, disconnect and return // - if the shared state says we need to change connection, disconnect and return
// - this runs in a separate task and we receive updates via a watch channel // - this runs in a separate task and we receive updates via a watch channel
@@ -101,7 +108,11 @@ pub(super) async fn connection_manager_loop_step(
// - receive updates from broker // - receive updates from broker
// - this might change the current desired connection // - this might change the current desired connection
// - timeline state changes to something that does not allow walreceiver to run concurrently // - timeline state changes to something that does not allow walreceiver to run concurrently
select! {
// NB: make sure each of the select expressions are cancellation-safe
// (no need for arms to be cancellation-safe).
tokio::select! {
_ = cancel.cancelled() => { return Err(Cancelled); }
Some(wal_connection_update) = async { Some(wal_connection_update) = async {
match connection_manager_state.wal_connection.as_mut() { match connection_manager_state.wal_connection.as_mut() {
Some(wal_connection) => Some(wal_connection.connection_task.next_task_event().await), Some(wal_connection) => Some(wal_connection.connection_task.next_task_event().await),
@@ -133,7 +144,7 @@ pub(super) async fn connection_manager_loop_step(
}, },
// Got a new update from the broker // Got a new update from the broker
broker_update = broker_subscription.message() => { broker_update = broker_subscription.message() /* TODO: review cancellation-safety */ => {
match broker_update { match broker_update {
Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update), Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update),
Err(status) => { Err(status) => {
@@ -147,16 +158,17 @@ pub(super) async fn connection_manager_loop_step(
warn!("broker subscription failed: {status}"); warn!("broker subscription failed: {status}");
} }
} }
return ControlFlow::Continue(()); return Ok(());
} }
Ok(None) => { Ok(None) => {
error!("broker subscription stream ended"); // can't happen error!("broker subscription stream ended"); // can't happen
return ControlFlow::Continue(()); return Ok(());
} }
} }
}, },
new_event = async { new_event = async {
// Reminder: this match arm needs to be cancellation-safe.
loop { loop {
if connection_manager_state.timeline.current_state() == TimelineState::Loading { if connection_manager_state.timeline.current_state() == TimelineState::Loading {
warn!("wal connection manager should only be launched after timeline has become active"); warn!("wal connection manager should only be launched after timeline has become active");
@@ -182,11 +194,11 @@ pub(super) async fn connection_manager_loop_step(
} }
} => match new_event { } => match new_event {
ControlFlow::Continue(()) => { ControlFlow::Continue(()) => {
return ControlFlow::Continue(()); return Ok(());
} }
ControlFlow::Break(()) => { ControlFlow::Break(()) => {
debug!("Timeline is no longer active, stopping wal connection manager loop"); debug!("Timeline is no longer active, stopping wal connection manager loop");
return ControlFlow::Break(()); return Err(Cancelled);
} }
}, },
@@ -218,16 +230,15 @@ pub(super) async fn connection_manager_loop_step(
async fn subscribe_for_timeline_updates( async fn subscribe_for_timeline_updates(
broker_client: &mut BrokerClientChannel, broker_client: &mut BrokerClientChannel,
id: TenantTimelineId, id: TenantTimelineId,
) -> Streaming<SafekeeperTimelineInfo> { cancel: &CancellationToken,
) -> Result<Streaming<SafekeeperTimelineInfo>, Cancelled> {
let mut attempt = 0; let mut attempt = 0;
let cancel = shutdown_token();
loop { loop {
exponential_backoff( exponential_backoff(
attempt, attempt,
DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
&cancel, cancel,
) )
.await; .await;
attempt += 1; attempt += 1;
@@ -241,9 +252,14 @@ async fn subscribe_for_timeline_updates(
subscription_key: Some(key), subscription_key: Some(key),
}; };
match broker_client.subscribe_safekeeper_info(request).await { match {
tokio::select! {
r = broker_client.subscribe_safekeeper_info(request) => { r }
_ = cancel.cancelled() => { return Err(Cancelled); }
}
} {
Ok(resp) => { Ok(resp) => {
return resp.into_inner(); return Ok(resp.into_inner());
} }
Err(e) => { Err(e) => {
// Safekeeper nodes can stop pushing timeline updates to the broker, when no new writes happen and // Safekeeper nodes can stop pushing timeline updates to the broker, when no new writes happen and
@@ -486,6 +502,10 @@ impl ConnectionManagerState {
/// Drops the current connection (if any) and updates retry timeout for the next /// Drops the current connection (if any) and updates retry timeout for the next
/// connection attempt to the same safekeeper. /// connection attempt to the same safekeeper.
///
/// # Cancel-Safety
///
/// Not cancellation-safe.
async fn drop_old_connection(&mut self, needs_shutdown: bool) { async fn drop_old_connection(&mut self, needs_shutdown: bool) {
let wal_connection = match self.wal_connection.take() { let wal_connection = match self.wal_connection.take() {
Some(wal_connection) => wal_connection, Some(wal_connection) => wal_connection,
@@ -493,7 +513,14 @@ impl ConnectionManagerState {
}; };
if needs_shutdown { if needs_shutdown {
wal_connection.connection_task.shutdown().await; wal_connection
.connection_task
.shutdown()
// This here is why this function isn't cancellation-safe.
// If we got cancelled here, then self.wal_connection is already None and we lose track of the task.
// Even if our caller diligently calls Self::shutdown(), it will find a self.wal_connection=None
// and thus be ineffective.
.await;
} }
let retry = self let retry = self
@@ -838,6 +865,9 @@ impl ConnectionManagerState {
} }
} }
/// # Cancel-Safety
///
/// Not cancellation-safe.
pub(super) async fn shutdown(mut self) { pub(super) async fn shutdown(mut self) {
if let Some(wal_connection) = self.wal_connection.take() { if let Some(wal_connection) = self.wal_connection.take() {
wal_connection.connection_task.shutdown().await; wal_connection.connection_task.shutdown().await;

View File

@@ -75,6 +75,7 @@ impl Api {
let start = Instant::now(); let start = Instant::now();
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Cplane); let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Cplane);
let response = self.endpoint.execute(request).await?; let response = self.endpoint.execute(request).await?;
info!("received http response {response:?}");
drop(pause); drop(pause);
info!(duration = ?start.elapsed(), "received http response"); info!(duration = ?start.elapsed(), "received http response");
let body = match parse_body::<GetRoleSecret>(response).await { let body = match parse_body::<GetRoleSecret>(response).await {
@@ -137,6 +138,7 @@ impl Api {
let start = Instant::now(); let start = Instant::now();
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Cplane); let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Cplane);
let response = self.endpoint.execute(request).await?; let response = self.endpoint.execute(request).await?;
info!("received http response {response:?}");
drop(pause); drop(pause);
info!(duration = ?start.elapsed(), "received http response"); info!(duration = ?start.elapsed(), "received http response");
let body = parse_body::<WakeCompute>(response).await?; let body = parse_body::<WakeCompute>(response).await?;