From 28675739de9af9e8a0e09ed177963ec6fb2eaa4e Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Wed, 27 Oct 2021 19:00:40 +0300 Subject: [PATCH] WIP --- pageserver/src/walredo.rs | 73 ++++++++++++++++++++------------------- 1 file changed, 37 insertions(+), 36 deletions(-) diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 4ceb4e2b37..915193a5d6 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -30,6 +30,7 @@ use std::io::Error; use std::path::PathBuf; use std::process::Stdio; use std::sync::Mutex; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Duration; use std::time::Instant; use tokio::io::AsyncBufReadExt; @@ -133,14 +134,15 @@ lazy_static! { /// perform WAL replay. Only one thread can use the processs at a time, /// that is controlled by the Mutex. In the future, we might want to /// launch a pool of processes to allow concurrent replay of multiple -/// records. +/// records. FIXME we have a pool now /// pub struct PostgresRedoManager { tenantid: ZTenantId, conf: &'static PageServerConf, runtime: tokio::runtime::Runtime, - process: Mutex>, + processes: Vec>>, + next: AtomicUsize, } #[derive(Debug)] @@ -210,14 +212,15 @@ impl WalRedoManager for PostgresRedoManager { end_time = Instant::now(); WAL_REDO_TIME.observe(end_time.duration_since(start_time).as_secs_f64()); } else { - let mut process_guard = self.process.lock().unwrap(); + let process_no = self.next.fetch_add(1, Ordering::AcqRel) % self.processes.len(); + let mut process_guard = self.processes[process_no].lock().unwrap(); let lock_time = Instant::now(); // launch the WAL redo process on first use if process_guard.is_none() { let p = self .runtime - .block_on(PostgresRedoProcess::launch(self.conf, &self.tenantid))?; + .block_on(PostgresRedoProcess::launch(self.conf, process_no, &self.tenantid))?; *process_guard = Some(p); } let process = process_guard.as_mut().unwrap(); @@ -248,12 +251,18 @@ impl PostgresRedoManager { .build() .unwrap(); + let mut processes: Vec>> = Vec::new(); + for _ in 1..4 { + processes.push(Mutex::new(None)); + } + // The actual process is launched lazily, on first request. PostgresRedoManager { runtime, tenantid, conf, - process: Mutex::new(None), + processes, + next: AtomicUsize::new(0), } } @@ -282,7 +291,7 @@ impl PostgresRedoManager { let duration = start.elapsed(); - debug!( + info!( "postgres applied {} WAL records in {} ms to reconstruct page image at LSN {}", nrecords, duration.as_millis(), @@ -479,12 +488,13 @@ impl PostgresRedoProcess { // async fn launch( conf: &PageServerConf, + process_no: usize, tenantid: &ZTenantId, ) -> Result { // FIXME: We need a dummy Postgres cluster to run the process in. Currently, we // just create one with constant name. That fails if you try to launch more than // one WAL redo manager concurrently. - let datadir = conf.tenant_path(tenantid).join("wal-redo-datadir"); + let datadir = conf.tenant_path(tenantid).join(format!("wal-redo-datadir-{}", process_no)); // Create empty data directory for wal-redo postgres, deleting old one first. if datadir.exists() { @@ -590,22 +600,23 @@ impl PostgresRedoProcess { let f_stdin = async { // Send base image, if any. (If the record initializes the page, previous page // version is not needed.) - timeout( - TIMEOUT, - stdin.write_all(&build_begin_redo_for_block_msg(tag)), - ) - .await??; + let mut buf: Vec = Vec::new(); + build_begin_redo_for_block_msg(tag, &mut buf); + timeout(TIMEOUT, stdin.write_all(&buf)).await??; + buf.clear(); if let Some(img) = base_img { - timeout(TIMEOUT, stdin.write_all(&build_push_page_msg(tag, &img))).await??; + build_push_page_msg(tag, &img, &mut buf); + timeout(TIMEOUT, stdin.write_all(&buf)).await??; + buf.clear(); } // Send WAL records. for (lsn, rec) in records.iter() { WAL_REDO_RECORD_COUNTER.inc(); - stdin - .write_all(&build_apply_record_msg(*lsn, &rec.rec)) - .await?; + build_apply_record_msg(*lsn, &rec.rec, &mut buf); + timeout(TIMEOUT, stdin.write_all(&buf)).await??; + buf.clear(); //debug!("sent WAL record to wal redo postgres process ({:X}/{:X}", // r.lsn >> 32, r.lsn & 0xffff_ffff); @@ -614,7 +625,9 @@ impl PostgresRedoProcess { // records.len(), lsn >> 32, lsn & 0xffff_ffff); // Send GetPage command to get the result back - timeout(TIMEOUT, stdin.write_all(&build_get_page_msg(tag))).await??; + build_get_page_msg(tag, &mut buf); + timeout(TIMEOUT, stdin.write_all(&buf)).await??; + buf.clear(); timeout(TIMEOUT, stdin.flush()).await??; //debug!("sent GetPage for {}", tag.blknum); Ok::<(), Error>(()) @@ -641,41 +654,34 @@ impl PostgresRedoProcess { // process. See vendor/postgres/src/backend/tcop/zenith_wal_redo.c for // explanation of the protocol. -fn build_begin_redo_for_block_msg(tag: BufferTag) -> Vec { +fn build_begin_redo_for_block_msg(tag: BufferTag, buf: &mut Vec) { let len = 4 + 1 + 4 * 4; - let mut buf = Vec::with_capacity(1 + len); buf.put_u8(b'B'); buf.put_u32(len as u32); - tag.ser_into(&mut buf) + tag.ser_into(buf) .expect("serialize BufferTag should always succeed"); debug_assert!(buf.len() == 1 + len); - - buf } -fn build_push_page_msg(tag: BufferTag, base_img: &[u8]) -> Vec { +fn build_push_page_msg(tag: BufferTag, base_img: &[u8], buf: &mut Vec) { assert!(base_img.len() == 8192); let len = 4 + 1 + 4 * 4 + base_img.len(); - let mut buf = Vec::with_capacity(1 + len); buf.put_u8(b'P'); buf.put_u32(len as u32); - tag.ser_into(&mut buf) + tag.ser_into(buf) .expect("serialize BufferTag should always succeed"); buf.put(base_img); debug_assert!(buf.len() == 1 + len); - - buf } -fn build_apply_record_msg(endlsn: Lsn, rec: &[u8]) -> Vec { +fn build_apply_record_msg(endlsn: Lsn, rec: &[u8], buf: &mut Vec) { let len = 4 + 8 + rec.len(); - let mut buf: Vec = Vec::with_capacity(1 + len); buf.put_u8(b'A'); buf.put_u32(len as u32); @@ -683,20 +689,15 @@ fn build_apply_record_msg(endlsn: Lsn, rec: &[u8]) -> Vec { buf.put(rec); debug_assert!(buf.len() == 1 + len); - - buf } -fn build_get_page_msg(tag: BufferTag) -> Vec { +fn build_get_page_msg(tag: BufferTag, buf: &mut Vec) { let len = 4 + 1 + 4 * 4; - let mut buf = Vec::with_capacity(1 + len); buf.put_u8(b'G'); buf.put_u32(len as u32); - tag.ser_into(&mut buf) + tag.ser_into(buf) .expect("serialize BufferTag should always succeed"); debug_assert!(buf.len() == 1 + len); - - buf }