mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 21:42:56 +00:00
From #6037 on, until this patch, if the client opens the connection but doesn't send a `PagestreamFeMessage` within the first 10ms, we'd close the connection because `self.timeline_cancelled()` returns. It returns because `self.shard_timelines` is still empty at that point: it gets filled lazily within the handlers for the incoming messages. Changes ------- The question is: if we can't check for timeline cancellation, what else do we need to be cancellable for? `tenant.cancel` is also a bad choice because the `tenant` (shard) we pick at the top of handle_pagerequests might indeed go away over the course of the connection lifetime, but other shards may still be there. The correct solution, I think, is to be responsive to task_mgr cancellation, because the connection handler runs in a task_mgr task and it is already the current canonical way we shut down a tenant's / timeline's page_service connections (see `Tenant::shutdown` / `Timeline::shutdown`). So, rename the function and make it sensitive to task_mgr cancellation.
This commit is contained in:
committed by
GitHub
parent
88df057531
commit
760a48207d
@@ -384,11 +384,17 @@ impl PageServerHandler {
|
||||
}
|
||||
}
|
||||
|
||||
/// Analogous to calling cancelled() on a Timeline's cancellation token: waits for cancellation.
|
||||
/// Future that completes when we need to shut down the connection.
|
||||
///
|
||||
/// We use many Timeline objects, and hold GateGuards on all of them. We must therefore respect
|
||||
/// all of their cancellation tokens.
|
||||
async fn timeline_cancelled(&self) {
|
||||
/// Reasons for needing to shut down are:
|
||||
/// - any of the timelines we hold GateGuards for in `shard_timelines` is cancelled
|
||||
/// - task_mgr requests shutdown of the connection
|
||||
///
|
||||
/// The need to check for `task_mgr` cancellation arises mainly from `handle_pagerequests`
|
||||
/// where, at first, `shard_timelines` is empty, see <https://github.com/neondatabase/neon/pull/6388>
|
||||
///
|
||||
/// NB: keep in sync with [`Self::is_connection_cancelled`]
|
||||
async fn await_connection_cancelled(&self) {
|
||||
// A short wait before we expend the cycles to walk our timeline map. This avoids incurring
|
||||
// that cost every time we check for cancellation.
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
@@ -404,14 +410,19 @@ impl PageServerHandler {
|
||||
.map(|ht| ht.timeline.cancel.cancelled())
|
||||
.collect::<FuturesUnordered<_>>();
|
||||
|
||||
futs.next().await;
|
||||
tokio::select! {
|
||||
_ = task_mgr::shutdown_watcher() => { }
|
||||
_ = futs.next() => {}
|
||||
}
|
||||
}
|
||||
|
||||
/// Analogous to calling is_cancelled() on a Timeline's cancellation token
|
||||
fn timeline_is_cancelled(&self) -> bool {
|
||||
self.shard_timelines
|
||||
.values()
|
||||
.any(|ht| ht.timeline.cancel.is_cancelled() || ht.timeline.is_stopping())
|
||||
/// Checking variant of [`Self::await_connection_cancelled`].
|
||||
fn is_connection_cancelled(&self) -> bool {
|
||||
task_mgr::is_shutdown_requested()
|
||||
|| self
|
||||
.shard_timelines
|
||||
.values()
|
||||
.any(|ht| ht.timeline.cancel.is_cancelled() || ht.timeline.is_stopping())
|
||||
}
|
||||
|
||||
/// This function always respects cancellation of any timeline in [`Self::shard_timelines`]. Pass in
|
||||
@@ -432,7 +443,7 @@ impl PageServerHandler {
|
||||
flush_r = pgb.flush() => {
|
||||
Ok(flush_r?)
|
||||
},
|
||||
_ = self.timeline_cancelled() => {
|
||||
_ = self.await_connection_cancelled() => {
|
||||
Err(QueryError::Shutdown)
|
||||
}
|
||||
_ = cancel.cancelled() => {
|
||||
@@ -549,7 +560,7 @@ impl PageServerHandler {
|
||||
let msg = tokio::select! {
|
||||
biased;
|
||||
|
||||
_ = self.timeline_cancelled() => {
|
||||
_ = self.await_connection_cancelled() => {
|
||||
// We were requested to shut down.
|
||||
info!("shutdown request received in page handler");
|
||||
return Err(QueryError::Shutdown)
|
||||
@@ -632,7 +643,7 @@ impl PageServerHandler {
|
||||
span.in_scope(|| info!("handler requested reconnect: {reason}"));
|
||||
return Err(QueryError::Reconnect);
|
||||
}
|
||||
Err(e) if self.timeline_is_cancelled() => {
|
||||
Err(e) if self.is_connection_cancelled() => {
|
||||
// This branch accommodates code within request handlers that returns an anyhow::Error instead of a clean
|
||||
// shutdown error, this may be buried inside a PageReconstructError::Other for example.
|
||||
//
|
||||
|
||||
Reference in New Issue
Block a user