Compare commits

2 Commits

Author: Heikki Linnakangas    SHA1: 2359f69278    Date: 2022-11-21 20:43:16 +02:00
WIP: Process received GetPage requests in parallel

Author: Heikki Linnakangas    SHA1: 7235377e25    Date: 2022-11-21 20:42:44 +02:00
Have a pool of WAL redo processes per tenant

To allow more concurrency, have a pool of WAL redo processes that can
grow up to 4 processes per tenant. There's no way to shrink the pool,
that's why I'm capping it at 4 processes, to keep the total number of
processes reasonable.
2 changed files with 172 additions and 88 deletions

View File

@@ -13,6 +13,7 @@ use anyhow::{bail, ensure, Context, Result};
use bytes::Buf;
use bytes::Bytes;
use futures::{Stream, StreamExt};
use futures::stream::FuturesOrdered;
use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
@@ -25,6 +26,7 @@ use std::net::TcpListener;
use std::str;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;
use tokio::pin;
use tokio_util::io::StreamReader;
use tokio_util::io::SyncIoBridge;
@@ -54,6 +56,9 @@ use crate::CheckpointConfig;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ;
/// Number of requests to process in parallel, from a single connection
const MAX_INFLIGHT_REQUESTS: usize = 4;
fn copyin_stream(pgb: &mut PostgresBackend) -> impl Stream<Item = io::Result<Bytes>> + '_ {
async_stream::try_stream! {
loop {
@@ -224,6 +229,13 @@ struct PageRequestMetrics {
get_db_size: metrics::Histogram,
}
pub enum RequestType {
Exists,
Nblocks,
GetPage,
DbSize,
}
impl PageRequestMetrics {
fn new(tenant_id: &TenantId, timeline_id: &TimelineId) -> Self {
let tenant_id = tenant_id.to_string();
@@ -298,66 +310,101 @@ impl PageServerHandler {
let metrics = PageRequestMetrics::new(&tenant_id, &timeline_id);
//
// Main loop to handle the stream of requests
//
// We process multiple requests in parallel, by spawning a new Task for each
// incoming request.
let mut inprogress_requests = FuturesOrdered::new();
loop {
let msg = tokio::select! {
tokio::select! {
biased;
// If we were requested to shut down, stop
_ = task_mgr::shutdown_watcher() => {
// We were requested to shut down.
info!("shutdown request received in page handler");
break;
}
msg = pgb.read_message() => { msg }
};
// When a task completes, send the response to the client
completed_task = inprogress_requests.next(), if !inprogress_requests.is_empty() => {
let response: Bytes;
let request_type: RequestType;
let elapsed_sec: f64;
(response, request_type, elapsed_sec) = completed_task.unwrap()?;
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(m) => {
bail!("unexpected message: {m:?} during COPY");
pgb.write_message(&BeMessage::CopyData(&response))?;
pgb.flush().await?;
match request_type {
RequestType::Exists => metrics.get_rel_exists.observe(elapsed_sec),
RequestType::Nblocks => metrics.get_rel_size.observe(elapsed_sec),
RequestType::GetPage => metrics.get_page_at_lsn.observe(elapsed_sec),
RequestType::DbSize => metrics.get_db_size.observe(elapsed_sec),
}
continue;
}
None => break, // client disconnected
};
trace!("query: {copy_data_bytes:?}");
// When a new request arrives, spawn a task to process it.
// If we already have MAX_INFLIGHT_REQUESTS requests in-progress, however,
// don't start new ones.
msg = pgb.read_message(), if inprogress_requests.len() < MAX_INFLIGHT_REQUESTS => {
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(m) => {
bail!("unexpected message: {m:?} during COPY");
}
None => break, // client disconnected
};
// Trace request if needed
if let Some(t) = tracer.as_mut() {
t.trace(&copy_data_bytes)
}
trace!("query: {copy_data_bytes:?}");
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
// Trace request if needed
if let Some(t) = tracer.as_mut() {
t.trace(&copy_data_bytes)
}
let response = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {
let _timer = metrics.get_rel_exists.start_timer();
self.handle_get_rel_exists_request(&timeline, &req).await
}
PagestreamFeMessage::Nblocks(req) => {
let _timer = metrics.get_rel_size.start_timer();
self.handle_get_nblocks_request(&timeline, &req).await
}
PagestreamFeMessage::GetPage(req) => {
let _timer = metrics.get_page_at_lsn.start_timer();
self.handle_get_page_at_lsn_request(&timeline, &req).await
}
PagestreamFeMessage::DbSize(req) => {
let _timer = metrics.get_db_size.start_timer();
self.handle_db_size_request(&timeline, &req).await
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
let timeline = Arc::clone(&timeline);
let conf = self.conf;
let task = async move {
let start_time = Instant::now();
let (response, request_type) = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {
(Self::handle_get_rel_exists_request(&timeline, &req).await,
RequestType::Exists)
}
PagestreamFeMessage::Nblocks(req) => {
(Self::handle_get_nblocks_request(&timeline, &req).await,
RequestType::Nblocks)
}
PagestreamFeMessage::GetPage(req) => {
(Self::handle_get_page_at_lsn_request(conf, &timeline, &req).await,
RequestType::GetPage)
}
PagestreamFeMessage::DbSize(req) => {
(Self::handle_db_size_request(&timeline, &req).await,
RequestType::DbSize)
}
};
let response = response.unwrap_or_else(|e| {
// print the all details to the log with {:#}, but for the client the
// error message is enough
error!("error reading relation or page version: {:?}", e);
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
});
let response: Bytes = response.serialize();
(response, request_type, start_time.elapsed().as_secs_f64())
};
inprogress_requests.push_back(tokio::spawn(task));
continue;
}
};
let response = response.unwrap_or_else(|e| {
// print the all details to the log with {:#}, but for the client the
// error message is enough
error!("error reading relation or page version: {:?}", e);
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
});
pgb.write_message(&BeMessage::CopyData(&response.serialize()))?;
pgb.flush().await?;
}
Ok(())
}
@@ -529,9 +576,8 @@ impl PageServerHandler {
Ok(lsn)
}
#[instrument(skip(self, timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
#[instrument(skip(timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
async fn handle_get_rel_exists_request(
&self,
timeline: &Timeline,
req: &PagestreamExistsRequest,
) -> Result<PagestreamBeMessage> {
@@ -546,9 +592,8 @@ impl PageServerHandler {
}))
}
#[instrument(skip(self, timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
#[instrument(skip(timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
async fn handle_get_nblocks_request(
&self,
timeline: &Timeline,
req: &PagestreamNblocksRequest,
) -> Result<PagestreamBeMessage> {
@@ -563,9 +608,8 @@ impl PageServerHandler {
}))
}
#[instrument(skip(self, timeline, req), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))]
#[instrument(skip(timeline, req), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))]
async fn handle_db_size_request(
&self,
timeline: &Timeline,
req: &PagestreamDbSizeRequest,
) -> Result<PagestreamBeMessage> {
@@ -583,11 +627,11 @@ impl PageServerHandler {
}))
}
#[instrument(skip(self, timeline, req), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))]
async fn handle_get_page_at_lsn_request(
&self,
timeline: &Timeline,
req: &PagestreamGetPageRequest,
#[instrument(skip(conf, timeline, req), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))]
async fn handle_get_page_at_lsn_request<'a>(
conf: &'static PageServerConf,
timeline: &'a Timeline,
req: &'a PagestreamGetPageRequest,
) -> Result<PagestreamBeMessage> {
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)
@@ -604,7 +648,7 @@ impl PageServerHandler {
// FIXME: this profiling now happens at different place than it used to. The
// current profiling is based on a thread-local variable, so it doesn't work
// across awaits
let _profiling_guard = profpoint_start(self.conf, ProfilingConfig::PageRequests);
let _profiling_guard = profpoint_start(conf, ProfilingConfig::PageRequests);
let page = timeline.get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest)?;
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
@@ -612,9 +656,8 @@ impl PageServerHandler {
}))
}
#[instrument(skip(self, pgb))]
#[instrument(skip(pgb))]
async fn handle_basebackup_request(
&self,
pgb: &mut PostgresBackend,
tenant_id: TenantId,
timeline_id: TimelineId,
@@ -747,7 +790,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
};
// Check that the timeline exists
self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false)
Self::handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false)
.await?;
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
}
@@ -807,7 +850,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
self.check_permission(Some(tenant_id))?;
// Check that the timeline exists
self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, prev_lsn, true)
Self::handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, prev_lsn, true)
.await?;
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("import basebackup ") {
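
The page_service changes above replace the one-request-at-a-time loop with bounded pipelining: up to MAX_INFLIGHT_REQUESTS requests from a single connection are spawned as tasks, and responses are written back in arrival order from a FuturesOrdered. Below is a minimal, self-contained sketch of that pattern, not the patch itself; read_request, process, and send_response are hypothetical stand-ins for pgb.read_message(), the handle_*_request functions, and pgb.write_message()/flush().

use futures::stream::{FuturesOrdered, StreamExt};

/// Cap on requests processed in parallel from one connection (mirrors the patch).
const MAX_INFLIGHT_REQUESTS: usize = 4;

// Hypothetical stand-ins for the real I/O and handler functions.
async fn read_request() -> Option<u64> { None }   // like pgb.read_message()
async fn process(req: u64) -> u64 { req }         // like handle_*_request()
async fn send_response(_resp: u64) {}             // like pgb.write_message() + flush()

async fn serve() {
    // Tasks complete in submission order, so responses stay ordered even
    // though the work itself runs concurrently.
    let mut inflight = FuturesOrdered::new();
    loop {
        tokio::select! {
            biased;
            // A task finished: send its response to the client first.
            Some(joined) = inflight.next(), if !inflight.is_empty() => {
                let resp: u64 = joined.expect("request task panicked");
                send_response(resp).await;
            }
            // Room for more work: read the next request and spawn a task for it.
            maybe_req = read_request(), if inflight.len() < MAX_INFLIGHT_REQUESTS => {
                match maybe_req {
                    Some(req) => inflight.push_back(tokio::spawn(process(req))),
                    None => break, // client disconnected
                }
            }
        }
    }
    // A full implementation would also drain `inflight` before returning.
}

#[tokio::main]
async fn main() {
    serve().await;
}

Because tokio::spawn requires a 'static future, the spawned task cannot borrow &self; that is why the patch turns the handlers into associated functions, clones the Arc<Timeline> into each task, passes conf as &'static PageServerConf explicitly, and drops self from the #[instrument(skip(...))] attributes.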

View File

@@ -21,6 +21,7 @@
use byteorder::{ByteOrder, LittleEndian};
use bytes::{BufMut, Bytes, BytesMut};
use nix::poll::*;
use once_cell::sync::Lazy;
use serde::Serialize;
use std::fs::OpenOptions;
use std::io::prelude::*;
@@ -31,7 +32,8 @@ use std::os::unix::prelude::CommandExt;
use std::path::PathBuf;
use std::process::Stdio;
use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Condvar, Mutex};
use std::time::Duration;
use std::time::Instant;
use std::{fs, io};
@@ -57,6 +59,9 @@ use postgres_ffi::v14::nonrelfile_utils::{
};
use postgres_ffi::BLCKSZ;
/// Maximum number of WAL redo processes to launch for a single tenant.
const MAX_PROCESSES: usize = 4;
///
/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
///
@@ -90,18 +95,32 @@ pub trait WalRedoManager: Send + Sync {
) -> Result<Bytes, WalRedoError>;
}
static WAL_REDO_PROCESS_COUNTER: Lazy<AtomicU64> = Lazy::new(|| { AtomicU64::new(0) });
///
/// This is the real implementation that uses a Postgres process to
/// perform WAL replay. Only one thread can use the process at a time,
/// that is controlled by the Mutex. In the future, we might want to
/// launch a pool of processes to allow concurrent replay of multiple
/// records.
/// This is the real implementation that uses a special Postgres
/// process to perform WAL replay. There is a pool of these processes.
///
pub struct PostgresRedoManager {
tenant_id: TenantId,
conf: &'static PageServerConf,
process: Mutex<Option<PostgresRedoProcess>>,
/// Pool of processes.
process_list: Mutex<ProcessList>,
/// Condition variable that can be used to sleep until a process
/// becomes available in the pool.
condvar: Condvar,
}
// A pool of WAL redo processes
#[derive(Default)]
struct ProcessList {
/// processes that are available for reuse
free_processes: Vec<PostgresRedoProcess>,
/// Total number of processes, including all the processes in
/// 'free_processes' list, and any processes that are in use.
num_processes: usize,
}
/// Can this request be served by neon redo functions
@@ -206,17 +225,39 @@ impl PostgresRedoManager {
PostgresRedoManager {
tenant_id,
conf,
process: Mutex::new(None),
process_list: Mutex::new(ProcessList::default()),
condvar: Condvar::new(),
}
}
// Get a handle to a redo process from the pool.
fn get_process(&self, pg_version: u32) -> Result<PostgresRedoProcess, WalRedoError> {
let mut process_list = self.process_list.lock().unwrap();
loop {
// If there's a free process immediately available, take it.
if let Some(process) = process_list.free_processes.pop() {
return Ok(process);
}
// All processes are in use. If the pool is at its maximum size
// already, wait for a process to become free. Otherwise launch
// a new process.
if process_list.num_processes >= MAX_PROCESSES {
process_list = self.condvar.wait(process_list).unwrap();
continue;
} else {
let process = PostgresRedoProcess::launch(self.conf, self.tenant_id, pg_version)?;
process_list.num_processes += 1;
return Ok(process);
}
}
}
/// Launch process pre-emptively. Should not be needed except for benchmarking.
pub fn launch_process(&mut self, pg_version: u32) -> anyhow::Result<()> {
let inner = self.process.get_mut().unwrap();
if inner.is_none() {
let p = PostgresRedoProcess::launch(self.conf, self.tenant_id, pg_version)?;
*inner = Some(p);
}
// get_process launches a process, if no processes were running previously
let _ = self.get_process(pg_version)?;
Ok(())
}
@@ -236,15 +277,9 @@ impl PostgresRedoManager {
let start_time = Instant::now();
let mut process_guard = self.process.lock().unwrap();
let lock_time = Instant::now();
let mut process = self.get_process(pg_version)?;
// launch the WAL redo process on first use
if process_guard.is_none() {
let p = PostgresRedoProcess::launch(self.conf, self.tenant_id, pg_version)?;
*process_guard = Some(p);
}
let process = process_guard.as_mut().unwrap();
let lock_time = Instant::now();
WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
@@ -278,8 +313,9 @@ impl PostgresRedoManager {
lsn
);
// If something went wrong, don't try to reuse the process. Kill it, and
// next request will launch a new one.
// If something went wrong, don't try to reuse the
// process. Kill it, and next request will launch a new one.
// Otherwise return the process to the pool.
if result.is_err() {
error!(
"error applying {} WAL records ({} bytes) to reconstruct page image at LSN {}",
@@ -287,8 +323,14 @@ impl PostgresRedoManager {
nbytes,
lsn
);
let process = process_guard.take().unwrap();
process.kill();
let mut process_list = self.process_list.lock().unwrap();
process_list.num_processes -= 1;
self.condvar.notify_one();
} else {
let mut process_list = self.process_list.lock().unwrap();
process_list.free_processes.push(process);
self.condvar.notify_one();
}
result
}
@@ -608,11 +650,10 @@ impl PostgresRedoProcess {
tenant_id: TenantId,
pg_version: u32,
) -> Result<PostgresRedoProcess, Error> {
// FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
// just create one with constant name. That fails if you try to launch more than
// one WAL redo manager concurrently.
// We need a dummy Postgres cluster to run the process in.
let processno = WAL_REDO_PROCESS_COUNTER.fetch_add(1, Ordering::Relaxed);
let datadir = path_with_suffix_extension(
conf.tenant_path(&tenant_id).join("wal-redo-datadir"),
conf.tenant_path(&tenant_id).join(format!("wal-redo-datadir-{}", processno)),
TEMP_FILE_SUFFIX,
);
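
The walredo changes above swap the single Mutex<Option<PostgresRedoProcess>> for a small pool: a Mutex-protected free list plus a Condvar, grown lazily up to MAX_PROCESSES per tenant. Here is a minimal sketch of that pool logic, with a hypothetical Process type standing in for PostgresRedoProcess.

use std::sync::{Condvar, Mutex};

/// Maximum number of redo processes per tenant (mirrors the patch).
const MAX_PROCESSES: usize = 4;

struct Process; // hypothetical stand-in for PostgresRedoProcess
impl Process {
    fn launch() -> Process {
        Process
    }
}

#[derive(Default)]
struct ProcessList {
    free_processes: Vec<Process>, // processes available for reuse
    num_processes: usize,         // free + currently in use
}

struct Pool {
    process_list: Mutex<ProcessList>,
    condvar: Condvar,
}

impl Pool {
    /// Get a process: reuse a free one, grow the pool, or wait for a return.
    fn get_process(&self) -> Process {
        let mut list = self.process_list.lock().unwrap();
        loop {
            if let Some(p) = list.free_processes.pop() {
                return p; // a free process was immediately available
            }
            if list.num_processes >= MAX_PROCESSES {
                // At capacity: sleep until put_back()/process_died() notifies us.
                list = self.condvar.wait(list).unwrap();
            } else {
                list.num_processes += 1;
                return Process::launch();
            }
        }
    }

    /// Return a healthy process to the pool and wake one waiter.
    fn put_back(&self, p: Process) {
        let mut list = self.process_list.lock().unwrap();
        list.free_processes.push(p);
        self.condvar.notify_one();
    }

    /// Account for a killed process so a waiter may launch a replacement.
    fn process_died(&self) {
        let mut list = self.process_list.lock().unwrap();
        list.num_processes -= 1;
        self.condvar.notify_one();
    }
}

fn main() {
    let pool = Pool { process_list: Mutex::new(ProcessList::default()), condvar: Condvar::new() };
    let p = pool.get_process();
    pool.put_back(p); // on error the caller would kill it and call process_died() instead
}

Condvar::wait releases the mutex while the caller sleeps and re-acquires it before returning, so a woken waiter holds the lock and can retry the free list. In the patch, a redo error means the process is killed rather than returned, the counter is decremented, and the condvar is notified so a waiter can launch a replacement; on success the process goes back on the free list. Giving each process its own wal-redo-datadir-{n}, numbered by the atomic WAL_REDO_PROCESS_COUNTER, is what removes the old FIXME about the single constant datadir name.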