Compare commits

...

3 Commits

Author SHA1 Message Date
Heikki Linnakangas
081b0d1e80 Use explicit counter to detect when WAL redo process has been restarted.
More robust than relying on FDs.
2023-10-13 17:08:09 +03:00
Konstantin Knizhnik
e083c86c93 Move saving of stdin descriptor 2023-10-13 09:16:52 +03:00
Konstantin Knizhnik
3406676abd Check if walredo pipe was recreated by some other backend before killing walredo process 2023-10-12 22:53:27 +03:00

View File

@@ -30,6 +30,7 @@ use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::prelude::CommandExt;
use std::process::Stdio;
use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Mutex, MutexGuard};
use std::time::Duration;
use std::time::Instant;
@@ -39,7 +40,7 @@ use utils::crashsafe::path_with_suffix_extension;
use utils::{bin_ser::BeSer, id::TenantId, lsn::Lsn, nonblock::set_nonblock};
#[cfg(feature = "testing")]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::AtomicUsize;
use crate::metrics::{
WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_RECORD_COUNTER, WAL_REDO_TIME,
@@ -93,6 +94,7 @@ pub trait WalRedoManager: Send + Sync {
}
struct ProcessInput {
restart_no: u64,
child: NoLeakChild,
stdin: ChildStdin,
stderr_fd: RawFd,
@@ -101,6 +103,7 @@ struct ProcessInput {
}
struct ProcessOutput {
restart_no: u64,
stdout: ChildStdout,
pending_responses: VecDeque<Option<Bytes>>,
n_processed_responses: usize,
@@ -120,6 +123,7 @@ pub struct PostgresRedoManager {
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize,
restart_counter: AtomicU64,
stdout: Mutex<Option<ProcessOutput>>,
stdin: Mutex<Option<ProcessInput>>,
stderr: Mutex<Option<ChildStderr>>,
@@ -228,6 +232,7 @@ impl PostgresRedoManager {
pub fn new(conf: &'static PageServerConf, tenant_id: TenantId) -> PostgresRedoManager {
// The actual process is launched lazily, on first request.
PostgresRedoManager {
restart_counter: AtomicU64::new(0),
tenant_id,
conf,
#[cfg(feature = "testing")]
@@ -273,6 +278,7 @@ impl PostgresRedoManager {
if proc.is_none() {
self.launch(&mut proc, pg_version)?;
}
let restart_no = proc.as_ref().unwrap().restart_no;
WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
// Relational WAL records are applied using wal-redo-postgres
@@ -322,18 +328,12 @@ impl PostgresRedoManager {
// self.stdin only holds stdin & stderr as_raw_fd().
// Dropping it as part of take() doesn't close them.
// The owning objects (ChildStdout and ChildStderr) are stored in
// self.stdout and self.stderr, respectively.
// We intentionally keep them open here to avoid a race between
// currently running `apply_wal_records()` and a `launch()` call
// after we return here.
// The currently running `apply_wal_records()` must not read from
// the newly launched process.
// By keeping self.stdout and self.stderr open here, `launch()` will
// get other file descriptors for the new child's stdout and stderr,
// and hence the current `apply_wal_records()` calls will observe
// `output.stdout.as_raw_fd() != stdout_fd` .
// self.stdout and self.stderr, respectively.
// They will be closed when the new process is launched.
if let Some(proc) = self.stdin.lock().unwrap().take() {
proc.child.kill_and_wait();
if proc.restart_no == restart_no {
proc.child.kill_and_wait();
}
}
} else if n_attempts != 0 {
info!(n_attempts, "retried walredo succeeded");
@@ -730,7 +730,9 @@ impl PostgresRedoManager {
// all fallible operations post-spawn are complete, so get rid of the guard
let child = scopeguard::ScopeGuard::into_inner(child);
let restart_no = self.restart_counter.fetch_add(1, Ordering::SeqCst);
**input = Some(ProcessInput {
restart_no,
child,
stdout_fd: stdout.as_raw_fd(),
stderr_fd: stderr.as_raw_fd(),
@@ -739,6 +741,7 @@ impl PostgresRedoManager {
});
*self.stdout.lock().unwrap() = Some(ProcessOutput {
restart_no,
stdout,
pending_responses: VecDeque::new(),
n_processed_responses: 0,
@@ -810,13 +813,13 @@ impl PostgresRedoManager {
) -> Result<Bytes, std::io::Error> {
let proc = input.as_mut().unwrap();
let mut nwrite = 0usize;
let stdout_fd = proc.stdout_fd;
let restart_no = proc.restart_no;
// Prepare for calling poll()
let mut pollfds = [
PollFd::new(proc.stdin.as_raw_fd(), PollFlags::POLLOUT),
PollFd::new(proc.stderr_fd, PollFlags::POLLIN),
PollFd::new(stdout_fd, PollFlags::POLLIN),
PollFd::new(proc.stdout_fd, PollFlags::POLLIN),
];
// We do two things simultaneously: send the old base image and WAL records to
@@ -891,13 +894,10 @@ impl PostgresRedoManager {
let mut output_guard = self.stdout.lock().unwrap();
let output = output_guard.as_mut().unwrap();
if output.stdout.as_raw_fd() != stdout_fd {
// If the stdout file descriptor has changed, the walredo process crashed and was restarted.
// Since ProcessInput and ProcessOutput are protected by different mutexes,
// we could send a request to one process and wait for the response from another.
// To prevent that, we compare the stdout file descriptors.
// Because the old stdout pipe is destroyed only after the new one is created,
// the new pipe cannot reuse the same file descriptor, so this check is safe.
if output.restart_no != restart_no {
// If restart_no changed, the walredo process crashed and was restarted
// between dropping the 'input' lock and acquiring 'output'. In that case,
// 'output' belongs to a different process than the one we sent the request to.
//
// Cross-read this with the comment in apply_batch_postgres if result.is_err().
// That's where we kill the child process.