Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-10 15:02:56 +00:00
refactor(walreceiver): eliminate task_mgr usage (#7260)
We want to move the code base away from `task_mgr`. This PR refactors the walreceiver code so that it doesn't use `task_mgr` anymore.

# Background

As a reminder, there are three tasks in a Timeline that is ingesting WAL: `WalReceiverManager`, `WalReceiverConnectionHandler`, and `WalReceiverConnectionPoller`. See the documentation in `task_mgr.rs` for how they interact.

Before this PR, cancellation was requested through `task_mgr::shutdown_token()` and `TaskHandle::shutdown`. Wait-for-task-finish was implemented using a mixture of `task_mgr::shutdown_tasks` and `TaskHandle::shutdown`.

This drawing might help:

<img width="300" alt="image" src="https://github.com/neondatabase/neon/assets/956573/b6be7ad6-ecb3-41d0-b410-ec85cb8d6d20">

# Changes

For cancellation, the entire WalReceiver task tree now has a `child_token()` of `Timeline::cancel`. The `TaskHandle` is no longer a cancellation root. This means that `Timeline::cancel.cancel()` propagates to the whole walreceiver task tree.

For wait-for-task-finish, all three tasks in the task tree hold the `Timeline::gate` open until they exit. The downside of using the `Timeline::gate` is that we can no longer wait for just the walreceiver to shut down, which is particularly relevant for `Timeline::flush_and_shutdown`. Effectively, it means that we might ingest more WAL while the `freeze_and_flush()` call is ongoing.

Also, drive-by fix the assertions around task kinds in `wait_lsn`. The check for `WalReceiverConnectionHandler` was ineffective because that never was a task_mgr task, but a TaskHandle task. The assertion is refined to check whether we would actually wait, and only fail in that case.

# Alternatives

I contemplated (ab-)using the `Gate` by having a separate `Gate` for `struct WalReceiver`. All the child tasks would use _that_ gate instead of `Timeline::gate`, and `struct WalReceiver` itself would hold an `Option<GateGuard>` of the `Timeline::gate`. Then we could have a `WalReceiver::stop` function that closes the WalReceiver's gate and then drops the `WalReceiver`'s `Option<GateGuard>`. However, such a design would mean sharing the WalReceiver's `Gate` in an `Arc`, which seems awkward. A proper abstraction would be to make gates hierarchical, analogous to `CancellationToken`. In the end, @jcsp and I talked it over and we determined that it's not worth the effort at this time.

# Refs

part of #7062
committed by GitHub
parent bc05d7eb9c
commit 3de416a016
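As a concrete illustration of the cancellation wiring described under "Changes" above, here is a minimal sketch using only `tokio` and `tokio_util` (this is not neon code; `timeline_cancel`, `walreceiver_cancel`, and `poller_cancel` are illustrative names for the root token and its children):

```rust
use std::time::Duration;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    // Stand-in for Timeline::cancel: the root of the cancellation tree.
    let timeline_cancel = CancellationToken::new();

    // The walreceiver gets a child token; its own tasks derive further children from it.
    let walreceiver_cancel = timeline_cancel.child_token();
    let poller_cancel = walreceiver_cancel.child_token();

    let poller = tokio::spawn(async move {
        tokio::select! {
            _ = poller_cancel.cancelled() => println!("poller observed cancellation"),
            _ = tokio::time::sleep(Duration::from_secs(3600)) => unreachable!(),
        }
    });

    // Cancelling the root is enough: cancellation propagates down all child tokens.
    timeline_cancel.cancel();
    poller.await.unwrap();
}
```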
@@ -182,6 +182,18 @@ where
        }
    }

    /// Check if [`Self::wait_for`] or [`Self::wait_for_timeout`] would wait if called with `num`.
    pub fn would_wait_for(&self, num: V) -> Result<(), V> {
        let internal = self.internal.lock().unwrap();
        let cnt = internal.current.cnt_value();
        drop(internal);
        if cnt >= num {
            Ok(())
        } else {
            Err(cnt)
        }
    }

    /// Register and return a channel that will be notified when a number arrives,
    /// or None, if it has already arrived.
    fn queue_for_wait(&self, num: V) -> Result<Option<Receiver<()>>, SeqWaitError> {
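The hunk above adds `SeqWait::would_wait_for`. The following mock (a hypothetical `MockSeqWait`, not the real `SeqWait` type) just restates its contract: `Ok(())` if `wait_for(num)` would return immediately, `Err(current)` with the current value if it would block:

```rust
use std::sync::Mutex;

/// Mock with the same contract as the new `SeqWait::would_wait_for`.
struct MockSeqWait {
    current: Mutex<u64>,
}

impl MockSeqWait {
    fn would_wait_for(&self, num: u64) -> Result<(), u64> {
        let cnt = *self.current.lock().unwrap();
        if cnt >= num {
            Ok(())
        } else {
            Err(cnt)
        }
    }
}

fn main() {
    let s = MockSeqWait { current: Mutex::new(10) };
    assert_eq!(s.would_wait_for(5), Ok(()));   // 5 has already arrived; no wait needed
    assert_eq!(s.would_wait_for(42), Err(10)); // would block; 10 is the current value
}
```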
@@ -876,7 +876,13 @@ impl PageServerHandler
            if lsn <= last_record_lsn {
                lsn = last_record_lsn;
            } else {
                timeline.wait_lsn(lsn, ctx).await?;
                timeline
                    .wait_lsn(
                        lsn,
                        crate::tenant::timeline::WaitLsnWaiter::PageService,
                        ctx,
                    )
                    .await?;
                // Since we waited for 'lsn' to arrive, that is now the last
                // record LSN. (Or close enough for our purposes; the
                // last-record LSN can advance immediately after we return

@@ -888,7 +894,13 @@ impl PageServerHandler {
                "invalid LSN(0) in request".into(),
            ));
        }
        timeline.wait_lsn(lsn, ctx).await?;
        timeline
            .wait_lsn(
                lsn,
                crate::tenant::timeline::WaitLsnWaiter::PageService,
                ctx,
            )
            .await?;
    }

    if lsn < **latest_gc_cutoff_lsn {

@@ -1215,7 +1227,13 @@ impl PageServerHandler {
        if let Some(lsn) = lsn {
            // Backup was requested at a particular LSN. Wait for it to arrive.
            info!("waiting for {}", lsn);
            timeline.wait_lsn(lsn, ctx).await?;
            timeline
                .wait_lsn(
                    lsn,
                    crate::tenant::timeline::WaitLsnWaiter::PageService,
                    ctx,
                )
                .await?;
            timeline
                .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
                .context("invalid basebackup lsn")?;
@@ -214,13 +214,12 @@ pub enum TaskKind {
    /// Internally, `Client` hands over requests to the `Connection` object.
    /// The `Connection` object is responsible for speaking the wire protocol.
    ///
    /// Walreceiver uses its own abstraction called `TaskHandle` to represent the activity of establishing and handling a connection.
    /// That abstraction doesn't use `task_mgr`.
    /// Walreceiver uses a legacy abstraction called `TaskHandle` to represent the activity of establishing and handling a connection.
    /// The `WalReceiverManager` task ensures that this `TaskHandle` task does not outlive the `WalReceiverManager` task.
    /// For the `RequestContext` that we hand to the TaskHandle, we use the [`WalReceiverConnectionHandler`] task kind.
    ///
    /// Once the connection is established, the `TaskHandle` task creates a
    /// [`WalReceiverConnectionPoller`] task_mgr task that is responsible for polling
    /// Once the connection is established, the `TaskHandle` task spawns a
    /// [`WalReceiverConnectionPoller`] task that is responsible for polling
    /// the `Connection` object.
    /// A `CancellationToken` created by the `TaskHandle` task ensures
    /// that the [`WalReceiverConnectionPoller`] task will cancel soon after as the `TaskHandle` is dropped.

@@ -230,7 +229,6 @@ pub enum TaskKind {
    WalReceiverManager,

    /// The `TaskHandle` task that executes `handle_walreceiver_connection`.
    /// Not a `task_mgr` task, but we use this `TaskKind` for its `RequestContext`.
    /// See the comment on [`WalReceiverManager`].
    ///
    /// [`WalReceiverManager`]: Self::WalReceiverManager
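The doc comment above states that a `CancellationToken` created by the `TaskHandle` ensures the `WalReceiverConnectionPoller` is cancelled soon after the `TaskHandle` is dropped. A rough illustration of that drop-triggers-cancel guarantee with `tokio_util` (this is not the actual `TaskHandle` implementation, just the idea):

```rust
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    // Token owned by the (hypothetical) handle; the poller listens on a child of it.
    let handle_cancel = CancellationToken::new();
    let poller_cancel = handle_cancel.child_token();

    let poller = tokio::spawn(async move {
        poller_cancel.cancelled().await;
        println!("poller exits soon after the handle is dropped");
    });

    // Owning the drop guard models "the TaskHandle is alive";
    // dropping it cancels the token, and the child token with it.
    let guard = handle_cancel.drop_guard();
    drop(guard);

    poller.await.unwrap();
}
```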
@@ -1515,7 +1515,7 @@ impl Tenant {
                // sizes etc. and that would get confused if the previous page versions
                // are not in the repository yet.
                ancestor_timeline
                    .wait_lsn(*lsn, ctx)
                    .wait_lsn(*lsn, timeline::WaitLsnWaiter::Tenant, ctx)
                    .await
                    .map_err(|e| match e {
                        e @ (WaitLsnError::Timeout(_) | WaitLsnError::BadState) => {
@@ -1649,7 +1649,14 @@ impl TenantManager {
            fail::fail_point!("shard-split-lsn-wait", |_| Err(anyhow::anyhow!(
                "failpoint"
            )));
            if let Err(e) = timeline.wait_lsn(*target_lsn, ctx).await {
            if let Err(e) = timeline
                .wait_lsn(
                    *target_lsn,
                    crate::tenant::timeline::WaitLsnWaiter::Tenant,
                    ctx,
                )
                .await
            {
                // Failure here might mean shutdown, in any case this part is an optimization
                // and we shouldn't hold up the split operation.
                tracing::warn!(
@@ -612,6 +612,12 @@ pub enum GetVectoredImpl {
    Vectored,
}

pub(crate) enum WaitLsnWaiter<'a> {
    Timeline(&'a Timeline),
    Tenant,
    PageService,
}

/// Public interface functions
impl Timeline {
    /// Get the LSN where this branch was created

@@ -1060,7 +1066,8 @@ impl Timeline {
    pub(crate) async fn wait_lsn(
        &self,
        lsn: Lsn,
        _ctx: &RequestContext, /* Prepare for use by cancellation */
        who_is_waiting: WaitLsnWaiter<'_>,
        ctx: &RequestContext, /* Prepare for use by cancellation */
    ) -> Result<(), WaitLsnError> {
        if self.cancel.is_cancelled() {
            return Err(WaitLsnError::Shutdown);

@@ -1068,20 +1075,28 @@ impl Timeline {
            return Err(WaitLsnError::BadState);
        }

        // This should never be called from the WAL receiver, because that could lead
        // to a deadlock.
        debug_assert!(
            task_mgr::current_task_kind() != Some(TaskKind::WalReceiverManager),
            "wait_lsn cannot be called in WAL receiver"
        );
        debug_assert!(
            task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionHandler),
            "wait_lsn cannot be called in WAL receiver"
        );
        debug_assert!(
            task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionPoller),
            "wait_lsn cannot be called in WAL receiver"
        );
        if cfg!(debug_assertions) {
            match ctx.task_kind() {
                TaskKind::WalReceiverManager
                | TaskKind::WalReceiverConnectionHandler
                | TaskKind::WalReceiverConnectionPoller => {
                    let is_myself = match who_is_waiting {
                        WaitLsnWaiter::Timeline(waiter) => Weak::ptr_eq(&waiter.myself, &self.myself),
                        WaitLsnWaiter::Tenant | WaitLsnWaiter::PageService => unreachable!("tenant or page_service context are not expected to have task kind {:?}", ctx.task_kind()),
                    };
                    if is_myself {
                        if let Err(current) = self.last_record_lsn.would_wait_for(lsn) {
                            // walingest is the only one that can advance last_record_lsn; it should make sure to never reach here
                            panic!("this timeline's walingest task is calling wait_lsn({lsn}) but we only have last_record_lsn={current}; would deadlock");
                        }
                    } else {
                        // if another timeline's is waiting for us, there's no deadlock risk because
                        // our walreceiver task can make progress independent of theirs
                    }
                }
                _ => {}
            }
        }

        let _timer = crate::metrics::WAIT_LSN_TIME.start_timer();

@@ -1297,15 +1312,18 @@ impl Timeline {
    pub(crate) async fn flush_and_shutdown(&self) {
        debug_assert_current_span_has_tenant_and_timeline_id();

        // Stop ingesting data, so that we are not still writing to an InMemoryLayer while
        // trying to flush
        tracing::debug!("Waiting for WalReceiverManager...");
        task_mgr::shutdown_tasks(
            Some(TaskKind::WalReceiverManager),
            Some(self.tenant_shard_id),
            Some(self.timeline_id),
        )
        .await;
        // Stop ingesting data. Walreceiver only provides cancellation but no
        // "wait until gone", because it uses the Timeline::gate. So, only
        // after the self.gate.close() in self.shutdown() below will we know for
        // sure that no walreceiver tasks are left.
        // This means that we might still be ingesting data during the call to
        // `self.freeze_and_flush()` below. That's not ideal, but, we don't have
        // the concept of a ChildGuard, which is what we'd need to properly model
        // early shutdown of the walreceiver task sub-tree before the other
        // Timeline task sub-trees.
        if let Some(walreceiver) = self.walreceiver.lock().unwrap().take() {
            walreceiver.cancel();
        }

        // Since we have shut down WAL ingest, we should not let anyone start waiting for the LSN to advance
        self.last_record_lsn.shutdown();
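The comment in the hunk above explains that the walreceiver now only offers cancellation, and that "all tasks are gone" is only known once `Timeline::gate` closes. A rough stand-alone illustration of that behaviour using plain tokio primitives (a clone of an `mpsc` sender stands in for a guard of neon's `utils::sync::gate::Gate`; none of this is neon code):

```rust
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new(); // stand-in for the walreceiver cancel token
    let (gate_guard, mut gate_closed) = mpsc::channel::<()>(1); // stand-in for Timeline::gate

    for i in 0..3 {
        let cancel = cancel.child_token();
        let guard = gate_guard.clone(); // each task "holds the gate open" while it runs
        tokio::spawn(async move {
            cancel.cancelled().await;
            println!("task {i} exiting");
            drop(guard); // the guard is released only when the task is truly gone
        });
    }
    drop(gate_guard); // the shutdown path drops its own handle before waiting

    cancel.cancel();                  // step 1: request cancellation (walreceiver.cancel())
    let _ = gate_closed.recv().await; // step 2: returns None once every guard is dropped (gate.close())
    println!("all walreceiver tasks are gone");
}
```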
@@ -3054,7 +3072,7 @@ impl Timeline {
            }
        }
        ancestor
            .wait_lsn(self.ancestor_lsn, ctx)
            .wait_lsn(self.ancestor_lsn, WaitLsnWaiter::Timeline(self), ctx)
            .await
            .map_err(|e| match e {
                e @ WaitLsnError::Timeout(_) => GetReadyAncestorError::AncestorLsnTimeout(e),
@@ -6,7 +6,7 @@ use std::{
use anyhow::Context;
use pageserver_api::{models::TimelineState, shard::TenantShardId};
use tokio::sync::OwnedMutexGuard;
use tracing::{debug, error, info, instrument, Instrument};
use tracing::{error, info, instrument, Instrument};
use utils::{crashsafe, fs_ext, id::TimelineId};

use crate::{

@@ -30,22 +30,6 @@ async fn stop_tasks(timeline: &Timeline) -> Result<(), DeleteTimelineError> {
    tracing::debug!("Cancelling CancellationToken");
    timeline.cancel.cancel();

    // Stop the walreceiver first.
    debug!("waiting for wal receiver to shutdown");
    let maybe_started_walreceiver = { timeline.walreceiver.lock().unwrap().take() };
    if let Some(walreceiver) = maybe_started_walreceiver {
        walreceiver.stop().await;
    }
    debug!("wal receiver shutdown confirmed");

    // Shut down the layer flush task before the remote client, as one depends on the other
    task_mgr::shutdown_tasks(
        Some(TaskKind::LayerFlushTask),
        Some(timeline.tenant_shard_id),
        Some(timeline.timeline_id),
    )
    .await;

    // Prevent new uploads from starting.
    if let Some(remote_client) = timeline.remote_client.as_ref() {
        remote_client.stop();
@@ -24,13 +24,12 @@ mod connection_manager;
mod walreceiver_connection;

use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind, WALRECEIVER_RUNTIME};
use crate::task_mgr::{TaskKind, WALRECEIVER_RUNTIME};
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::timeline::walreceiver::connection_manager::{
    connection_manager_loop_step, ConnectionManagerState,
};

use pageserver_api::shard::TenantShardId;
use std::future::Future;
use std::num::NonZeroU64;
use std::sync::Arc;

@@ -40,8 +39,6 @@ use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use tracing::*;

use utils::id::TimelineId;

use self::connection_manager::ConnectionManagerStatus;

use super::Timeline;

@@ -60,9 +57,10 @@ pub struct WalReceiverConf {
}

pub struct WalReceiver {
    tenant_shard_id: TenantShardId,
    timeline_id: TimelineId,
    manager_status: Arc<std::sync::RwLock<Option<ConnectionManagerStatus>>>,
    /// All task spawned by [`WalReceiver::start`] and its children are sensitive to this token.
    /// It's a child token of [`Timeline`] so that timeline shutdown can cancel WalReceiver tasks early for `freeze_and_flush=true`.
    cancel: CancellationToken,
}

impl WalReceiver {

@@ -76,23 +74,23 @@ impl WalReceiver {
        let timeline_id = timeline.timeline_id;
        let walreceiver_ctx =
            ctx.detached_child(TaskKind::WalReceiverManager, DownloadBehavior::Error);

        let loop_status = Arc::new(std::sync::RwLock::new(None));
        let manager_status = Arc::clone(&loop_status);
        task_mgr::spawn(
            WALRECEIVER_RUNTIME.handle(),
            TaskKind::WalReceiverManager,
            Some(timeline.tenant_shard_id),
            Some(timeline_id),
            &format!("walreceiver for timeline {tenant_shard_id}/{timeline_id}"),
            false,
        let cancel = timeline.cancel.child_token();
        WALRECEIVER_RUNTIME.spawn({
            let cancel = cancel.clone();
            async move {
                debug_assert_current_span_has_tenant_and_timeline_id();
                // acquire timeline gate so we know the task doesn't outlive the Timeline
                let Ok(_guard) = timeline.gate.enter() else {
                    debug!("WAL receiver manager could not enter the gate timeline gate, it's closed already");
                    return;
                };
                debug!("WAL receiver manager started, connecting to broker");
                let cancel = task_mgr::shutdown_token();
                let mut connection_manager_state = ConnectionManagerState::new(
                    timeline,
                    conf,
                    cancel.clone(),
                );
                while !cancel.is_cancelled() {
                    let loop_step_result = connection_manager_loop_step(

@@ -112,25 +110,22 @@ impl WalReceiver {
                }
                connection_manager_state.shutdown().await;
                *loop_status.write().unwrap() = None;
                Ok(())
                debug!("task exits");
            }
            .instrument(info_span!(parent: None, "wal_connection_manager", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), timeline_id = %timeline_id))
        );
        });

        Self {
            tenant_shard_id,
            timeline_id,
            manager_status,
            cancel,
        }
    }

    pub async fn stop(self) {
        task_mgr::shutdown_tasks(
            Some(TaskKind::WalReceiverManager),
            Some(self.tenant_shard_id),
            Some(self.timeline_id),
        )
        .await;
    #[instrument(skip_all, level = tracing::Level::DEBUG)]
    pub fn cancel(&self) {
        debug_assert_current_span_has_tenant_and_timeline_id();
        debug!("cancelling walreceiver tasks");
        self.cancel.cancel();
    }

    pub(crate) fn status(&self) -> Option<ConnectionManagerStatus> {

@@ -164,14 +159,18 @@ enum TaskStateUpdate<E> {

impl<E: Clone> TaskHandle<E> {
    /// Initializes the task, starting it immediately after the creation.
    ///
    /// The second argument to `task` is a child token of `cancel_parent` ([`CancellationToken::child_token`]).
    /// It being a child token enables us to provide a [`Self::shutdown`] method.
    fn spawn<Fut>(
        cancel_parent: &CancellationToken,
        task: impl FnOnce(watch::Sender<TaskStateUpdate<E>>, CancellationToken) -> Fut + Send + 'static,
    ) -> Self
    where
        Fut: Future<Output = anyhow::Result<()>> + Send,
        E: Send + Sync + 'static,
    {
        let cancellation = CancellationToken::new();
        let cancellation = cancel_parent.child_token();
        let (events_sender, events_receiver) = watch::channel(TaskStateUpdate::Started);

        let cancellation_clone = cancellation.clone();
@@ -280,6 +280,8 @@ pub(super) struct ConnectionManagerState {
    id: TenantTimelineId,
    /// Use pageserver data about the timeline to filter out some of the safekeepers.
    timeline: Arc<Timeline>,
    /// Child token of [`super::WalReceiver::cancel`], inherited to all tasks we spawn.
    cancel: CancellationToken,
    conf: WalReceiverConf,
    /// Current connection to safekeeper for WAL streaming.
    wal_connection: Option<WalConnection>,

@@ -402,7 +404,11 @@ struct BrokerSkTimeline {
}

impl ConnectionManagerState {
    pub(super) fn new(timeline: Arc<Timeline>, conf: WalReceiverConf) -> Self {
    pub(super) fn new(
        timeline: Arc<Timeline>,
        conf: WalReceiverConf,
        cancel: CancellationToken,
    ) -> Self {
        let id = TenantTimelineId {
            tenant_id: timeline.tenant_shard_id.tenant_id,
            timeline_id: timeline.timeline_id,

@@ -410,6 +416,7 @@ impl ConnectionManagerState {
        Self {
            id,
            timeline,
            cancel,
            conf,
            wal_connection: None,
            wal_stream_candidates: HashMap::new(),

@@ -417,6 +424,22 @@ impl ConnectionManagerState {
        }
    }

    fn spawn<Fut>(
        &self,
        task: impl FnOnce(
                tokio::sync::watch::Sender<TaskStateUpdate<WalConnectionStatus>>,
                CancellationToken,
            ) -> Fut
            + Send
            + 'static,
    ) -> TaskHandle<WalConnectionStatus>
    where
        Fut: std::future::Future<Output = anyhow::Result<()>> + Send,
    {
        // TODO: get rid of TaskHandle
        super::TaskHandle::spawn(&self.cancel, task)
    }

    /// Shuts down the current connection (if any) and immediately starts another one with the given connection string.
    async fn change_connection(&mut self, new_sk: NewWalConnectionCandidate, ctx: &RequestContext) {
        WALRECEIVER_SWITCHES

@@ -435,7 +458,7 @@ impl ConnectionManagerState {
        );

        let span = info_span!("connection", %node_id);
        let connection_handle = TaskHandle::spawn(move |events_sender, cancellation| {
        let connection_handle = self.spawn(move |events_sender, cancellation| {
            async move {
                debug_assert_current_span_has_tenant_and_timeline_id();

@@ -463,6 +486,12 @@ impl ConnectionManagerState {
                        info!("walreceiver connection handling ended: {e}");
                        Ok(())
                    }
                    WalReceiverError::ClosedGate => {
                        info!(
                            "walreceiver connection handling ended because of closed gate"
                        );
                        Ok(())
                    }
                    WalReceiverError::Other(e) => {
                        // give out an error to have task_mgr give it a really verbose logging
                        if cancellation.is_cancelled() {

@@ -1016,7 +1045,7 @@ mod tests {
            sk_id: connected_sk_id,
            availability_zone: None,
            status: connection_status,
            connection_task: TaskHandle::spawn(move |sender, _| async move {
            connection_task: state.spawn(move |sender, _| async move {
                sender
                    .send(TaskStateUpdate::Progress(connection_status))
                    .ok();

@@ -1184,7 +1213,7 @@ mod tests {
            sk_id: connected_sk_id,
            availability_zone: None,
            status: connection_status,
            connection_task: TaskHandle::spawn(move |sender, _| async move {
            connection_task: state.spawn(move |sender, _| async move {
                sender
                    .send(TaskStateUpdate::Progress(connection_status))
                    .ok();

@@ -1251,7 +1280,7 @@ mod tests {
            sk_id: NodeId(1),
            availability_zone: None,
            status: connection_status,
            connection_task: TaskHandle::spawn(move |sender, _| async move {
            connection_task: state.spawn(move |sender, _| async move {
                sender
                    .send(TaskStateUpdate::Progress(connection_status))
                    .ok();

@@ -1315,7 +1344,7 @@ mod tests {
            sk_id: NodeId(1),
            availability_zone: None,
            status: connection_status,
            connection_task: TaskHandle::spawn(move |_, _| async move { Ok(()) }),
            connection_task: state.spawn(move |_, _| async move { Ok(()) }),
            discovered_new_wal: Some(NewCommittedWAL {
                discovered_at: time_over_threshold,
                lsn: new_lsn,

@@ -1371,6 +1400,7 @@ mod tests {
                timeline_id: TIMELINE_ID,
            },
            timeline,
            cancel: CancellationToken::new(),
            conf: WalReceiverConf {
                wal_connect_timeout: Duration::from_secs(1),
                lagging_wal_timeout: Duration::from_secs(1),

@@ -1414,7 +1444,7 @@ mod tests {
            sk_id: connected_sk_id,
            availability_zone: None,
            status: connection_status,
            connection_task: TaskHandle::spawn(move |sender, _| async move {
            connection_task: state.spawn(move |sender, _| async move {
                sender
                    .send(TaskStateUpdate::Progress(connection_status))
                    .ok();
@@ -27,7 +27,6 @@ use super::TaskStateUpdate;
use crate::{
    context::RequestContext,
    metrics::{LIVE_CONNECTIONS_COUNT, WALRECEIVER_STARTED_CONNECTIONS, WAL_INGEST},
    task_mgr,
    task_mgr::TaskKind,
    task_mgr::WALRECEIVER_RUNTIME,
    tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline, WalReceiverInfo},

@@ -37,8 +36,8 @@ use crate::{
use postgres_backend::is_expected_io_error;
use postgres_connection::PgConnectionConfig;
use postgres_ffi::waldecoder::WalStreamDecoder;
use utils::pageserver_feedback::PageserverFeedback;
use utils::{id::NodeId, lsn::Lsn};
use utils::{pageserver_feedback::PageserverFeedback, sync::gate::GateError};

/// Status of the connection.
#[derive(Debug, Clone, Copy)]

@@ -68,6 +67,7 @@ pub(super) enum WalReceiverError {
    SuccessfulCompletion(String),
    /// Generic error
    Other(anyhow::Error),
    ClosedGate,
}

impl From<tokio_postgres::Error> for WalReceiverError {

@@ -119,6 +119,16 @@ pub(super) async fn handle_walreceiver_connection(
) -> Result<(), WalReceiverError> {
    debug_assert_current_span_has_tenant_and_timeline_id();

    // prevent timeline shutdown from finishing until we have exited
    let _guard = timeline.gate.enter().map_err(|e| match e {
        GateError::GateClosed => WalReceiverError::ClosedGate,
    })?;
    // This function spawns a side-car task (WalReceiverConnectionPoller).
    // Get its gate guard now as well.
    let poller_guard = timeline.gate.enter().map_err(|e| match e {
        GateError::GateClosed => WalReceiverError::ClosedGate,
    })?;

    WALRECEIVER_STARTED_CONNECTIONS.inc();

    // Connect to the database in replication mode.

@@ -156,22 +166,19 @@ pub(super) async fn handle_walreceiver_connection(
    }

    // The connection object performs the actual communication with the database,
    // so spawn it off to run on its own.
    // so spawn it off to run on its own. It shouldn't outlive this function, but,
    // due to lack of async drop, we can't enforce that. However, we ensure that
    // 1. it is sensitive to `cancellation` and
    // 2. holds the Timeline gate open so that after timeline shutdown,
    // we know this task is gone.
    let _connection_ctx = ctx.detached_child(
        TaskKind::WalReceiverConnectionPoller,
        ctx.download_behavior(),
    );
    let connection_cancellation = cancellation.clone();
    task_mgr::spawn(
        WALRECEIVER_RUNTIME.handle(),
        TaskKind::WalReceiverConnectionPoller,
        Some(timeline.tenant_shard_id),
        Some(timeline.timeline_id),
        "walreceiver connection",
        false,
    WALRECEIVER_RUNTIME.spawn(
        async move {
            debug_assert_current_span_has_tenant_and_timeline_id();

            select! {
                connection_result = connection => match connection_result {
                    Ok(()) => debug!("Walreceiver db connection closed"),

@@ -182,6 +189,9 @@ pub(super) async fn handle_walreceiver_connection(
                        // with a similar error.
                    },
                    WalReceiverError::SuccessfulCompletion(_) => {}
                    WalReceiverError::ClosedGate => {
                        // doesn't happen at runtime
                    }
                    WalReceiverError::Other(err) => {
                        warn!("Connection aborted: {err:#}")
                    }

@@ -190,7 +200,7 @@ pub(super) async fn handle_walreceiver_connection(
                },
                _ = connection_cancellation.cancelled() => debug!("Connection cancelled"),
            }
            Ok(())
            drop(poller_guard);
        }
        // Enrich the log lines emitted by this closure with meaningful context.
        // TODO: technically, this task outlives the surrounding function, so, the