mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-04 00:40:38 +00:00
Compare commits
2 Commits
release-pr
...
proxy-cpla
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
136ed19387 | ||
|
|
cdf12ed008 |
@@ -33,11 +33,9 @@ use crate::tenant::timeline::walreceiver::connection_manager::{
|
|||||||
use pageserver_api::shard::TenantShardId;
|
use pageserver_api::shard::TenantShardId;
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::num::NonZeroU64;
|
use std::num::NonZeroU64;
|
||||||
use std::ops::ControlFlow;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use storage_broker::BrokerClientChannel;
|
use storage_broker::BrokerClientChannel;
|
||||||
use tokio::select;
|
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
use tracing::*;
|
use tracing::*;
|
||||||
@@ -91,31 +89,27 @@ impl WalReceiver {
|
|||||||
async move {
|
async move {
|
||||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||||
debug!("WAL receiver manager started, connecting to broker");
|
debug!("WAL receiver manager started, connecting to broker");
|
||||||
|
let cancel = task_mgr::shutdown_token();
|
||||||
let mut connection_manager_state = ConnectionManagerState::new(
|
let mut connection_manager_state = ConnectionManagerState::new(
|
||||||
timeline,
|
timeline,
|
||||||
conf,
|
conf,
|
||||||
);
|
);
|
||||||
loop {
|
while !cancel.is_cancelled() {
|
||||||
select! {
|
let loop_step_result = connection_manager_loop_step(
|
||||||
_ = task_mgr::shutdown_watcher() => {
|
&mut broker_client,
|
||||||
trace!("WAL receiver shutdown requested, shutting down");
|
&mut connection_manager_state,
|
||||||
|
&walreceiver_ctx,
|
||||||
|
&cancel,
|
||||||
|
&loop_status,
|
||||||
|
).await;
|
||||||
|
match loop_step_result {
|
||||||
|
Ok(()) => continue,
|
||||||
|
Err(_cancelled) => {
|
||||||
|
trace!("Connection manager loop ended, shutting down");
|
||||||
break;
|
break;
|
||||||
},
|
}
|
||||||
loop_step_result = connection_manager_loop_step(
|
|
||||||
&mut broker_client,
|
|
||||||
&mut connection_manager_state,
|
|
||||||
&walreceiver_ctx,
|
|
||||||
&loop_status,
|
|
||||||
) => match loop_step_result {
|
|
||||||
ControlFlow::Continue(()) => continue,
|
|
||||||
ControlFlow::Break(()) => {
|
|
||||||
trace!("Connection manager loop ended, shutting down");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
connection_manager_state.shutdown().await;
|
connection_manager_state.shutdown().await;
|
||||||
*loop_status.write().unwrap() = None;
|
*loop_status.write().unwrap() = None;
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -197,6 +191,9 @@ impl<E: Clone> TaskHandle<E> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// # Cancel-Safety
|
||||||
|
///
|
||||||
|
/// Cancellation-safe.
|
||||||
async fn next_task_event(&mut self) -> TaskEvent<E> {
|
async fn next_task_event(&mut self) -> TaskEvent<E> {
|
||||||
match self.events_receiver.changed().await {
|
match self.events_receiver.changed().await {
|
||||||
Ok(()) => TaskEvent::Update((self.events_receiver.borrow()).clone()),
|
Ok(()) => TaskEvent::Update((self.events_receiver.borrow()).clone()),
|
||||||
|
|||||||
@@ -17,7 +17,7 @@ use crate::metrics::{
|
|||||||
WALRECEIVER_ACTIVE_MANAGERS, WALRECEIVER_BROKER_UPDATES, WALRECEIVER_CANDIDATES_ADDED,
|
WALRECEIVER_ACTIVE_MANAGERS, WALRECEIVER_BROKER_UPDATES, WALRECEIVER_CANDIDATES_ADDED,
|
||||||
WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES,
|
WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES,
|
||||||
};
|
};
|
||||||
use crate::task_mgr::{shutdown_token, TaskKind};
|
use crate::task_mgr::TaskKind;
|
||||||
use crate::tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline};
|
use crate::tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline};
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use chrono::{NaiveDateTime, Utc};
|
use chrono::{NaiveDateTime, Utc};
|
||||||
@@ -27,7 +27,7 @@ use storage_broker::proto::SafekeeperTimelineInfo;
|
|||||||
use storage_broker::proto::SubscribeSafekeeperInfoRequest;
|
use storage_broker::proto::SubscribeSafekeeperInfoRequest;
|
||||||
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
|
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
|
||||||
use storage_broker::{BrokerClientChannel, Code, Streaming};
|
use storage_broker::{BrokerClientChannel, Code, Streaming};
|
||||||
use tokio::select;
|
use tokio_util::sync::CancellationToken;
|
||||||
use tracing::*;
|
use tracing::*;
|
||||||
|
|
||||||
use postgres_connection::PgConnectionConfig;
|
use postgres_connection::PgConnectionConfig;
|
||||||
@@ -45,27 +45,33 @@ use super::{
|
|||||||
TaskEvent, TaskHandle,
|
TaskEvent, TaskHandle,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
pub(crate) struct Cancelled;
|
||||||
|
|
||||||
/// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker.
|
/// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker.
|
||||||
/// Based on the updates, decides whether to start, keep or stop a WAL receiver task.
|
/// Based on the updates, decides whether to start, keep or stop a WAL receiver task.
|
||||||
/// If storage broker subscription is cancelled, exits.
|
/// If storage broker subscription is cancelled, exits.
|
||||||
|
///
|
||||||
|
/// # Cancel-Safety
|
||||||
|
///
|
||||||
|
/// Not cancellation-safe. Use `cancel` token to request cancellation.
|
||||||
pub(super) async fn connection_manager_loop_step(
|
pub(super) async fn connection_manager_loop_step(
|
||||||
broker_client: &mut BrokerClientChannel,
|
broker_client: &mut BrokerClientChannel,
|
||||||
connection_manager_state: &mut ConnectionManagerState,
|
connection_manager_state: &mut ConnectionManagerState,
|
||||||
ctx: &RequestContext,
|
ctx: &RequestContext,
|
||||||
|
cancel: &CancellationToken,
|
||||||
manager_status: &std::sync::RwLock<Option<ConnectionManagerStatus>>,
|
manager_status: &std::sync::RwLock<Option<ConnectionManagerStatus>>,
|
||||||
) -> ControlFlow<(), ()> {
|
) -> Result<(), Cancelled> {
|
||||||
match connection_manager_state
|
match tokio::select! {
|
||||||
.timeline
|
_ = cancel.cancelled() => { return Err(Cancelled); },
|
||||||
.wait_to_become_active(ctx)
|
st = connection_manager_state.timeline.wait_to_become_active(ctx) => { st }
|
||||||
.await
|
} {
|
||||||
{
|
|
||||||
Ok(()) => {}
|
Ok(()) => {}
|
||||||
Err(new_state) => {
|
Err(new_state) => {
|
||||||
debug!(
|
debug!(
|
||||||
?new_state,
|
?new_state,
|
||||||
"state changed, stopping wal connection manager loop"
|
"state changed, stopping wal connection manager loop"
|
||||||
);
|
);
|
||||||
return ControlFlow::Break(());
|
return Err(Cancelled);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -86,7 +92,7 @@ pub(super) async fn connection_manager_loop_step(
|
|||||||
// Subscribe to the broker updates. Stream shares underlying TCP connection
|
// Subscribe to the broker updates. Stream shares underlying TCP connection
|
||||||
// with other streams on this client (other connection managers). When
|
// with other streams on this client (other connection managers). When
|
||||||
// object goes out of scope, stream finishes in drop() automatically.
|
// object goes out of scope, stream finishes in drop() automatically.
|
||||||
let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id).await;
|
let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id, cancel).await?;
|
||||||
debug!("Subscribed for broker timeline updates");
|
debug!("Subscribed for broker timeline updates");
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
@@ -94,6 +100,7 @@ pub(super) async fn connection_manager_loop_step(
|
|||||||
|
|
||||||
// These things are happening concurrently:
|
// These things are happening concurrently:
|
||||||
//
|
//
|
||||||
|
// - cancellation request
|
||||||
// - keep receiving WAL on the current connection
|
// - keep receiving WAL on the current connection
|
||||||
// - if the shared state says we need to change connection, disconnect and return
|
// - if the shared state says we need to change connection, disconnect and return
|
||||||
// - this runs in a separate task and we receive updates via a watch channel
|
// - this runs in a separate task and we receive updates via a watch channel
|
||||||
@@ -101,7 +108,11 @@ pub(super) async fn connection_manager_loop_step(
|
|||||||
// - receive updates from broker
|
// - receive updates from broker
|
||||||
// - this might change the current desired connection
|
// - this might change the current desired connection
|
||||||
// - timeline state changes to something that does not allow walreceiver to run concurrently
|
// - timeline state changes to something that does not allow walreceiver to run concurrently
|
||||||
select! {
|
|
||||||
|
// NB: make sure each of the select expressions is cancellation-safe
|
||||||
|
// (no need for arms to be cancellation-safe).
|
||||||
|
tokio::select! {
|
||||||
|
_ = cancel.cancelled() => { return Err(Cancelled); }
|
||||||
Some(wal_connection_update) = async {
|
Some(wal_connection_update) = async {
|
||||||
match connection_manager_state.wal_connection.as_mut() {
|
match connection_manager_state.wal_connection.as_mut() {
|
||||||
Some(wal_connection) => Some(wal_connection.connection_task.next_task_event().await),
|
Some(wal_connection) => Some(wal_connection.connection_task.next_task_event().await),
|
||||||
@@ -133,7 +144,7 @@ pub(super) async fn connection_manager_loop_step(
|
|||||||
},
|
},
|
||||||
|
|
||||||
// Got a new update from the broker
|
// Got a new update from the broker
|
||||||
broker_update = broker_subscription.message() => {
|
broker_update = broker_subscription.message() /* TODO: review cancellation-safety */ => {
|
||||||
match broker_update {
|
match broker_update {
|
||||||
Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update),
|
Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update),
|
||||||
Err(status) => {
|
Err(status) => {
|
||||||
@@ -147,16 +158,17 @@ pub(super) async fn connection_manager_loop_step(
|
|||||||
warn!("broker subscription failed: {status}");
|
warn!("broker subscription failed: {status}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ControlFlow::Continue(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
Ok(None) => {
|
Ok(None) => {
|
||||||
error!("broker subscription stream ended"); // can't happen
|
error!("broker subscription stream ended"); // can't happen
|
||||||
return ControlFlow::Continue(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
new_event = async {
|
new_event = async {
|
||||||
|
// Reminder: this select expression (the async block) needs to be cancellation-safe.
|
||||||
loop {
|
loop {
|
||||||
if connection_manager_state.timeline.current_state() == TimelineState::Loading {
|
if connection_manager_state.timeline.current_state() == TimelineState::Loading {
|
||||||
warn!("wal connection manager should only be launched after timeline has become active");
|
warn!("wal connection manager should only be launched after timeline has become active");
|
||||||
@@ -182,11 +194,11 @@ pub(super) async fn connection_manager_loop_step(
|
|||||||
}
|
}
|
||||||
} => match new_event {
|
} => match new_event {
|
||||||
ControlFlow::Continue(()) => {
|
ControlFlow::Continue(()) => {
|
||||||
return ControlFlow::Continue(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
ControlFlow::Break(()) => {
|
ControlFlow::Break(()) => {
|
||||||
debug!("Timeline is no longer active, stopping wal connection manager loop");
|
debug!("Timeline is no longer active, stopping wal connection manager loop");
|
||||||
return ControlFlow::Break(());
|
return Err(Cancelled);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
@@ -218,16 +230,15 @@ pub(super) async fn connection_manager_loop_step(
|
|||||||
async fn subscribe_for_timeline_updates(
|
async fn subscribe_for_timeline_updates(
|
||||||
broker_client: &mut BrokerClientChannel,
|
broker_client: &mut BrokerClientChannel,
|
||||||
id: TenantTimelineId,
|
id: TenantTimelineId,
|
||||||
) -> Streaming<SafekeeperTimelineInfo> {
|
cancel: &CancellationToken,
|
||||||
|
) -> Result<Streaming<SafekeeperTimelineInfo>, Cancelled> {
|
||||||
let mut attempt = 0;
|
let mut attempt = 0;
|
||||||
let cancel = shutdown_token();
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
exponential_backoff(
|
exponential_backoff(
|
||||||
attempt,
|
attempt,
|
||||||
DEFAULT_BASE_BACKOFF_SECONDS,
|
DEFAULT_BASE_BACKOFF_SECONDS,
|
||||||
DEFAULT_MAX_BACKOFF_SECONDS,
|
DEFAULT_MAX_BACKOFF_SECONDS,
|
||||||
&cancel,
|
cancel,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
attempt += 1;
|
attempt += 1;
|
||||||
@@ -241,9 +252,14 @@ async fn subscribe_for_timeline_updates(
|
|||||||
subscription_key: Some(key),
|
subscription_key: Some(key),
|
||||||
};
|
};
|
||||||
|
|
||||||
match broker_client.subscribe_safekeeper_info(request).await {
|
match {
|
||||||
|
tokio::select! {
|
||||||
|
r = broker_client.subscribe_safekeeper_info(request) => { r }
|
||||||
|
_ = cancel.cancelled() => { return Err(Cancelled); }
|
||||||
|
}
|
||||||
|
} {
|
||||||
Ok(resp) => {
|
Ok(resp) => {
|
||||||
return resp.into_inner();
|
return Ok(resp.into_inner());
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
// Safekeeper nodes can stop pushing timeline updates to the broker, when no new writes happen and
|
// Safekeeper nodes can stop pushing timeline updates to the broker, when no new writes happen and
|
||||||
@@ -486,6 +502,10 @@ impl ConnectionManagerState {
|
|||||||
|
|
||||||
/// Drops the current connection (if any) and updates retry timeout for the next
|
/// Drops the current connection (if any) and updates retry timeout for the next
|
||||||
/// connection attempt to the same safekeeper.
|
/// connection attempt to the same safekeeper.
|
||||||
|
///
|
||||||
|
/// # Cancel-Safety
|
||||||
|
///
|
||||||
|
/// Not cancellation-safe.
|
||||||
async fn drop_old_connection(&mut self, needs_shutdown: bool) {
|
async fn drop_old_connection(&mut self, needs_shutdown: bool) {
|
||||||
let wal_connection = match self.wal_connection.take() {
|
let wal_connection = match self.wal_connection.take() {
|
||||||
Some(wal_connection) => wal_connection,
|
Some(wal_connection) => wal_connection,
|
||||||
@@ -493,7 +513,14 @@ impl ConnectionManagerState {
|
|||||||
};
|
};
|
||||||
|
|
||||||
if needs_shutdown {
|
if needs_shutdown {
|
||||||
wal_connection.connection_task.shutdown().await;
|
wal_connection
|
||||||
|
.connection_task
|
||||||
|
.shutdown()
|
||||||
|
// This here is why this function isn't cancellation-safe.
|
||||||
|
// If we got cancelled here, then self.wal_connection is already None and we lose track of the task.
|
||||||
|
// Even if our caller diligently calls Self::shutdown(), it will find a self.wal_connection=None
|
||||||
|
// and thus be ineffective.
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
let retry = self
|
let retry = self
|
||||||
@@ -838,6 +865,9 @@ impl ConnectionManagerState {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// # Cancel-Safety
|
||||||
|
///
|
||||||
|
/// Not cancellation-safe.
|
||||||
pub(super) async fn shutdown(mut self) {
|
pub(super) async fn shutdown(mut self) {
|
||||||
if let Some(wal_connection) = self.wal_connection.take() {
|
if let Some(wal_connection) = self.wal_connection.take() {
|
||||||
wal_connection.connection_task.shutdown().await;
|
wal_connection.connection_task.shutdown().await;
|
||||||
|
|||||||
@@ -75,6 +75,7 @@ impl Api {
|
|||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Cplane);
|
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Cplane);
|
||||||
let response = self.endpoint.execute(request).await?;
|
let response = self.endpoint.execute(request).await?;
|
||||||
|
info!("received http response {response:?}");
|
||||||
drop(pause);
|
drop(pause);
|
||||||
info!(duration = ?start.elapsed(), "received http response");
|
info!(duration = ?start.elapsed(), "received http response");
|
||||||
let body = match parse_body::<GetRoleSecret>(response).await {
|
let body = match parse_body::<GetRoleSecret>(response).await {
|
||||||
@@ -137,6 +138,7 @@ impl Api {
|
|||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Cplane);
|
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Cplane);
|
||||||
let response = self.endpoint.execute(request).await?;
|
let response = self.endpoint.execute(request).await?;
|
||||||
|
info!("received http response {response:?}");
|
||||||
drop(pause);
|
drop(pause);
|
||||||
info!(duration = ?start.elapsed(), "received http response");
|
info!(duration = ?start.elapsed(), "received http response");
|
||||||
let body = parse_body::<WakeCompute>(response).await?;
|
let body = parse_body::<WakeCompute>(response).await?;
|
||||||
|
|||||||
Reference in New Issue
Block a user