mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-15 12:10:37 +00:00
fix deadlock
This commit is contained in:
@@ -206,7 +206,7 @@ impl PostgresRedoManager {
|
||||
let mut n_attempts = 0u32;
|
||||
loop {
|
||||
// launch the WAL redo process on first use
|
||||
let proc = self
|
||||
let proc_once_cell_guard_ref = self
|
||||
.redo_process
|
||||
.get_or_init(|init_permit| async move {
|
||||
let start = Instant::now();
|
||||
@@ -232,7 +232,7 @@ impl PostgresRedoManager {
|
||||
let started_at = std::time::Instant::now();
|
||||
|
||||
// Relational WAL records are applied using wal-redo-postgres
|
||||
let result = proc
|
||||
let result = proc_once_cell_guard_ref
|
||||
.apply_wal_records(rel, blknum, &base_img, records, wal_redo_timeout)
|
||||
.context("apply_wal_records");
|
||||
|
||||
@@ -275,32 +275,30 @@ impl PostgresRedoManager {
|
||||
);
|
||||
// Avoid concurrent callers hitting the same issue.
|
||||
// We can't prevent it from happening because we want to enable parallelism.
|
||||
{
|
||||
match self.redo_process.get_mut().await {
|
||||
Some(mut guard) => {
|
||||
if Arc::ptr_eq(&*guard, &proc) {
|
||||
// We're the first to observe an error from `proc`, it's our job to take it out of rotation.
|
||||
drop(guard.take_and_deinit());
|
||||
}
|
||||
}
|
||||
None => {
|
||||
// Another thread was faster to observe the error, and already took the process out of rotation.
|
||||
let proc_clone = Arc::clone(&proc_once_cell_guard_ref);
|
||||
drop(proc_once_cell_guard_ref); // otherwise, the .get_mut() in the next line would deadlock with us holding the guard
|
||||
match self.redo_process.get_mut().await {
|
||||
Some(mut guard) => {
|
||||
if Arc::ptr_eq(&*guard, &proc_clone) {
|
||||
// We're the first to observe an error from `proc`, it's our job to take it out of rotation.
|
||||
drop(guard.take_and_deinit());
|
||||
}
|
||||
}
|
||||
None => {
|
||||
// Another thread was faster to observe the error, and already took the process out of rotation.
|
||||
}
|
||||
}
|
||||
// NB: there may still be other concurrent threads using `proc`.
|
||||
// The last one will send SIGKILL when the underlying Arc reaches refcount 0.
|
||||
// NB: it's important to drop(proc) after drop(guard). Otherwise we'd keep
|
||||
// holding the lock while waiting for the process to exit.
|
||||
// NB: the drop impl blocks the current threads with a wait() system call for
|
||||
// the child process. We dropped the `guard` above so that other threads aren't
|
||||
// affected. But, it's good that the current thread _does_ block to wait.
|
||||
// If we instead deferred the waiting into the background / to tokio, it could
|
||||
// happen that if walredo always fails immediately, we spawn processes faster
|
||||
// than we can SIGKILL & `wait` for them to exit. By doing it the way we do here,
|
||||
// we limit this risk of run-away to at most $num_runtimes * $num_executor_threads.
|
||||
// the child process. We usually avoid stalling the executor thread that way,
|
||||
// but, here it's actually somewhat good. If we instead deferred the waiting
|
||||
// into the background / to tokio, it could happen that if walredo always fails
|
||||
// immediately, we spawn processes faster han we can SIGKILL & `wait` for them to exit.
|
||||
// By doing it the way we do here, e limit this risk of run-away to at most
|
||||
// $num_runtimes * $num_executor_threads.
|
||||
// This probably needs revisiting at some later point.
|
||||
drop(proc);
|
||||
drop(proc_clone);
|
||||
} else if n_attempts != 0 {
|
||||
info!(n_attempts, "retried walredo succeeded");
|
||||
}
|
||||
@@ -500,4 +498,11 @@ mod tests {
|
||||
tracing::info_span!("RedoHarness", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug())
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for RedoHarness {
|
||||
fn drop(&mut self) {
|
||||
self.span()
|
||||
.in_scope(|| tracing::info!("RedoHarness dropping"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use tracing;
|
||||
use tracing::debug;
|
||||
use tracing::error;
|
||||
use tracing::info;
|
||||
use tracing::instrument;
|
||||
@@ -76,7 +77,7 @@ impl NoLeakChild {
|
||||
// with the wait().
|
||||
error!(error = %e, "failed to SIGKILL; subsequent wait() might fail or wait for wrong process");
|
||||
}
|
||||
|
||||
debug!("sent SIGKILL, waiting for child to exit");
|
||||
match child.wait() {
|
||||
Ok(exit_status) => {
|
||||
info!(exit_status = %exit_status, "wait successful");
|
||||
|
||||
Reference in New Issue
Block a user