This commit is contained in:
Heikki Linnakangas
2021-10-27 19:00:40 +03:00
parent ea90d102e2
commit 28675739de

View File

@@ -30,6 +30,7 @@ use std::io::Error;
use std::path::PathBuf;
use std::process::Stdio;
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use std::time::Instant;
use tokio::io::AsyncBufReadExt;
@@ -133,14 +134,15 @@ lazy_static! {
/// perform WAL replay. Only one thread can use the processs at a time,
/// that is controlled by the Mutex. In the future, we might want to
/// launch a pool of processes to allow concurrent replay of multiple
/// records.
/// records. FIXME we have a pool now
///
pub struct PostgresRedoManager {
tenantid: ZTenantId,
conf: &'static PageServerConf,
runtime: tokio::runtime::Runtime,
process: Mutex<Option<PostgresRedoProcess>>,
processes: Vec<Mutex<Option<PostgresRedoProcess>>>,
next: AtomicUsize,
}
#[derive(Debug)]
@@ -210,14 +212,15 @@ impl WalRedoManager for PostgresRedoManager {
end_time = Instant::now();
WAL_REDO_TIME.observe(end_time.duration_since(start_time).as_secs_f64());
} else {
let mut process_guard = self.process.lock().unwrap();
let process_no = self.next.fetch_add(1, Ordering::AcqRel) % self.processes.len();
let mut process_guard = self.processes[process_no].lock().unwrap();
let lock_time = Instant::now();
// launch the WAL redo process on first use
if process_guard.is_none() {
let p = self
.runtime
.block_on(PostgresRedoProcess::launch(self.conf, &self.tenantid))?;
.block_on(PostgresRedoProcess::launch(self.conf, process_no, &self.tenantid))?;
*process_guard = Some(p);
}
let process = process_guard.as_mut().unwrap();
@@ -248,12 +251,18 @@ impl PostgresRedoManager {
.build()
.unwrap();
let mut processes: Vec<Mutex<Option<PostgresRedoProcess>>> = Vec::new();
for _ in 1..4 {
processes.push(Mutex::new(None));
}
// The actual process is launched lazily, on first request.
PostgresRedoManager {
runtime,
tenantid,
conf,
process: Mutex::new(None),
processes,
next: AtomicUsize::new(0),
}
}
@@ -282,7 +291,7 @@ impl PostgresRedoManager {
let duration = start.elapsed();
debug!(
info!(
"postgres applied {} WAL records in {} ms to reconstruct page image at LSN {}",
nrecords,
duration.as_millis(),
@@ -479,12 +488,13 @@ impl PostgresRedoProcess {
//
async fn launch(
conf: &PageServerConf,
process_no: usize,
tenantid: &ZTenantId,
) -> Result<PostgresRedoProcess, Error> {
// FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
// just create one with constant name. That fails if you try to launch more than
// one WAL redo manager concurrently.
let datadir = conf.tenant_path(tenantid).join("wal-redo-datadir");
let datadir = conf.tenant_path(tenantid).join(format!("wal-redo-datadir-{}", process_no));
// Create empty data directory for wal-redo postgres, deleting old one first.
if datadir.exists() {
@@ -590,22 +600,23 @@ impl PostgresRedoProcess {
let f_stdin = async {
// Send base image, if any. (If the record initializes the page, previous page
// version is not needed.)
timeout(
TIMEOUT,
stdin.write_all(&build_begin_redo_for_block_msg(tag)),
)
.await??;
let mut buf: Vec<u8> = Vec::new();
build_begin_redo_for_block_msg(tag, &mut buf);
timeout(TIMEOUT, stdin.write_all(&buf)).await??;
buf.clear();
if let Some(img) = base_img {
timeout(TIMEOUT, stdin.write_all(&build_push_page_msg(tag, &img))).await??;
build_push_page_msg(tag, &img, &mut buf);
timeout(TIMEOUT, stdin.write_all(&buf)).await??;
buf.clear();
}
// Send WAL records.
for (lsn, rec) in records.iter() {
WAL_REDO_RECORD_COUNTER.inc();
stdin
.write_all(&build_apply_record_msg(*lsn, &rec.rec))
.await?;
build_apply_record_msg(*lsn, &rec.rec, &mut buf);
timeout(TIMEOUT, stdin.write_all(&buf)).await??;
buf.clear();
//debug!("sent WAL record to wal redo postgres process ({:X}/{:X}",
// r.lsn >> 32, r.lsn & 0xffff_ffff);
@@ -614,7 +625,9 @@ impl PostgresRedoProcess {
// records.len(), lsn >> 32, lsn & 0xffff_ffff);
// Send GetPage command to get the result back
timeout(TIMEOUT, stdin.write_all(&build_get_page_msg(tag))).await??;
build_get_page_msg(tag, &mut buf);
timeout(TIMEOUT, stdin.write_all(&buf)).await??;
buf.clear();
timeout(TIMEOUT, stdin.flush()).await??;
//debug!("sent GetPage for {}", tag.blknum);
Ok::<(), Error>(())
@@ -641,41 +654,34 @@ impl PostgresRedoProcess {
// process. See vendor/postgres/src/backend/tcop/zenith_wal_redo.c for
// explanation of the protocol.
fn build_begin_redo_for_block_msg(tag: BufferTag) -> Vec<u8> {
fn build_begin_redo_for_block_msg(tag: BufferTag, buf: &mut Vec<u8>) {
let len = 4 + 1 + 4 * 4;
let mut buf = Vec::with_capacity(1 + len);
buf.put_u8(b'B');
buf.put_u32(len as u32);
tag.ser_into(&mut buf)
tag.ser_into(buf)
.expect("serialize BufferTag should always succeed");
debug_assert!(buf.len() == 1 + len);
buf
}
fn build_push_page_msg(tag: BufferTag, base_img: &[u8]) -> Vec<u8> {
fn build_push_page_msg(tag: BufferTag, base_img: &[u8], buf: &mut Vec<u8>) {
assert!(base_img.len() == 8192);
let len = 4 + 1 + 4 * 4 + base_img.len();
let mut buf = Vec::with_capacity(1 + len);
buf.put_u8(b'P');
buf.put_u32(len as u32);
tag.ser_into(&mut buf)
tag.ser_into(buf)
.expect("serialize BufferTag should always succeed");
buf.put(base_img);
debug_assert!(buf.len() == 1 + len);
buf
}
fn build_apply_record_msg(endlsn: Lsn, rec: &[u8]) -> Vec<u8> {
fn build_apply_record_msg(endlsn: Lsn, rec: &[u8], buf: &mut Vec<u8>) {
let len = 4 + 8 + rec.len();
let mut buf: Vec<u8> = Vec::with_capacity(1 + len);
buf.put_u8(b'A');
buf.put_u32(len as u32);
@@ -683,20 +689,15 @@ fn build_apply_record_msg(endlsn: Lsn, rec: &[u8]) -> Vec<u8> {
buf.put(rec);
debug_assert!(buf.len() == 1 + len);
buf
}
fn build_get_page_msg(tag: BufferTag) -> Vec<u8> {
fn build_get_page_msg(tag: BufferTag, buf: &mut Vec<u8>) {
let len = 4 + 1 + 4 * 4;
let mut buf = Vec::with_capacity(1 + len);
buf.put_u8(b'G');
buf.put_u32(len as u32);
tag.ser_into(&mut buf)
tag.ser_into(buf)
.expect("serialize BufferTag should always succeed");
debug_assert!(buf.len() == 1 + len);
buf
}