WIP: async walredo

This commit is contained in:
Christian Schwarz
2024-01-27 12:47:29 +00:00
parent 6f488b3bd3
commit 03abbaaddf

View File

@@ -21,7 +21,6 @@
use anyhow::Context;
use byteorder::{ByteOrder, LittleEndian};
use bytes::{BufMut, Bytes, BytesMut};
use nix::poll::*;
use pageserver_api::shard::TenantShardId;
use serde::Serialize;
use std::collections::VecDeque;
@@ -31,10 +30,11 @@ use std::ops::{Deref, DerefMut};
use std::os::unix::io::AsRawFd;
use std::os::unix::prelude::CommandExt;
use std::process::Stdio;
use std::process::{Child, ChildStdin, ChildStdout, Command};
use std::sync::{Arc, Mutex, MutexGuard, RwLock};
use std::process::{Child, Command};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use std::time::Instant;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::*;
use utils::{bin_ser::BeSer, lsn::Lsn, nonblock::set_nonblock};
@@ -73,12 +73,12 @@ pub(crate) struct BufferTag {
}
struct ProcessInput {
stdin: ChildStdin,
stdin: tokio::process::ChildStdin,
n_requests: usize,
}
struct ProcessOutput {
stdout: ChildStdout,
stdout: tokio::process::ChildStdout,
pending_responses: VecDeque<Option<Bytes>>,
n_processed_responses: usize,
}
@@ -157,6 +157,7 @@ impl PostgresRedoManager {
self.conf.wal_redo_timeout,
pg_version,
)
.await
};
img = Some(result?);
@@ -177,6 +178,7 @@ impl PostgresRedoManager {
self.conf.wal_redo_timeout,
pg_version,
)
.await
}
}
}
@@ -217,7 +219,7 @@ impl PostgresRedoManager {
/// Process one request for WAL redo using wal-redo postgres
///
#[allow(clippy::too_many_arguments)]
fn apply_batch_postgres(
async fn apply_batch_postgres(
&self,
key: Key,
lsn: Lsn,
@@ -270,6 +272,7 @@ impl PostgresRedoManager {
let buf_tag = BufferTag { rel, blknum };
let result = proc
.apply_wal_records(buf_tag, &base_img, records, wal_redo_timeout)
.await
.context("apply_wal_records");
let duration = started_at.elapsed();
@@ -647,8 +650,8 @@ struct WalRedoProcess {
tenant_shard_id: TenantShardId,
// Some() on construction, only becomes None on Drop.
child: Option<NoLeakChild>,
stdout: Mutex<ProcessOutput>,
stdin: Mutex<ProcessInput>,
stdout: tokio::sync::Mutex<ProcessOutput>,
stdin: tokio::sync::Mutex<ProcessInput>,
/// Counter to separate same sized walredo inputs failing at the same millisecond.
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize,
@@ -754,12 +757,12 @@ impl WalRedoProcess {
conf,
tenant_shard_id,
child: Some(child),
stdin: Mutex::new(ProcessInput {
stdin,
stdin: tokio::sync::Mutex::new(ProcessInput {
stdin: tokio::process::ChildStdin::from_std(stdin).unwrap(), // TODO error handling
n_requests: 0,
}),
stdout: Mutex::new(ProcessOutput {
stdout,
stdout: tokio::sync::Mutex::new(ProcessOutput {
stdout: tokio::process::ChildStdout::from_std(stdout).unwrap(), // TODO error handling
pending_responses: VecDeque::new(),
n_processed_responses: 0,
}),
@@ -779,15 +782,13 @@ impl WalRedoProcess {
// new page image.
//
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))]
fn apply_wal_records(
async fn apply_wal_records(
&self,
tag: BufferTag,
base_img: &Option<Bytes>,
records: &[(Lsn, NeonWalRecord)],
wal_redo_timeout: Duration,
) -> anyhow::Result<Bytes> {
let input = self.stdin.lock().unwrap();
// Serialize all the messages to send the WAL redo process first.
//
// This could be problematic if there are millions of records to replay,
@@ -816,7 +817,7 @@ impl WalRedoProcess {
build_get_page_msg(tag, &mut writebuf);
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
let res = self.apply_wal_records0(&writebuf, input, wal_redo_timeout);
let res = self.apply_wal_records0(&writebuf, wal_redo_timeout).await;
if res.is_err() {
// not all of these can be caused by this particular input, however these are so rare
@@ -827,38 +828,17 @@ impl WalRedoProcess {
res
}
fn apply_wal_records0(
async fn apply_wal_records0(
&self,
writebuf: &[u8],
input: MutexGuard<ProcessInput>,
wal_redo_timeout: Duration,
_wal_redo_timeout: Duration, // TODO respect
) -> anyhow::Result<Bytes> {
let input = self.stdin.lock().await;
let mut proc = { input }; // TODO: remove this legacy rename, but this keep the patch small.
let mut nwrite = 0usize;
while nwrite < writebuf.len() {
let mut stdin_pollfds = [PollFd::new(&proc.stdin, PollFlags::POLLOUT)];
let n = loop {
match nix::poll::poll(&mut stdin_pollfds[..], wal_redo_timeout.as_millis() as i32) {
Err(nix::errno::Errno::EINTR) => continue,
res => break res,
}
}?;
proc.stdin.write_all(writebuf).await.unwrap(); // TODO: bring back timeout & error handling
if n == 0 {
anyhow::bail!("WAL redo timed out");
}
// If 'stdin' is writeable, do write.
let in_revents = stdin_pollfds[0].revents().unwrap();
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
nwrite += proc.stdin.write(&writebuf[nwrite..])?;
}
if in_revents.contains(PollFlags::POLLHUP) {
// We still have more data to write, but the process closed the pipe.
anyhow::bail!("WAL redo process closed its stdin unexpectedly");
}
}
let request_no = proc.n_requests;
proc.n_requests += 1;
drop(proc);
@@ -875,40 +855,13 @@ impl WalRedoProcess {
// pending responses ring buffer and truncate all empty elements from the front,
// advancing processed responses number.
let mut output = self.stdout.lock().unwrap();
let mut output = self.stdout.lock().await;
let n_processed_responses = output.n_processed_responses;
while n_processed_responses + output.pending_responses.len() <= request_no {
// We expect the WAL redo process to respond with an 8k page image. We read it
// into this buffer.
let mut resultbuf = vec![0; BLCKSZ.into()];
let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
while nresult < BLCKSZ.into() {
let mut stdout_pollfds = [PollFd::new(&output.stdout, PollFlags::POLLIN)];
// We do two things simultaneously: reading response from stdout
// and forward any logging information that the child writes to its stderr to the page server's log.
let n = loop {
match nix::poll::poll(
&mut stdout_pollfds[..],
wal_redo_timeout.as_millis() as i32,
) {
Err(nix::errno::Errno::EINTR) => continue,
res => break res,
}
}?;
if n == 0 {
anyhow::bail!("WAL redo timed out");
}
// If we have some data in stdout, read it to the result buffer.
let out_revents = stdout_pollfds[0].revents().unwrap();
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
nresult += output.stdout.read(&mut resultbuf[nresult..])?;
}
if out_revents.contains(PollFlags::POLLHUP) {
anyhow::bail!("WAL redo process closed its stdout unexpectedly");
}
}
output.stdout.read_exact(&mut resultbuf).await.unwrap();
output
.pending_responses
.push_back(Some(Bytes::from(resultbuf)));