From e4d90fe0bbf818b72db43875fcff208a54238d98 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Fri, 26 Mar 2021 15:25:57 +0200 Subject: [PATCH] Add timeouts to WAL redo, to prevent it from getting stuck. It is getting stuck at least in spgist index currently. Not sure why, that needs to be investigated, but having a timeout is a good idea anyway. --- src/walredo.rs | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/src/walredo.rs b/src/walredo.rs index 4b484a4481..5ea7e8b238 100644 --- a/src/walredo.rs +++ b/src/walredo.rs @@ -19,12 +19,14 @@ use tokio::process::{Command, Child, ChildStdin, ChildStdout}; use std::process::Stdio; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::AsyncBufReadExt; +use tokio::time::timeout; use std::io::Error; use std::cell::RefCell; use std::assert; use std::sync::{Arc}; use log::*; use std::time::Instant; +use std::time::Duration; use bytes::{Bytes, BytesMut, BufMut}; @@ -33,6 +35,8 @@ use crate::page_cache::CacheEntry; use crate::page_cache::WALRecord; use crate::page_cache; +static TIMEOUT: Duration = Duration::from_secs(20); + // // Main entry point for the WAL applicator thread. // @@ -65,7 +69,11 @@ pub fn wal_applicator_main() let request = walredo_channel_receiver.recv().unwrap(); - handle_apply_request(&process, &runtime, request); + let result = handle_apply_request(&process, &runtime, request); + if result.is_err() { + // On error, kill the process. + break; + } } info!("killing WAL redo postgres process"); @@ -76,7 +84,7 @@ pub fn wal_applicator_main() } } -fn handle_apply_request(process: &WalRedoProcess, runtime: &Runtime, entry_rc: Arc) +fn handle_apply_request(process: &WalRedoProcess, runtime: &Runtime, entry_rc: Arc) -> Result<(), Error> { let tag = entry_rc.key.tag; let lsn = entry_rc.key.lsn; @@ -91,19 +99,25 @@ fn handle_apply_request(process: &WalRedoProcess, runtime: &Runtime, entry_rc: A let apply_result = process.apply_wal_records(runtime, tag, base_img, records); let duration = start.elapsed(); + let result; + debug!("applied {} WAL records in {} ms to reconstruct page image at LSN {:X}/{:X}", nrecords, duration.as_millis(), lsn >> 32, lsn & 0xffff_ffff); if let Err(e) = apply_result { error!("could not apply WAL records: {}", e); + result = Err(e); } else { entry.page_image = Some(apply_result.unwrap()); page_cache::PAGECACHE.num_page_images.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + result = Ok(()); } // Wake up the requester, whether the operation succeeded or not. entry_rc.walredo_condvar.notify_all(); + + return result; } struct WalRedoProcess { @@ -179,23 +193,25 @@ impl WalRedoProcess { let f_stdin = async { // Send base image, if any. (If the record initializes the page, previous page // version is not needed.) - stdin.write(&build_begin_redo_for_block_msg(tag)).await?; + timeout(TIMEOUT, stdin.write(&build_begin_redo_for_block_msg(tag))).await??; if base_img.is_some() { - stdin.write(&build_push_page_msg(tag, base_img.unwrap())).await?; + timeout(TIMEOUT, stdin.write(&build_push_page_msg(tag, base_img.unwrap()))).await??; } // Send WAL records. for rec in records.iter() { let r = rec.clone(); - stdin.write(&build_apply_record_msg(r.lsn, r.rec)).await?; + timeout(TIMEOUT, stdin.write(&build_apply_record_msg(r.lsn, r.rec))).await??; + //debug!("sent WAL record to wal redo postgres process ({:X}/{:X}", + // r.lsn >> 32, r.lsn & 0xffff_ffff); } //debug!("sent {} WAL records to wal redo postgres process ({:X}/{:X}", // records.len(), lsn >> 32, lsn & 0xffff_ffff); // Send GetPage command to get the result back - stdin.write(&build_get_page_msg(tag)).await?; - stdin.flush().await?; + timeout(TIMEOUT, stdin.write(&build_get_page_msg(tag))).await??; + timeout(TIMEOUT, stdin.flush()).await??; //debug!("sent GetPage for {}", tag.blknum); Ok::<(), Error>(()) }; @@ -204,7 +220,7 @@ impl WalRedoProcess { let f_stdout = async { let mut buf = [0u8; 8192]; - stdout.read_exact(&mut buf).await?; + timeout(TIMEOUT, stdout.read_exact(&mut buf)).await??; //debug!("got response for {}", tag.blknum); Ok::<[u8;8192], Error>(buf) };