mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-06 13:02:55 +00:00
Add timeouts to WAL redo, to prevent it from getting stuck.
It currently gets stuck at least on SP-GiST indexes. The cause is not yet known and needs to be investigated, but having a timeout is a good idea anyway.
This commit is contained in:
committed by
Stas Kelvich
parent
95c1ef5bb7
commit
e4d90fe0bb
@@ -19,12 +19,14 @@ use tokio::process::{Command, Child, ChildStdin, ChildStdout};
|
|||||||
use std::process::Stdio;
|
use std::process::Stdio;
|
||||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
use tokio::io::AsyncBufReadExt;
|
use tokio::io::AsyncBufReadExt;
|
||||||
|
use tokio::time::timeout;
|
||||||
use std::io::Error;
|
use std::io::Error;
|
||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
use std::assert;
|
use std::assert;
|
||||||
use std::sync::{Arc};
|
use std::sync::{Arc};
|
||||||
use log::*;
|
use log::*;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
use bytes::{Bytes, BytesMut, BufMut};
|
use bytes::{Bytes, BytesMut, BufMut};
|
||||||
|
|
||||||
@@ -33,6 +35,8 @@ use crate::page_cache::CacheEntry;
|
|||||||
use crate::page_cache::WALRecord;
|
use crate::page_cache::WALRecord;
|
||||||
use crate::page_cache;
|
use crate::page_cache;
|
||||||
|
|
||||||
|
/// Upper bound on how long any single write to, flush of, or read from the
/// WAL redo postgres process may take. Each such I/O call is wrapped in
/// `tokio::time::timeout(TIMEOUT, ..)`, so a stuck redo process surfaces as
/// an error instead of blocking forever.
///
/// Declared as `const` rather than `static`: this is a plain immutable value
/// with no need for a single fixed address or interior mutability.
const TIMEOUT: Duration = Duration::from_secs(20);
|
||||||
|
|
||||||
//
|
//
|
||||||
// Main entry point for the WAL applicator thread.
|
// Main entry point for the WAL applicator thread.
|
||||||
//
|
//
|
||||||
@@ -65,7 +69,11 @@ pub fn wal_applicator_main()
|
|||||||
|
|
||||||
let request = walredo_channel_receiver.recv().unwrap();
|
let request = walredo_channel_receiver.recv().unwrap();
|
||||||
|
|
||||||
handle_apply_request(&process, &runtime, request);
|
let result = handle_apply_request(&process, &runtime, request);
|
||||||
|
if result.is_err() {
|
||||||
|
// On error, kill the process.
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("killing WAL redo postgres process");
|
info!("killing WAL redo postgres process");
|
||||||
@@ -76,7 +84,7 @@ pub fn wal_applicator_main()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_apply_request(process: &WalRedoProcess, runtime: &Runtime, entry_rc: Arc<CacheEntry>)
|
fn handle_apply_request(process: &WalRedoProcess, runtime: &Runtime, entry_rc: Arc<CacheEntry>) -> Result<(), Error>
|
||||||
{
|
{
|
||||||
let tag = entry_rc.key.tag;
|
let tag = entry_rc.key.tag;
|
||||||
let lsn = entry_rc.key.lsn;
|
let lsn = entry_rc.key.lsn;
|
||||||
@@ -91,19 +99,25 @@ fn handle_apply_request(process: &WalRedoProcess, runtime: &Runtime, entry_rc: A
|
|||||||
let apply_result = process.apply_wal_records(runtime, tag, base_img, records);
|
let apply_result = process.apply_wal_records(runtime, tag, base_img, records);
|
||||||
let duration = start.elapsed();
|
let duration = start.elapsed();
|
||||||
|
|
||||||
|
let result;
|
||||||
|
|
||||||
debug!("applied {} WAL records in {} ms to reconstruct page image at LSN {:X}/{:X}",
|
debug!("applied {} WAL records in {} ms to reconstruct page image at LSN {:X}/{:X}",
|
||||||
nrecords, duration.as_millis(),
|
nrecords, duration.as_millis(),
|
||||||
lsn >> 32, lsn & 0xffff_ffff);
|
lsn >> 32, lsn & 0xffff_ffff);
|
||||||
|
|
||||||
if let Err(e) = apply_result {
|
if let Err(e) = apply_result {
|
||||||
error!("could not apply WAL records: {}", e);
|
error!("could not apply WAL records: {}", e);
|
||||||
|
result = Err(e);
|
||||||
} else {
|
} else {
|
||||||
entry.page_image = Some(apply_result.unwrap());
|
entry.page_image = Some(apply_result.unwrap());
|
||||||
page_cache::PAGECACHE.num_page_images.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
page_cache::PAGECACHE.num_page_images.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||||
|
result = Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wake up the requester, whether the operation succeeded or not.
|
// Wake up the requester, whether the operation succeeded or not.
|
||||||
entry_rc.walredo_condvar.notify_all();
|
entry_rc.walredo_condvar.notify_all();
|
||||||
|
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
struct WalRedoProcess {
|
struct WalRedoProcess {
|
||||||
@@ -179,23 +193,25 @@ impl WalRedoProcess {
|
|||||||
let f_stdin = async {
|
let f_stdin = async {
|
||||||
// Send base image, if any. (If the record initializes the page, previous page
|
// Send base image, if any. (If the record initializes the page, previous page
|
||||||
// version is not needed.)
|
// version is not needed.)
|
||||||
stdin.write(&build_begin_redo_for_block_msg(tag)).await?;
|
timeout(TIMEOUT, stdin.write(&build_begin_redo_for_block_msg(tag))).await??;
|
||||||
if base_img.is_some() {
|
if base_img.is_some() {
|
||||||
stdin.write(&build_push_page_msg(tag, base_img.unwrap())).await?;
|
timeout(TIMEOUT, stdin.write(&build_push_page_msg(tag, base_img.unwrap()))).await??;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send WAL records.
|
// Send WAL records.
|
||||||
for rec in records.iter() {
|
for rec in records.iter() {
|
||||||
let r = rec.clone();
|
let r = rec.clone();
|
||||||
|
|
||||||
stdin.write(&build_apply_record_msg(r.lsn, r.rec)).await?;
|
timeout(TIMEOUT, stdin.write(&build_apply_record_msg(r.lsn, r.rec))).await??;
|
||||||
|
//debug!("sent WAL record to wal redo postgres process ({:X}/{:X}",
|
||||||
|
// r.lsn >> 32, r.lsn & 0xffff_ffff);
|
||||||
}
|
}
|
||||||
//debug!("sent {} WAL records to wal redo postgres process ({:X}/{:X}",
|
//debug!("sent {} WAL records to wal redo postgres process ({:X}/{:X}",
|
||||||
// records.len(), lsn >> 32, lsn & 0xffff_ffff);
|
// records.len(), lsn >> 32, lsn & 0xffff_ffff);
|
||||||
|
|
||||||
// Send GetPage command to get the result back
|
// Send GetPage command to get the result back
|
||||||
stdin.write(&build_get_page_msg(tag)).await?;
|
timeout(TIMEOUT, stdin.write(&build_get_page_msg(tag))).await??;
|
||||||
stdin.flush().await?;
|
timeout(TIMEOUT, stdin.flush()).await??;
|
||||||
//debug!("sent GetPage for {}", tag.blknum);
|
//debug!("sent GetPage for {}", tag.blknum);
|
||||||
Ok::<(), Error>(())
|
Ok::<(), Error>(())
|
||||||
};
|
};
|
||||||
@@ -204,7 +220,7 @@ impl WalRedoProcess {
|
|||||||
let f_stdout = async {
|
let f_stdout = async {
|
||||||
let mut buf = [0u8; 8192];
|
let mut buf = [0u8; 8192];
|
||||||
|
|
||||||
stdout.read_exact(&mut buf).await?;
|
timeout(TIMEOUT, stdout.read_exact(&mut buf)).await??;
|
||||||
//debug!("got response for {}", tag.blknum);
|
//debug!("got response for {}", tag.blknum);
|
||||||
Ok::<[u8;8192], Error>(buf)
|
Ok::<[u8;8192], Error>(buf)
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user