Compare commits

...

5 Commits

Author SHA1 Message Date
Heikki Linnakangas
c2d5cee943 Avoid some vector-growing overhead.
I saw this in 'perf' profile of a sequential scan:

> -   31.93%     0.21%  compute request  pageserver         [.] <pageserver::walredo::PostgresRedoManager as pageserver::walredo::WalRedoManager>::request_redo
>    - 31.72% <pageserver::walredo::PostgresRedoManager as pageserver::walredo::WalRedoManager>::request_redo
>       - 31.26% pageserver::walredo::PostgresRedoManager::apply_batch_postgres
>          + 7.64% <std::process::ChildStdin as std::io::Write>::write
>          + 6.17% nix::poll::poll
>          + 3.58% <std::process::ChildStderr as std::io::Read>::read
>          + 2.96% std::sync::condvar::Condvar::notify_one
>          + 2.48% std::sys::unix::locks::futex::Condvar::wait
>          + 2.19% alloc::raw_vec::RawVec<T,A>::reserve::do_reserve_and_handle
>          + 1.14% std::sys::unix::locks::futex::Mutex::lock_contended
>            0.67% __rust_alloc_zeroed
>            0.62% __stpcpy_ssse3
>            0.56% std::sys::unix::locks::futex::Mutex::wake

Note the 'do_reserve_and_handle' overhead. That's caused by having to grow
the buffer used to construct the WAL redo request. This commit
eliminates that overhead. It's only about 2% of the overall CPU usage,
but every little helps.

Also reuse the temp buffer when reading records from a DeltaLayer. I
didn't actually see that show up in profiling, but seems like a
trivial change that can't hurt.
2022-11-08 17:55:43 +02:00
Heikki Linnakangas
db7ca18c43 Use a cached WaitEventSet instead of WaitLatchOrSocket.
When we repeatedly wait for the same events, it's faster to create the
event set once and reuse it. While testing with a sequential scan test
case, I saw WaitLatchOrSocket consuming a lot of CPU:

> -   40.52%     0.14%  postgres  postgres           [.] WaitLatchOrSocket
>    - 40.38% WaitLatchOrSocket
>       + 17.83% AddWaitEventToSet
>       + 9.47% close@plt
>       + 8.29% CreateWaitEventSet
>       + 4.57% WaitEventSetWait

This eliminates most of that overhead.
2022-11-08 15:53:12 +02:00
Heikki Linnakangas
a726b37bca Have a pool of WAL redo processes per tenant
To allow more concurrency, have a pool of WAL redo processes that can
grow up to 4 processes per tenant. There's no way to shrink the pool,
that's why I'm capping it at 4 processes, to keep the total number of
processes reasonable.
2022-11-08 15:00:31 +02:00
Heikki Linnakangas
5bd843551c WIP: Process received GetPage requests in parallel 2022-11-08 14:42:40 +02:00
Heikki Linnakangas
c5c4e47edd Fix neon.flush_output_after GUC.
The neon.flush_output_after was not effective, at least not in a sequential
scan, because neon_read_at_lsn() flushed the prefetch queue on every call
anyway. Change neon_read_at_lsn() so that it only flushes the queue if
the GetPage request that it needs to wait for hasn't already been flushed.
To make that possible, move the tracking of unflushed requests into a new
ring_flush variable, alongside the other ring buffer indexes.

While we're at it, mark neon.flush_output_after as PGC_USERSET, so that it
can be changed per-session with "SET neon.flush_output_after = ...". Makes
it easier to test different values.
2022-11-08 12:11:17 +02:00
7 changed files with 181 additions and 108 deletions

View File

