diff --git a/src/walredo.rs b/src/walredo.rs index 4b484a4481..5ea7e8b238 100644 --- a/src/walredo.rs +++ b/src/walredo.rs @@ -19,12 +19,14 @@ use tokio::process::{Command, Child, ChildStdin, ChildStdout}; use std::process::Stdio; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::AsyncBufReadExt; +use tokio::time::timeout; use std::io::Error; use std::cell::RefCell; use std::assert; use std::sync::{Arc}; use log::*; use std::time::Instant; +use std::time::Duration; use bytes::{Bytes, BytesMut, BufMut}; @@ -33,6 +35,8 @@ use crate::page_cache::CacheEntry; use crate::page_cache::WALRecord; use crate::page_cache; +static TIMEOUT: Duration = Duration::from_secs(20); + // // Main entry point for the WAL applicator thread. // @@ -65,7 +69,11 @@ pub fn wal_applicator_main() let request = walredo_channel_receiver.recv().unwrap(); - handle_apply_request(&process, &runtime, request); + let result = handle_apply_request(&process, &runtime, request); + if result.is_err() { + // On error, kill the process. + break; + } } info!("killing WAL redo postgres process"); @@ -76,7 +84,7 @@ pub fn wal_applicator_main() } } -fn handle_apply_request(process: &WalRedoProcess, runtime: &Runtime, entry_rc: Arc) +fn handle_apply_request(process: &WalRedoProcess, runtime: &Runtime, entry_rc: Arc) -> Result<(), Error> { let tag = entry_rc.key.tag; let lsn = entry_rc.key.lsn; @@ -91,19 +99,25 @@ fn handle_apply_request(process: &WalRedoProcess, runtime: &Runtime, entry_rc: A let apply_result = process.apply_wal_records(runtime, tag, base_img, records); let duration = start.elapsed(); + let result; + debug!("applied {} WAL records in {} ms to reconstruct page image at LSN {:X}/{:X}", nrecords, duration.as_millis(), lsn >> 32, lsn & 0xffff_ffff); if let Err(e) = apply_result { error!("could not apply WAL records: {}", e); + result = Err(e); } else { entry.page_image = Some(apply_result.unwrap()); page_cache::PAGECACHE.num_page_images.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + result = Ok(()); } // Wake up the requester, whether the operation succeeded or not. entry_rc.walredo_condvar.notify_all(); + + return result; } struct WalRedoProcess { @@ -179,23 +193,25 @@ impl WalRedoProcess { let f_stdin = async { // Send base image, if any. (If the record initializes the page, previous page // version is not needed.) - stdin.write(&build_begin_redo_for_block_msg(tag)).await?; + timeout(TIMEOUT, stdin.write(&build_begin_redo_for_block_msg(tag))).await??; if base_img.is_some() { - stdin.write(&build_push_page_msg(tag, base_img.unwrap())).await?; + timeout(TIMEOUT, stdin.write(&build_push_page_msg(tag, base_img.unwrap()))).await??; } // Send WAL records. for rec in records.iter() { let r = rec.clone(); - stdin.write(&build_apply_record_msg(r.lsn, r.rec)).await?; + timeout(TIMEOUT, stdin.write(&build_apply_record_msg(r.lsn, r.rec))).await??; + //debug!("sent WAL record to wal redo postgres process ({:X}/{:X}", + // r.lsn >> 32, r.lsn & 0xffff_ffff); } //debug!("sent {} WAL records to wal redo postgres process ({:X}/{:X}", // records.len(), lsn >> 32, lsn & 0xffff_ffff); // Send GetPage command to get the result back - stdin.write(&build_get_page_msg(tag)).await?; - stdin.flush().await?; + timeout(TIMEOUT, stdin.write(&build_get_page_msg(tag))).await??; + timeout(TIMEOUT, stdin.flush()).await??; //debug!("sent GetPage for {}", tag.blknum); Ok::<(), Error>(()) }; @@ -204,7 +220,7 @@ impl WalRedoProcess { let f_stdout = async { let mut buf = [0u8; 8192]; - stdout.read_exact(&mut buf).await?; + timeout(TIMEOUT, stdout.read_exact(&mut buf)).await??; //debug!("got response for {}", tag.blknum); Ok::<[u8;8192], Error>(buf) };