error handling

This commit is contained in:
Christian Schwarz
2024-01-31 12:22:21 +01:00
parent 8012b80a45
commit 2736f61604

View File

@@ -699,17 +699,10 @@ impl WalRedoProcess {
let stderr = child.stderr.take().unwrap();
let stderr = tokio::process::ChildStderr::from_std(stderr)
.context("convert to tokio::ChildStderr")?;
macro_rules! set_nonblock_or_log_err {
($file:ident) => {{
let res = set_nonblock($file.as_raw_fd());
if let Err(e) = &res {
error!(error = %e, file = stringify!($file), pid = child.id(), "set_nonblock failed");
}
res
}};
}
set_nonblock_or_log_err!(stdin)?;
set_nonblock_or_log_err!(stdout)?;
let stdin =
tokio::process::ChildStdin::from_std(stdin).context("convert to tokio::ChildStdin")?;
let stdout = tokio::process::ChildStdout::from_std(stdout)
.context("convert to tokio::ChildStdout")?;
// all fallible operations post-spawn are complete, so get rid of the guard
let child = scopeguard::ScopeGuard::into_inner(child);
@@ -755,11 +748,11 @@ impl WalRedoProcess {
tenant_shard_id,
child: Some(child),
stdin: tokio::sync::Mutex::new(ProcessInput {
stdin: tokio::process::ChildStdin::from_std(stdin).unwrap(), // TODO error handling
stdin,
n_requests: 0,
}),
stdout: tokio::sync::Mutex::new(ProcessOutput {
stdout: tokio::process::ChildStdout::from_std(stdout).unwrap(), // TODO error handling
stdout,
pending_responses: VecDeque::new(),
n_processed_responses: 0,
}),
@@ -826,7 +819,11 @@ impl WalRedoProcess {
async fn apply_wal_records0(&self, writebuf: &[u8]) -> anyhow::Result<Bytes> {
let input = self.stdin.lock().await;
input.stdin.write_all(writebuf).await.unwrap(); // TODO: bring back timeout & error handling
input
.stdin
.write_all(writebuf)
.await
.context("write to walredo stdin")?;
let request_no = input.n_requests;
input.n_requests += 1;
drop(input);
@@ -849,7 +846,11 @@ impl WalRedoProcess {
// We expect the WAL redo process to respond with an 8k page image. We read it
// into this buffer.
let mut resultbuf = vec![0; BLCKSZ.into()];
output.stdout.read_exact(&mut resultbuf).await.unwrap();
output
.stdout
.read_exact(&mut resultbuf)
.await
.context("read walredo stdout")?;
output
.pending_responses
.push_back(Some(Bytes::from(resultbuf)));