@@ -13,6 +13,7 @@ use anyhow::{bail, ensure, Context, Result};
use bytes::Buf; use bytes::Buf;
use bytes::Bytes; use bytes::Bytes;
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use futures::stream::FuturesOrdered;
use pageserver_api::models::{ use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse, PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse, PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
@@ -39,10 +40,9 @@ use utils::{
}; };
use crate::basebackup; use crate::basebackup;
use crate::config::{PageServerConf, ProfilingConfig}; use crate::config::PageServerConf;
use crate::import_datadir::import_wal_from_tar; use crate::import_datadir::import_wal_from_tar;
use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME}; use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME};
use crate::profiling::profpoint_start;
use crate::task_mgr; use crate::task_mgr;
use crate::task_mgr::TaskKind; use crate::task_mgr::TaskKind;
use crate::tenant::Timeline; use crate::tenant::Timeline;
@@ -276,10 +276,10 @@ impl PageServerHandler {
pgb.write_message(&BeMessage::CopyBothResponse)?; pgb.write_message(&BeMessage::CopyBothResponse)?;
pgb.flush().await?; pgb.flush().await?;
let metrics = PageRequestMetrics::new(&tenant_id, &timeline_id); let mut inprogress_requests = FuturesOrdered::new();
loop { loop {
let msg = tokio::select! { tokio::select! {
biased; biased;
_ = task_mgr::shutdown_watcher() => { _ = task_mgr::shutdown_watcher() => {
@@ -288,51 +288,58 @@ impl PageServerHandler {
break; break;
} }
msg = pgb.read_message() => { msg } response = inprogress_requests.next(), if !inprogress_requests.is_empty() => {
}; let response: Bytes = response.unwrap()?;
pgb.write_message(&BeMessage::CopyData(&response))?;
let copy_data_bytes = match msg? { pgb.flush().await?;
Some(FeMessage::CopyData(bytes)) => bytes, continue;
Some(m) => {
bail!("unexpected message: {m:?} during COPY");
} }
None => break, // client disconnected
};
trace!("query: {copy_data_bytes:?}"); msg = pgb.read_message() => {
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(m) => {
bail!("unexpected message: {m:?} during COPY");
}
None => break, // client disconnected
};
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?; trace!("query: {copy_data_bytes:?}");
let response = match neon_fe_msg { let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
PagestreamFeMessage::Exists(req) => {
let _timer = metrics.get_rel_exists.start_timer(); let timeline = Arc::clone(&timeline);
self.handle_get_rel_exists_request(&timeline, &req).await let task = async move {
} let response = match neon_fe_msg {
PagestreamFeMessage::Nblocks(req) => { PagestreamFeMessage::Exists(req) => {
let _timer = metrics.get_rel_size.start_timer(); Self::handle_get_rel_exists_request(&timeline, &req).await
self.handle_get_nblocks_request(&timeline, &req).await }
} PagestreamFeMessage::Nblocks(req) => {
PagestreamFeMessage::GetPage(req) => { Self::handle_get_nblocks_request(&timeline, &req).await
let _timer = metrics.get_page_at_lsn.start_timer(); }
self.handle_get_page_at_lsn_request(&timeline, &req).await PagestreamFeMessage::GetPage(req) => {
} Self::handle_get_page_at_lsn_request(&timeline, &req).await
PagestreamFeMessage::DbSize(req) => { }
let _timer = metrics.get_db_size.start_timer(); PagestreamFeMessage::DbSize(req) => {
self.handle_db_size_request(&timeline, &req).await Self::handle_db_size_request(&timeline, &req).await
}
};
let response = response.unwrap_or_else(|e| {
// print the all details to the log with {:#}, but for the client the
// error message is enough
error!("error reading relation or page version: {:?}", e);
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
});
let response: Bytes = response.serialize();
response
};
inprogress_requests.push_back(tokio::spawn(task));
continue;
} }
}; };
let response = response.unwrap_or_else(|e| {
// print the all details to the log with {:#}, but for the client the
// error message is enough
error!("error reading relation or page version: {:?}", e);
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
});
pgb.write_message(&BeMessage::CopyData(&response.serialize()))?;
pgb.flush().await?;
} }
Ok(()) Ok(())
} }
@@ -504,9 +511,8 @@ impl PageServerHandler {
Ok(lsn) Ok(lsn)
} }
#[instrument(skip(self, timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))] #[instrument(skip(timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
async fn handle_get_rel_exists_request( async fn handle_get_rel_exists_request(
&self,
timeline: &Timeline, timeline: &Timeline,
req: &PagestreamExistsRequest, req: &PagestreamExistsRequest,
) -> Result<PagestreamBeMessage> { ) -> Result<PagestreamBeMessage> {
@@ -521,9 +527,8 @@ impl PageServerHandler {
})) }))
} }
#[instrument(skip(self, timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))] #[instrument(skip(timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
async fn handle_get_nblocks_request( async fn handle_get_nblocks_request(
&self,
timeline: &Timeline, timeline: &Timeline,
req: &PagestreamNblocksRequest, req: &PagestreamNblocksRequest,
) -> Result<PagestreamBeMessage> { ) -> Result<PagestreamBeMessage> {
@@ -538,9 +543,8 @@ impl PageServerHandler {
})) }))
} }
#[instrument(skip(self, timeline, req), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))] #[instrument(skip(timeline, req), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))]
async fn handle_db_size_request( async fn handle_db_size_request(
&self,
timeline: &Timeline, timeline: &Timeline,
req: &PagestreamDbSizeRequest, req: &PagestreamDbSizeRequest,
) -> Result<PagestreamBeMessage> { ) -> Result<PagestreamBeMessage> {
@@ -558,9 +562,8 @@ impl PageServerHandler {
})) }))
} }
#[instrument(skip(self, timeline, req), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))] #[instrument(skip(timeline, req), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))]
async fn handle_get_page_at_lsn_request( async fn handle_get_page_at_lsn_request(
&self,
timeline: &Timeline, timeline: &Timeline,
req: &PagestreamGetPageRequest, req: &PagestreamGetPageRequest,
) -> Result<PagestreamBeMessage> { ) -> Result<PagestreamBeMessage> {
@@ -579,7 +582,7 @@ impl PageServerHandler {
// FIXME: this profiling now happens at different place than it used to. The // FIXME: this profiling now happens at different place than it used to. The
// current profiling is based on a thread-local variable, so it doesn't work // current profiling is based on a thread-local variable, so it doesn't work
// across awaits // across awaits
let _profiling_guard = profpoint_start(self.conf, ProfilingConfig::PageRequests); //let _profiling_guard = profpoint_start(self.conf, ProfilingConfig::PageRequests);
let page = timeline.get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest)?; let page = timeline.get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest)?;
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse { Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
@@ -587,9 +590,8 @@ impl PageServerHandler {
})) }))
} }
#[instrument(skip(self, pgb))] #[instrument(skip(pgb))]
async fn handle_basebackup_request( async fn handle_basebackup_request(
&self,
pgb: &mut PostgresBackend, pgb: &mut PostgresBackend,
tenant_id: TenantId, tenant_id: TenantId,
timeline_id: TimelineId, timeline_id: TimelineId,
@@ -722,7 +724,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
}; };
// Check that the timeline exists // Check that the timeline exists
self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false) Self::handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false)
.await?; .await?;
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?; pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} }
@@ -782,7 +784,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
self.check_permission(Some(tenant_id))?; self.check_permission(Some(tenant_id))?;
// Check that the timeline exists // Check that the timeline exists
self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, prev_lsn, true) Self::handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, prev_lsn, true)
.await?; .await?;
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?; pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("import basebackup ") { } else if query_string.starts_with("import basebackup ") {

View File

@@ -74,6 +74,7 @@ where
}; };
dstbuf.clear(); dstbuf.clear();
dstbuf.reserve(len);
// Read the payload // Read the payload
let mut remain = len; let mut remain = len;

