mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-03 16:30:38 +00:00
Compare commits
2 Commits
release-pr
...
proxy-cpla
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
136ed19387 | ||
|
|
cdf12ed008 |
@@ -33,11 +33,9 @@ use crate::tenant::timeline::walreceiver::connection_manager::{
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use std::future::Future;
|
||||
use std::num::NonZeroU64;
|
||||
use std::ops::ControlFlow;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use storage_broker::BrokerClientChannel;
|
||||
use tokio::select;
|
||||
use tokio::sync::watch;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
@@ -91,31 +89,27 @@ impl WalReceiver {
|
||||
async move {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
debug!("WAL receiver manager started, connecting to broker");
|
||||
let cancel = task_mgr::shutdown_token();
|
||||
let mut connection_manager_state = ConnectionManagerState::new(
|
||||
timeline,
|
||||
conf,
|
||||
);
|
||||
loop {
|
||||
select! {
|
||||
_ = task_mgr::shutdown_watcher() => {
|
||||
trace!("WAL receiver shutdown requested, shutting down");
|
||||
while !cancel.is_cancelled() {
|
||||
let loop_step_result = connection_manager_loop_step(
|
||||
&mut broker_client,
|
||||
&mut connection_manager_state,
|
||||
&walreceiver_ctx,
|
||||
&cancel,
|
||||
&loop_status,
|
||||
).await;
|
||||
match loop_step_result {
|
||||
Ok(()) => continue,
|
||||
Err(_cancelled) => {
|
||||
trace!("Connection manager loop ended, shutting down");
|
||||
break;
|
||||
},
|
||||
loop_step_result = connection_manager_loop_step(
|
||||
&mut broker_client,
|
||||
&mut connection_manager_state,
|
||||
&walreceiver_ctx,
|
||||
&loop_status,
|
||||
) => match loop_step_result {
|
||||
ControlFlow::Continue(()) => continue,
|
||||
ControlFlow::Break(()) => {
|
||||
trace!("Connection manager loop ended, shutting down");
|
||||
break;
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
connection_manager_state.shutdown().await;
|
||||
*loop_status.write().unwrap() = None;
|
||||
Ok(())
|
||||
@@ -197,6 +191,9 @@ impl<E: Clone> TaskHandle<E> {
|
||||
}
|
||||
}
|
||||
|
||||
/// # Cancel-Safety
|
||||
///
|
||||
/// Cancellation-safe.
|
||||
async fn next_task_event(&mut self) -> TaskEvent<E> {
|
||||
match self.events_receiver.changed().await {
|
||||
Ok(()) => TaskEvent::Update((self.events_receiver.borrow()).clone()),
|
||||
|
||||
@@ -17,7 +17,7 @@ use crate::metrics::{
|
||||
WALRECEIVER_ACTIVE_MANAGERS, WALRECEIVER_BROKER_UPDATES, WALRECEIVER_CANDIDATES_ADDED,
|
||||
WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES,
|
||||
};
|
||||
use crate::task_mgr::{shutdown_token, TaskKind};
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline};
|
||||
use anyhow::Context;
|
||||
use chrono::{NaiveDateTime, Utc};
|
||||
@@ -27,7 +27,7 @@ use storage_broker::proto::SafekeeperTimelineInfo;
|
||||
use storage_broker::proto::SubscribeSafekeeperInfoRequest;
|
||||
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
|
||||
use storage_broker::{BrokerClientChannel, Code, Streaming};
|
||||
use tokio::select;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
|
||||
use postgres_connection::PgConnectionConfig;
|
||||
@@ -45,27 +45,33 @@ use super::{
|
||||
TaskEvent, TaskHandle,
|
||||
};
|
||||
|
||||
pub(crate) struct Cancelled;
|
||||
|
||||
/// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker.
|
||||
/// Based on the updates, desides whether to start, keep or stop a WAL receiver task.
|
||||
/// If storage broker subscription is cancelled, exits.
|
||||
///
|
||||
/// # Cancel-Safety
|
||||
///
|
||||
/// Not cancellation-safe. Use `cancel` token to request cancellation.
|
||||
pub(super) async fn connection_manager_loop_step(
|
||||
broker_client: &mut BrokerClientChannel,
|
||||
connection_manager_state: &mut ConnectionManagerState,
|
||||
ctx: &RequestContext,
|
||||
cancel: &CancellationToken,
|
||||
manager_status: &std::sync::RwLock<Option<ConnectionManagerStatus>>,
|
||||
) -> ControlFlow<(), ()> {
|
||||
match connection_manager_state
|
||||
.timeline
|
||||
.wait_to_become_active(ctx)
|
||||
.await
|
||||
{
|
||||
) -> Result<(), Cancelled> {
|
||||
match tokio::select! {
|
||||
_ = cancel.cancelled() => { return Err(Cancelled); },
|
||||
st = connection_manager_state.timeline.wait_to_become_active(ctx) => { st }
|
||||
} {
|
||||
Ok(()) => {}
|
||||
Err(new_state) => {
|
||||
debug!(
|
||||
?new_state,
|
||||
"state changed, stopping wal connection manager loop"
|
||||
);
|
||||
return ControlFlow::Break(());
|
||||
return Err(Cancelled);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -86,7 +92,7 @@ pub(super) async fn connection_manager_loop_step(
|
||||
// Subscribe to the broker updates. Stream shares underlying TCP connection
|
||||
// with other streams on this client (other connection managers). When
|
||||
// object goes out of scope, stream finishes in drop() automatically.
|
||||
let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id).await;
|
||||
let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id, cancel).await?;
|
||||
debug!("Subscribed for broker timeline updates");
|
||||
|
||||
loop {
|
||||
@@ -94,6 +100,7 @@ pub(super) async fn connection_manager_loop_step(
|
||||
|
||||
// These things are happening concurrently:
|
||||
//
|
||||
// - cancellation request
|
||||
// - keep receiving WAL on the current connection
|
||||
// - if the shared state says we need to change connection, disconnect and return
|
||||
// - this runs in a separate task and we receive updates via a watch channel
|
||||
@@ -101,7 +108,11 @@ pub(super) async fn connection_manager_loop_step(
|
||||
// - receive updates from broker
|
||||
// - this might change the current desired connection
|
||||
// - timeline state changes to something that does not allow walreceiver to run concurrently
|
||||
select! {
|
||||
|
||||
// NB: make sure each of the select expressions are cancellation-safe
|
||||
// (no need for arms to be cancellation-safe).
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => { return Err(Cancelled); }
|
||||
Some(wal_connection_update) = async {
|
||||
match connection_manager_state.wal_connection.as_mut() {
|
||||
Some(wal_connection) => Some(wal_connection.connection_task.next_task_event().await),
|
||||
@@ -133,7 +144,7 @@ pub(super) async fn connection_manager_loop_step(
|
||||
},
|
||||
|
||||
// Got a new update from the broker
|
||||
broker_update = broker_subscription.message() => {
|
||||
broker_update = broker_subscription.message() /* TODO: review cancellation-safety */ => {
|
||||
match broker_update {
|
||||
Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update),
|
||||
Err(status) => {
|
||||
@@ -147,16 +158,17 @@ pub(super) async fn connection_manager_loop_step(
|
||||
warn!("broker subscription failed: {status}");
|
||||
}
|
||||
}
|
||||
return ControlFlow::Continue(());
|
||||
return Ok(());
|
||||
}
|
||||
Ok(None) => {
|
||||
error!("broker subscription stream ended"); // can't happen
|
||||
return ControlFlow::Continue(());
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
new_event = async {
|
||||
// Reminder: this match arm needs to be cancellation-safe.
|
||||
loop {
|
||||
if connection_manager_state.timeline.current_state() == TimelineState::Loading {
|
||||
warn!("wal connection manager should only be launched after timeline has become active");
|
||||
@@ -182,11 +194,11 @@ pub(super) async fn connection_manager_loop_step(
|
||||
}
|
||||
} => match new_event {
|
||||
ControlFlow::Continue(()) => {
|
||||
return ControlFlow::Continue(());
|
||||
return Ok(());
|
||||
}
|
||||
ControlFlow::Break(()) => {
|
||||
debug!("Timeline is no longer active, stopping wal connection manager loop");
|
||||
return ControlFlow::Break(());
|
||||
return Err(Cancelled);
|
||||
}
|
||||
},
|
||||
|
||||
@@ -218,16 +230,15 @@ pub(super) async fn connection_manager_loop_step(
|
||||
async fn subscribe_for_timeline_updates(
|
||||
broker_client: &mut BrokerClientChannel,
|
||||
id: TenantTimelineId,
|
||||
) -> Streaming<SafekeeperTimelineInfo> {
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<Streaming<SafekeeperTimelineInfo>, Cancelled> {
|
||||
let mut attempt = 0;
|
||||
let cancel = shutdown_token();
|
||||
|
||||
loop {
|
||||
exponential_backoff(
|
||||
attempt,
|
||||
DEFAULT_BASE_BACKOFF_SECONDS,
|
||||
DEFAULT_MAX_BACKOFF_SECONDS,
|
||||
&cancel,
|
||||
cancel,
|
||||
)
|
||||
.await;
|
||||
attempt += 1;
|
||||
@@ -241,9 +252,14 @@ async fn subscribe_for_timeline_updates(
|
||||
subscription_key: Some(key),
|
||||
};
|
||||
|
||||
match broker_client.subscribe_safekeeper_info(request).await {
|
||||
match {
|
||||
tokio::select! {
|
||||
r = broker_client.subscribe_safekeeper_info(request) => { r }
|
||||
_ = cancel.cancelled() => { return Err(Cancelled); }
|
||||
}
|
||||
} {
|
||||
Ok(resp) => {
|
||||
return resp.into_inner();
|
||||
return Ok(resp.into_inner());
|
||||
}
|
||||
Err(e) => {
|
||||
// Safekeeper nodes can stop pushing timeline updates to the broker, when no new writes happen and
|
||||
@@ -486,6 +502,10 @@ impl ConnectionManagerState {
|
||||
|
||||
/// Drops the current connection (if any) and updates retry timeout for the next
|
||||
/// connection attempt to the same safekeeper.
|
||||
///
|
||||
/// # Cancel-Safety
|
||||
///
|
||||
/// Not cancellation-safe.
|
||||
async fn drop_old_connection(&mut self, needs_shutdown: bool) {
|
||||
let wal_connection = match self.wal_connection.take() {
|
||||
Some(wal_connection) => wal_connection,
|
||||
@@ -493,7 +513,14 @@ impl ConnectionManagerState {
|
||||
};
|
||||
|
||||
if needs_shutdown {
|
||||
wal_connection.connection_task.shutdown().await;
|
||||
wal_connection
|
||||
.connection_task
|
||||
.shutdown()
|
||||
// This here is why this function isn't cancellation-safe.
|
||||
// If we got cancelled here, then self.wal_connection is already None and we lose track of the task.
|
||||
// Even if our caller diligently calls Self::shutdown(), it will find a self.wal_connection=None
|
||||
// and thus be ineffective.
|
||||
.await;
|
||||
}
|
||||
|
||||
let retry = self
|
||||
@@ -838,6 +865,9 @@ impl ConnectionManagerState {
|
||||
}
|
||||
}
|
||||
|
||||
/// # Cancel-Safety
|
||||
///
|
||||
/// Not cancellation-safe.
|
||||
pub(super) async fn shutdown(mut self) {
|
||||
if let Some(wal_connection) = self.wal_connection.take() {
|
||||
wal_connection.connection_task.shutdown().await;
|
||||
|
||||
@@ -75,6 +75,7 @@ impl Api {
|
||||
let start = Instant::now();
|
||||
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Cplane);
|
||||
let response = self.endpoint.execute(request).await?;
|
||||
info!("received http response {response:?}");
|
||||
drop(pause);
|
||||
info!(duration = ?start.elapsed(), "received http response");
|
||||
let body = match parse_body::<GetRoleSecret>(response).await {
|
||||
@@ -137,6 +138,7 @@ impl Api {
|
||||
let start = Instant::now();
|
||||
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Cplane);
|
||||
let response = self.endpoint.execute(request).await?;
|
||||
info!("received http response {response:?}");
|
||||
drop(pause);
|
||||
info!(duration = ?start.elapsed(), "received http response");
|
||||
let body = parse_body::<WakeCompute>(response).await?;
|
||||
|
||||
Reference in New Issue
Block a user