View File

@@ -260,8 +260,9 @@ impl Layer for DeltaLayer {
// Ok, 'offsets' now contains the offsets of all the entries we need to read // Ok, 'offsets' now contains the offsets of all the entries we need to read
let mut cursor = file.block_cursor(); let mut cursor = file.block_cursor();
let mut buf = Vec::new();
for (entry_lsn, pos) in offsets { for (entry_lsn, pos) in offsets {
let buf = cursor.read_blob(pos).with_context(|| { cursor.read_blob_into_buf(pos, &mut buf).with_context(|| {
format!( format!(
"Failed to read blob from virtual file {}", "Failed to read blob from virtual file {}",
file.file.path.display() file.file.path.display()

View File

@@ -21,6 +21,7 @@
use byteorder::{ByteOrder, LittleEndian}; use byteorder::{ByteOrder, LittleEndian};
use bytes::{BufMut, Bytes, BytesMut}; use bytes::{BufMut, Bytes, BytesMut};
use nix::poll::*; use nix::poll::*;
use once_cell::sync::Lazy;
use serde::Serialize; use serde::Serialize;
use std::fs; use std::fs;
use std::fs::OpenOptions; use std::fs::OpenOptions;
@@ -31,7 +32,8 @@ use std::os::unix::prelude::CommandExt;
use std::path::PathBuf; use std::path::PathBuf;
use std::process::Stdio; use std::process::Stdio;
use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command}; use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
use std::sync::Mutex; use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Condvar, Mutex};
use std::time::Duration; use std::time::Duration;
use std::time::Instant; use std::time::Instant;
use tracing::*; use tracing::*;
@@ -55,6 +57,9 @@ use postgres_ffi::v14::nonrelfile_utils::{
}; };
use postgres_ffi::BLCKSZ; use postgres_ffi::BLCKSZ;
/// Maximum number of WAL redo processes to launch for a single tenant.
const MAX_PROCESSES: usize = 4;
/// ///
/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster. /// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
/// ///
@@ -88,18 +93,32 @@ pub trait WalRedoManager: Send + Sync {
) -> Result<Bytes, WalRedoError>; ) -> Result<Bytes, WalRedoError>;
} }
static WAL_REDO_PROCESS_COUNTER: Lazy<AtomicU64> = Lazy::new(|| { AtomicU64::new(0) });
/// ///
/// This is the real implementation that uses a Postgres process to /// This is the real implementation that uses a special Postgres
/// perform WAL replay. Only one thread can use the process at a time, /// process to perform WAL replay. There is a pool of these processes.
/// that is controlled by the Mutex. In the future, we might want to
/// launch a pool of processes to allow concurrent replay of multiple
/// records.
/// ///
pub struct PostgresRedoManager { pub struct PostgresRedoManager {
tenant_id: TenantId, tenant_id: TenantId,
conf: &'static PageServerConf, conf: &'static PageServerConf,
process: Mutex<Option<PostgresRedoProcess>>, /// Pool of processes.
process_list: Mutex<ProcessList>,
/// Condition variable that can be used to sleep until a process
/// becomes available in the pool.
condvar: Condvar,
}
// A pool of WAL redo processes
#[derive(Default)]
struct ProcessList {
/// processes that are available for reuse
free_processes: Vec<PostgresRedoProcess>,
/// Total number of processes, including all the processes in
/// 'free_processes' list, and any processes that are in use.
num_processes: usize,
} }
/// Can this request be served by neon redo functions /// Can this request be served by neon redo functions
@@ -204,7 +223,32 @@ impl PostgresRedoManager {
PostgresRedoManager { PostgresRedoManager {
tenant_id, tenant_id,
conf, conf,
process: Mutex::new(None), process_list: Mutex::new(ProcessList::default()),
condvar: Condvar::new(),
}
}
// Get a handle to a redo process from the pool.
fn get_process(&self, pg_version: u32) -> Result<PostgresRedoProcess, WalRedoError> {
let mut process_list = self.process_list.lock().unwrap();
loop {
// If there's a free process immediately available, take it.
if let Some(process) = process_list.free_processes.pop() {
return Ok(process);
}
// All processes are in use. If the pool is at its maximum size
// already, wait for a process to become free. Otherwise launch
// a new process.
if process_list.num_processes >= MAX_PROCESSES {
process_list = self.condvar.wait(process_list).unwrap();
continue;
} else {
let process = PostgresRedoProcess::launch(self.conf, &self.tenant_id, pg_version)?;
process_list.num_processes += 1;
return Ok(process);
}
} }
} }
@@ -224,15 +268,9 @@ impl PostgresRedoManager {
let start_time = Instant::now(); let start_time = Instant::now();
let mut process_guard = self.process.lock().unwrap(); let mut process = self.get_process(pg_version)?;
let lock_time = Instant::now();
// launch the WAL redo process on first use let lock_time = Instant::now();
if process_guard.is_none() {
let p = PostgresRedoProcess::launch(self.conf, &self.tenant_id, pg_version)?;
*process_guard = Some(p);
}
let process = process_guard.as_mut().unwrap();
WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64()); WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
@@ -266,8 +304,9 @@ impl PostgresRedoManager {
lsn lsn
); );
// If something went wrong, don't try to reuse the process. Kill it, and // If something went wrong, don't try to reuse the
// next request will launch a new one. // process. Kill it, and next request will launch a new one.
// Otherwise return the process to the pool.
if result.is_err() { if result.is_err() {
error!( error!(
"error applying {} WAL records ({} bytes) to reconstruct page image at LSN {}", "error applying {} WAL records ({} bytes) to reconstruct page image at LSN {}",
@@ -275,8 +314,14 @@ impl PostgresRedoManager {
nbytes, nbytes,
lsn lsn
); );
let process = process_guard.take().unwrap();
process.kill(); process.kill();
let mut process_list = self.process_list.lock().unwrap();
process_list.num_processes -= 1;
self.condvar.notify_one();
} else {
let mut process_list = self.process_list.lock().unwrap();
process_list.free_processes.push(process);
self.condvar.notify_one();
} }
result result
} }
@@ -594,11 +639,10 @@ impl PostgresRedoProcess {
tenant_id: &TenantId, tenant_id: &TenantId,
pg_version: u32, pg_version: u32,
) -> Result<PostgresRedoProcess, Error> { ) -> Result<PostgresRedoProcess, Error> {
// FIXME: We need a dummy Postgres cluster to run the process in. Currently, we // We need a dummy Postgres cluster to run the process in.
// just create one with constant name. That fails if you try to launch more than let processno = WAL_REDO_PROCESS_COUNTER.fetch_add(1, Ordering::Relaxed);
// one WAL redo manager concurrently.
let datadir = path_with_suffix_extension( let datadir = path_with_suffix_extension(
conf.tenant_path(tenant_id).join("wal-redo-datadir"), conf.tenant_path(tenant_id).join(format!("wal-redo-datadir-{}", processno)),
TEMP_FILE_SUFFIX, TEMP_FILE_SUFFIX,
); );
@@ -725,7 +769,11 @@ impl PostgresRedoProcess {
// This could be problematic if there are millions of records to replay, // This could be problematic if there are millions of records to replay,
// but in practice the number of records is usually so small that it doesn't // but in practice the number of records is usually so small that it doesn't
// matter, and it's better to keep this code simple. // matter, and it's better to keep this code simple.
let mut writebuf: Vec<u8> = Vec::new(); //
// Most requests start with a before-image with BLCKSZ bytes, followed by
// by some other WAL records. Start with a buffer that can hold that
// comfortably.
let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
build_begin_redo_for_block_msg(tag, &mut writebuf); build_begin_redo_for_block_msg(tag, &mut writebuf);
if let Some(img) = base_img { if let Some(img) = base_img {
build_push_page_msg(tag, &img, &mut writebuf); build_push_page_msg(tag, &img, &mut writebuf);

View File

@@ -40,9 +40,16 @@
bool connected = false; bool connected = false;
PGconn *pageserver_conn = NULL; PGconn *pageserver_conn = NULL;
/*
* WaitEventSet containing:
* - WL_SOCKET_READABLE on pageserver_conn,
* - WL_LATCH_SET on MyLatch, and
* - WL_EXIT_ON_PM_DEATH.
*/
WaitEventSet *pageserver_conn_wes = NULL;
char *page_server_connstring_raw; char *page_server_connstring_raw;
int n_unflushed_requests = 0;
int flush_every_n_requests = 8; int flush_every_n_requests = 8;
static void pageserver_flush(void); static void pageserver_flush(void);
@@ -63,6 +70,7 @@ pageserver_connect()
PQfinish(pageserver_conn); PQfinish(pageserver_conn);
pageserver_conn = NULL; pageserver_conn = NULL;
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
errmsg(NEON_TAG "could not establish connection to pageserver"), errmsg(NEON_TAG "could not establish connection to pageserver"),
@@ -78,22 +86,26 @@ pageserver_connect()
neon_log(ERROR, "could not send pagestream command to pageserver"); neon_log(ERROR, "could not send pagestream command to pageserver");
} }
pageserver_conn_wes = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(pageserver_conn_wes, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(pageserver_conn_wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
AddWaitEventToSet(pageserver_conn_wes, WL_SOCKET_READABLE, PQsocket(pageserver_conn), NULL, NULL);
while (PQisBusy(pageserver_conn)) while (PQisBusy(pageserver_conn))
{ {
int wc; int wc;
WaitEvent event;
/* Sleep until there's something to do */ /* Sleep until there's something to do */
wc = WaitLatchOrSocket(MyLatch, wc = WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION);
WL_LATCH_SET | WL_SOCKET_READABLE |
WL_EXIT_ON_PM_DEATH,
PQsocket(pageserver_conn),
-1L, PG_WAIT_EXTENSION);
ResetLatch(MyLatch); ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
/* Data available in socket? */ /* Data available in socket? */
if (wc & WL_SOCKET_READABLE) if (event.events & WL_SOCKET_READABLE)
{ {
if (!PQconsumeInput(pageserver_conn)) if (!PQconsumeInput(pageserver_conn))
{ {
@@ -101,6 +113,7 @@ pageserver_connect()
PQfinish(pageserver_conn); PQfinish(pageserver_conn);
pageserver_conn = NULL; pageserver_conn = NULL;
FreeWaitEventSet(pageserver_conn_wes);
neon_log(ERROR, "could not complete handshake with pageserver: %s", neon_log(ERROR, "could not complete handshake with pageserver: %s",
msg); msg);
@@ -117,33 +130,30 @@ pageserver_connect()
* A wrapper around PQgetCopyData that checks for interrupts while sleeping. * A wrapper around PQgetCopyData that checks for interrupts while sleeping.
*/ */
static int static int
call_PQgetCopyData(PGconn *conn, char **buffer) call_PQgetCopyData(char **buffer)
{ {
int ret; int ret;
retry: retry:
ret = PQgetCopyData(conn, buffer, 1 /* async */ ); ret = PQgetCopyData(pageserver_conn, buffer, 1 /* async */ );
if (ret == 0) if (ret == 0)
{ {
int wc; int wc;
WaitEvent event;
/* Sleep until there's something to do */ /* Sleep until there's something to do */
wc = WaitLatchOrSocket(MyLatch, wc = WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION);
WL_LATCH_SET | WL_SOCKET_READABLE |
WL_EXIT_ON_PM_DEATH,
PQsocket(conn),
-1L, PG_WAIT_EXTENSION);
ResetLatch(MyLatch); ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
/* Data available in socket? */ /* Data available in socket? */
if (wc & WL_SOCKET_READABLE) if (event.events & WL_SOCKET_READABLE)
{ {
if (!PQconsumeInput(conn)) if (!PQconsumeInput(pageserver_conn))
neon_log(ERROR, "could not get response from pageserver: %s", neon_log(ERROR, "could not get response from pageserver: %s",
PQerrorMessage(conn)); PQerrorMessage(pageserver_conn));
} }
goto retry; goto retry;
@@ -172,6 +182,8 @@ pageserver_disconnect(void)
prefetch_on_ps_disconnect(); prefetch_on_ps_disconnect();
} }
if (pageserver_conn_wes != NULL)
FreeWaitEventSet(pageserver_conn_wes);
} }
static void static void
@@ -205,11 +217,6 @@ pageserver_send(NeonRequest * request)
} }
pfree(req_buff.data); pfree(req_buff.data);
n_unflushed_requests++;
if (flush_every_n_requests > 0 && n_unflushed_requests >= flush_every_n_requests)
pageserver_flush();
if (message_level_is_interesting(PageStoreTrace)) if (message_level_is_interesting(PageStoreTrace))
{ {
char *msg = nm_to_string((NeonMessage *) request); char *msg = nm_to_string((NeonMessage *) request);
@@ -228,7 +235,7 @@ pageserver_receive(void)
PG_TRY(); PG_TRY();
{ {
/* read response */ /* read response */
resp_buff.len = call_PQgetCopyData(pageserver_conn, &resp_buff.data); resp_buff.len = call_PQgetCopyData(&resp_buff.data);
resp_buff.cursor = 0; resp_buff.cursor = 0;
if (resp_buff.len < 0) if (resp_buff.len < 0)
@@ -274,7 +281,6 @@ pageserver_flush(void)
pageserver_disconnect(); pageserver_disconnect();
neon_log(ERROR, "failed to flush page requests: %s", msg); neon_log(ERROR, "failed to flush page requests: %s", msg);
} }
n_unflushed_requests = 0;
} }
page_server_api api = { page_server_api api = {
@@ -436,7 +442,7 @@ pg_init_libpagestore(void)
NULL, NULL,
&flush_every_n_requests, &flush_every_n_requests,
8, -1, INT_MAX, 8, -1, INT_MAX,
PGC_SIGHUP, PGC_USERSET,
0, /* no flags required */ 0, /* no flags required */
NULL, NULL, NULL); NULL, NULL, NULL);

View File

@@ -150,6 +150,7 @@ extern void prefetch_on_ps_disconnect(void);
extern page_server_api * page_server; extern page_server_api * page_server;
extern char *page_server_connstring; extern char *page_server_connstring;
extern int flush_every_n_requests;
extern bool seqscan_prefetch_enabled; extern bool seqscan_prefetch_enabled;
extern int seqscan_prefetch_distance; extern int seqscan_prefetch_distance;
extern char *neon_timeline; extern char *neon_timeline;

View File

@@ -192,7 +192,7 @@ typedef struct PrfHashEntry {
* It maintains a (ring) buffer of in-flight requests and responses. * It maintains a (ring) buffer of in-flight requests and responses.
* *
* We maintain several indexes into the ring buffer: * We maintain several indexes into the ring buffer:
* ring_unused >= ring_receive >= ring_last >= 0 * ring_unused >= ring_flush >= ring_receive >= ring_last >= 0
* *
* ring_unused points to the first unused slot of the buffer * ring_unused points to the first unused slot of the buffer
* ring_receive is the next request that is to be received * ring_receive is the next request that is to be received
@@ -208,6 +208,7 @@ typedef struct PrefetchState {
/* buffer indexes */ /* buffer indexes */
uint64 ring_unused; /* first unused slot */ uint64 ring_unused; /* first unused slot */
uint64 ring_flush; /* next request to flush */
uint64 ring_receive; /* next slot that is to receive a response */ uint64 ring_receive; /* next slot that is to receive a response */
uint64 ring_last; /* min slot with a response value */ uint64 ring_last; /* min slot with a response value */
@@ -577,6 +578,13 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
prefetch_do_request(slot, force_latest, force_lsn); prefetch_do_request(slot, force_latest, force_lsn);
Assert(slot->status == PRFS_REQUESTED); Assert(slot->status == PRFS_REQUESTED);
Assert(ring_index < MyPState->ring_unused); Assert(ring_index < MyPState->ring_unused);
if (flush_every_n_requests > 0 && MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests)
{
page_server->flush();
MyPState->ring_flush = MyPState->ring_unused;
}
return ring_index; return ring_index;
} }
@@ -585,6 +593,7 @@ page_server_request(void const *req)
{ {
page_server->send((NeonRequest *) req); page_server->send((NeonRequest *) req);
page_server->flush(); page_server->flush();
MyPState->ring_flush = MyPState->ring_unused;
consume_prefetch_responses(); consume_prefetch_responses();
return page_server->receive(); return page_server->receive();
} }
@@ -1581,6 +1590,7 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
if (entry->slot->status == PRFS_REQUESTED) if (entry->slot->status == PRFS_REQUESTED)
{ {
page_server->flush(); page_server->flush();
MyPState->ring_flush = MyPState->ring_unused;
prefetch_wait_for(entry->slot->my_ring_index); prefetch_wait_for(entry->slot->my_ring_index);
} }
/* drop caches */ /* drop caches */
@@ -1606,7 +1616,11 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
Assert(slot->status != PRFS_UNUSED); Assert(slot->status != PRFS_UNUSED);
Assert(&MyPState->prf_buffer[(ring_index % READ_BUFFER_SIZE)] == slot); Assert(&MyPState->prf_buffer[(ring_index % READ_BUFFER_SIZE)] == slot);
page_server->flush(); if (ring_index >= MyPState->ring_flush)
{
page_server->flush();
MyPState->ring_flush = MyPState->ring_unused;
}
prefetch_wait_for(ring_index); prefetch_wait_for(ring_index);
Assert(slot->status == PRFS_RECEIVED); Assert(slot->status == PRFS_RECEIVED);