Compare commits

...

5 Commits

Author SHA1 Message Date
Heikki Linnakangas
c2d5cee943 Avoid some vector-growing overhead.
I saw this in 'perf' profile of a sequential scan:

> -   31.93%     0.21%  compute request  pageserver         [.] <pageserver::walredo::PostgresRedoManager as pageserver::walredo::WalRedoManager>::request_redo
>    - 31.72% <pageserver::walredo::PostgresRedoManager as pageserver::walredo::WalRedoManager>::request_redo
>       - 31.26% pageserver::walredo::PostgresRedoManager::apply_batch_postgres
>          + 7.64% <std::process::ChildStdin as std::io::Write>::write
>          + 6.17% nix::poll::poll
>          + 3.58% <std::process::ChildStderr as std::io::Read>::read
>          + 2.96% std::sync::condvar::Condvar::notify_one
>          + 2.48% std::sys::unix::locks::futex::Condvar::wait
>          + 2.19% alloc::raw_vec::RawVec<T,A>::reserve::do_reserve_and_handle
>          + 1.14% std::sys::unix::locks::futex::Mutex::lock_contended
>            0.67% __rust_alloc_zeroed
>            0.62% __stpcpy_ssse3
>            0.56% std::sys::unix::locks::futex::Mutex::wake

Note the 'do_reserve_and_handle' overhead. That's caused by having to grow
the buffer used to construct the WAL redo request. This commit
eliminates that overhead. It's only about 2% of the overall CPU usage,
but every little helps.

Also reuse the temp buffer when reading records from a DeltaLayer. I
didn't actually see that show up in profiling, but it seems like a
trivial change that can't hurt.
2022-11-08 17:55:43 +02:00
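The walredo.rs and blob-reading hunks below implement this with `Vec::with_capacity` and `read_blob_into_buf`. A minimal, self-contained sketch of the two buffer patterns follows; the `build_request` helper is hypothetical, standing in for the real message builders:

```rust
// Sketch of the two buffer patterns in this commit (illustrative names,
// not the pageserver's real API).

const BLCKSZ: usize = 8192;

// Hypothetical stand-in for build_begin_redo_for_block_msg() and friends:
// appends a small header plus a page image to the request buffer.
fn build_request(writebuf: &mut Vec<u8>, page: &[u8]) {
    writebuf.push(b'P'); // message tag
    writebuf.extend_from_slice(&(page.len() as u32).to_be_bytes());
    writebuf.extend_from_slice(page);
}

fn main() {
    let page = vec![0u8; BLCKSZ];

    // Before: an empty Vec grows repeatedly while the request is built,
    // which is what showed up as do_reserve_and_handle in the profile.
    let mut writebuf: Vec<u8> = Vec::new();
    build_request(&mut writebuf, &page);

    // After: reserve roughly "one before-image plus some WAL records"
    // up front so the common case never reallocates.
    let mut writebuf: Vec<u8> = Vec::with_capacity(BLCKSZ * 3);
    build_request(&mut writebuf, &page);
    assert!(writebuf.capacity() >= BLCKSZ * 3);

    // Same idea for the DeltaLayer read path: clear() keeps the allocation,
    // so reusing one scratch buffer across records avoids an allocation
    // per read_blob call.
    let mut buf: Vec<u8> = Vec::new();
    for _ in 0..4 {
        buf.clear();
        buf.extend_from_slice(&page); // stands in for read_blob_into_buf()
    }
}
```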
Heikki Linnakangas
db7ca18c43 Use a cached WaitEventSet instead of WaitLatchOrSocket.
When we repeatedly wait for the same events, it's faster to create the
event set once and reuse it. While testing with a sequential scan test
case, I saw WaitLatchOrSocket consuming a lot of CPU:

> -   40.52%     0.14%  postgres  postgres           [.] WaitLatchOrSocket
>    - 40.38% WaitLatchOrSocket
>       + 17.83% AddWaitEventToSet
>       + 9.47% close@plt
>       + 8.29% CreateWaitEventSet
>       + 4.57% WaitEventSetWait

This eliminates most of that overhead.
2022-11-08 15:53:12 +02:00
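The libpagestore.c hunk further down contains the actual change; its shape is roughly the following sketch against the PostgreSQL 14 latch API (the function names and socket argument here are illustrative, not the extension's real ones):

```c
/* Sketch of the cached-WaitEventSet pattern (PostgreSQL 14 latch API). */
#include "postgres.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/latch.h"
#include "utils/memutils.h"

static WaitEventSet *conn_wes = NULL;

/* Build the event set once, when the connection is established. */
static void
init_wait_event_set(pgsocket sock)
{
	conn_wes = CreateWaitEventSet(TopMemoryContext, 3);
	AddWaitEventToSet(conn_wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
	AddWaitEventToSet(conn_wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, NULL, NULL);
	AddWaitEventToSet(conn_wes, WL_SOCKET_READABLE, sock, NULL, NULL);
}

/*
 * Every wait reuses the cached set, instead of WaitLatchOrSocket(), which
 * creates and tears down a fresh event set (epoll fd) on each call.
 */
static bool
wait_for_data(void)
{
	WaitEvent	event;

	(void) WaitEventSetWait(conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION);
	ResetLatch(MyLatch);
	CHECK_FOR_INTERRUPTS();

	return (event.events & WL_SOCKET_READABLE) != 0;
}
```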
Heikki Linnakangas
a726b37bca Have a pool of WAL redo processes per tenant
To allow more concurrency, have a pool of WAL redo processes that can
grow up to 4 processes per tenant. There's no way to shrink the pool;
that's why I'm capping it at 4 processes, to keep the total number of
processes reasonable.
2022-11-08 15:00:31 +02:00
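The walredo.rs hunk below builds the pool from a `Mutex<ProcessList>` plus a `Condvar`. A reduced, runnable sketch of that checkout/return protocol, with a dummy `Process` type standing in for `PostgresRedoProcess`:

```rust
use std::sync::{Condvar, Mutex};

const MAX_PROCESSES: usize = 4;

// Stand-in for PostgresRedoProcess; "launching" is just construction here.
struct Process;

#[derive(Default)]
struct ProcessList {
    free_processes: Vec<Process>,
    num_processes: usize, // free + currently checked out
}

struct Pool {
    list: Mutex<ProcessList>,
    condvar: Condvar,
}

impl Pool {
    fn get(&self) -> Process {
        let mut list = self.list.lock().unwrap();
        loop {
            // Reuse a free process if one is immediately available.
            if let Some(p) = list.free_processes.pop() {
                return p;
            }
            if list.num_processes >= MAX_PROCESSES {
                // At the cap: sleep until put() signals that a slot freed up.
                list = self.condvar.wait(list).unwrap();
            } else {
                // Below the cap: grow the pool by launching a new process.
                list.num_processes += 1;
                return Process;
            }
        }
    }

    fn put(&self, p: Process, failed: bool) {
        let mut list = self.list.lock().unwrap();
        if failed {
            // Don't reuse a process after an error; drop it and shrink the count.
            drop(p);
            list.num_processes -= 1;
        } else {
            list.free_processes.push(p);
        }
        self.condvar.notify_one();
    }
}

fn main() {
    let pool = Pool {
        list: Mutex::new(ProcessList::default()),
        condvar: Condvar::new(),
    };
    let p = pool.get();
    pool.put(p, false);
}
```

On error the process is dropped rather than returned, so a wedged redo process can't poison the pool; decrementing the counter and calling `notify_one` lets a waiter launch a replacement.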
Heikki Linnakangas
5bd843551c WIP: Process received GetPage requests in parallel 2022-11-08 14:42:40 +02:00
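The page_service.rs hunk below restructures the handler loop around `tokio::select!` and a `FuturesOrdered` of spawned tasks, so requests are processed concurrently while responses still go back to the client in arrival order. A toy version of that loop shape, using an mpsc channel in place of the Postgres COPY stream (requires the `tokio` and `futures` crates):

```rust
use futures::stream::{FuturesOrdered, StreamExt};
use tokio::sync::mpsc;

async fn handle_request(req: u32) -> String {
    // Stands in for handle_get_page_at_lsn_request() etc.
    format!("response to request {req}")
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(16);
    tokio::spawn(async move {
        for req in 0..5 {
            tx.send(req).await.unwrap();
        }
    });

    let mut inprogress = FuturesOrdered::new();
    loop {
        tokio::select! {
            // The response at the head of the queue is ready: write it out.
            Some(joined) = inprogress.next(), if !inprogress.is_empty() => {
                let response: String = joined.unwrap();
                println!("{response}");
            }
            // A new request arrived: spawn a task for it and keep going.
            req = rx.recv() => {
                match req {
                    Some(req) => inprogress.push_back(tokio::spawn(handle_request(req))),
                    None => break, // client disconnected
                }
            }
        }
    }
    // Drain any responses still in flight when the client disconnects.
    while let Some(joined) = inprogress.next().await {
        println!("{}", joined.unwrap());
    }
}
```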
Heikki Linnakangas
c5c4e47edd Fix neon.flush_output_after GUC.
The neon.flush_output_after GUC was not effective, at least not in a sequential
scan, because neon_read_at_lsn() flushed the prefetch queue on every call
anyway. Change neon_read_at_lsn() so that it only flushes the queue if
the GetPage request that it needs to wait for hasn't already been flushed.
To make that possible, move the tracking of unflushed requests into a new
ring_flush variable, alongside the other ring buffer indexes.

While we're at it, mark neon.flush_output_after as PGC_USERSET, so that it
can be changed per-session with "SET neon.flush_output_after = ...". Makes
it easier to test different values.
2022-11-08 12:11:17 +02:00
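The pagestore_smgr.c hunk at the end of the diff adds the `ring_flush` index; the batching rule it implements can be sketched in isolation (a simplified model with plain counters, not the extension's real prefetch state):

```c
/*
 * Simplified model of the ring_flush bookkeeping: requests are queued into a
 * ring, and the send queue is flushed once flush_every_n_requests requests
 * have accumulated since the last flush, or when we must wait for a request
 * that hasn't been flushed yet. Plain C, no PostgreSQL dependencies.
 */
#include <stdint.h>
#include <stdio.h>

static uint64_t ring_unused = 0;	/* first unused slot (next request index) */
static uint64_t ring_flush = 0;		/* next request that still needs flushing */
static int flush_every_n_requests = 8;

static void
flush(void)
{
	printf("flush requests [%llu, %llu)\n",
		   (unsigned long long) ring_flush, (unsigned long long) ring_unused);
	ring_flush = ring_unused;
}

/* Queue a prefetch request; flush only when a full batch has accumulated. */
static uint64_t
queue_request(void)
{
	uint64_t	ring_index = ring_unused++;

	if (flush_every_n_requests > 0 &&
		ring_unused - ring_flush >= (uint64_t) flush_every_n_requests)
		flush();
	return ring_index;
}

/* Wait for one request's response; flush first only if it is still unflushed. */
static void
wait_for(uint64_t ring_index)
{
	if (ring_index >= ring_flush)
		flush();
	/* ... receive responses up to ring_index ... */
}

int
main(void)
{
	for (int i = 0; i < 10; i++)
		queue_request();		/* flushes once, after the 8th request */
	wait_for(9);				/* request 9 not yet flushed -> flush now */
	wait_for(5);				/* already flushed -> no extra flush */
	return 0;
}
```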
7 changed files with 181 additions and 108 deletions

View File

@@ -13,6 +13,7 @@ use anyhow::{bail, ensure, Context, Result};
use bytes::Buf;
use bytes::Bytes;
use futures::{Stream, StreamExt};
use futures::stream::FuturesOrdered;
use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
@@ -39,10 +40,9 @@ use utils::{
};
use crate::basebackup;
use crate::config::{PageServerConf, ProfilingConfig};
use crate::config::PageServerConf;
use crate::import_datadir::import_wal_from_tar;
use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME};
use crate::profiling::profpoint_start;
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant::Timeline;
@@ -276,10 +276,10 @@ impl PageServerHandler {
pgb.write_message(&BeMessage::CopyBothResponse)?;
pgb.flush().await?;
let metrics = PageRequestMetrics::new(&tenant_id, &timeline_id);
let mut inprogress_requests = FuturesOrdered::new();
loop {
let msg = tokio::select! {
tokio::select! {
biased;
_ = task_mgr::shutdown_watcher() => {
@@ -288,51 +288,58 @@ impl PageServerHandler {
break;
}
msg = pgb.read_message() => { msg }
};
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(m) => {
bail!("unexpected message: {m:?} during COPY");
response = inprogress_requests.next(), if !inprogress_requests.is_empty() => {
let response: Bytes = response.unwrap()?;
pgb.write_message(&BeMessage::CopyData(&response))?;
pgb.flush().await?;
continue;
}
None => break, // client disconnected
};
trace!("query: {copy_data_bytes:?}");
msg = pgb.read_message() => {
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(m) => {
bail!("unexpected message: {m:?} during COPY");
}
None => break, // client disconnected
};
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
trace!("query: {copy_data_bytes:?}");
let response = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {
let _timer = metrics.get_rel_exists.start_timer();
self.handle_get_rel_exists_request(&timeline, &req).await
}
PagestreamFeMessage::Nblocks(req) => {
let _timer = metrics.get_rel_size.start_timer();
self.handle_get_nblocks_request(&timeline, &req).await
}
PagestreamFeMessage::GetPage(req) => {
let _timer = metrics.get_page_at_lsn.start_timer();
self.handle_get_page_at_lsn_request(&timeline, &req).await
}
PagestreamFeMessage::DbSize(req) => {
let _timer = metrics.get_db_size.start_timer();
self.handle_db_size_request(&timeline, &req).await
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
let timeline = Arc::clone(&timeline);
let task = async move {
let response = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {
Self::handle_get_rel_exists_request(&timeline, &req).await
}
PagestreamFeMessage::Nblocks(req) => {
Self::handle_get_nblocks_request(&timeline, &req).await
}
PagestreamFeMessage::GetPage(req) => {
Self::handle_get_page_at_lsn_request(&timeline, &req).await
}
PagestreamFeMessage::DbSize(req) => {
Self::handle_db_size_request(&timeline, &req).await
}
};
let response = response.unwrap_or_else(|e| {
// print the all details to the log with {:#}, but for the client the
// error message is enough
error!("error reading relation or page version: {:?}", e);
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
});
let response: Bytes = response.serialize();
response
};
inprogress_requests.push_back(tokio::spawn(task));
continue;
}
};
let response = response.unwrap_or_else(|e| {
// print the all details to the log with {:#}, but for the client the
// error message is enough
error!("error reading relation or page version: {:?}", e);
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
});
pgb.write_message(&BeMessage::CopyData(&response.serialize()))?;
pgb.flush().await?;
}
Ok(())
}
@@ -504,9 +511,8 @@ impl PageServerHandler {
Ok(lsn)
}
#[instrument(skip(self, timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
#[instrument(skip(timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
async fn handle_get_rel_exists_request(
&self,
timeline: &Timeline,
req: &PagestreamExistsRequest,
) -> Result<PagestreamBeMessage> {
@@ -521,9 +527,8 @@ impl PageServerHandler {
}))
}
#[instrument(skip(self, timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
#[instrument(skip(timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
async fn handle_get_nblocks_request(
&self,
timeline: &Timeline,
req: &PagestreamNblocksRequest,
) -> Result<PagestreamBeMessage> {
@@ -538,9 +543,8 @@ impl PageServerHandler {
}))
}
#[instrument(skip(self, timeline, req), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))]
#[instrument(skip(timeline, req), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))]
async fn handle_db_size_request(
&self,
timeline: &Timeline,
req: &PagestreamDbSizeRequest,
) -> Result<PagestreamBeMessage> {
@@ -558,9 +562,8 @@ impl PageServerHandler {
}))
}
#[instrument(skip(self, timeline, req), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))]
#[instrument(skip(timeline, req), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))]
async fn handle_get_page_at_lsn_request(
&self,
timeline: &Timeline,
req: &PagestreamGetPageRequest,
) -> Result<PagestreamBeMessage> {
@@ -579,7 +582,7 @@ impl PageServerHandler {
// FIXME: this profiling now happens at different place than it used to. The
// current profiling is based on a thread-local variable, so it doesn't work
// across awaits
let _profiling_guard = profpoint_start(self.conf, ProfilingConfig::PageRequests);
//let _profiling_guard = profpoint_start(self.conf, ProfilingConfig::PageRequests);
let page = timeline.get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest)?;
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
@@ -587,9 +590,8 @@ impl PageServerHandler {
}))
}
#[instrument(skip(self, pgb))]
#[instrument(skip(pgb))]
async fn handle_basebackup_request(
&self,
pgb: &mut PostgresBackend,
tenant_id: TenantId,
timeline_id: TimelineId,
@@ -722,7 +724,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
};
// Check that the timeline exists
self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false)
Self::handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false)
.await?;
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
}
@@ -782,7 +784,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
self.check_permission(Some(tenant_id))?;
// Check that the timeline exists
self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, prev_lsn, true)
Self::handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, prev_lsn, true)
.await?;
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("import basebackup ") {

View File

@@ -74,6 +74,7 @@ where
};
dstbuf.clear();
dstbuf.reserve(len);
// Read the payload
let mut remain = len;

View File

@@ -260,8 +260,9 @@ impl Layer for DeltaLayer {
// Ok, 'offsets' now contains the offsets of all the entries we need to read
let mut cursor = file.block_cursor();
let mut buf = Vec::new();
for (entry_lsn, pos) in offsets {
let buf = cursor.read_blob(pos).with_context(|| {
cursor.read_blob_into_buf(pos, &mut buf).with_context(|| {
format!(
"Failed to read blob from virtual file {}",
file.file.path.display()

View File

@@ -21,6 +21,7 @@
use byteorder::{ByteOrder, LittleEndian};
use bytes::{BufMut, Bytes, BytesMut};
use nix::poll::*;
use once_cell::sync::Lazy;
use serde::Serialize;
use std::fs;
use std::fs::OpenOptions;
@@ -31,7 +32,8 @@ use std::os::unix::prelude::CommandExt;
use std::path::PathBuf;
use std::process::Stdio;
use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Condvar, Mutex};
use std::time::Duration;
use std::time::Instant;
use tracing::*;
@@ -55,6 +57,9 @@ use postgres_ffi::v14::nonrelfile_utils::{
};
use postgres_ffi::BLCKSZ;
/// Maximum number of WAL redo processes to launch for a single tenant.
const MAX_PROCESSES: usize = 4;
///
/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
///
@@ -88,18 +93,32 @@ pub trait WalRedoManager: Send + Sync {
) -> Result<Bytes, WalRedoError>;
}
static WAL_REDO_PROCESS_COUNTER: Lazy<AtomicU64> = Lazy::new(|| { AtomicU64::new(0) });
///
/// This is the real implementation that uses a Postgres process to
/// perform WAL replay. Only one thread can use the process at a time,
/// that is controlled by the Mutex. In the future, we might want to
/// launch a pool of processes to allow concurrent replay of multiple
/// records.
/// This is the real implementation that uses a special Postgres
/// process to perform WAL replay. There is a pool of these processes.
///
pub struct PostgresRedoManager {
tenant_id: TenantId,
conf: &'static PageServerConf,
process: Mutex<Option<PostgresRedoProcess>>,
/// Pool of processes.
process_list: Mutex<ProcessList>,
/// Condition variable that can be used to sleep until a process
/// becomes available in the pool.
condvar: Condvar,
}
// A pool of WAL redo processes
#[derive(Default)]
struct ProcessList {
/// processes that are available for reuse
free_processes: Vec<PostgresRedoProcess>,
/// Total number of processes, including all the processes in
/// 'free_processes' list, and any processes that are in use.
num_processes: usize,
}
/// Can this request be served by neon redo functions
@@ -204,7 +223,32 @@ impl PostgresRedoManager {
PostgresRedoManager {
tenant_id,
conf,
process: Mutex::new(None),
process_list: Mutex::new(ProcessList::default()),
condvar: Condvar::new(),
}
}
// Get a handle to a redo process from the pool.
fn get_process(&self, pg_version: u32) -> Result<PostgresRedoProcess, WalRedoError> {
let mut process_list = self.process_list.lock().unwrap();
loop {
// If there's a free process immediately available, take it.
if let Some(process) = process_list.free_processes.pop() {
return Ok(process);
}
// All processes are in use. If the pool is at its maximum size
// already, wait for a process to become free. Otherwise launch
// a new process.
if process_list.num_processes >= MAX_PROCESSES {
process_list = self.condvar.wait(process_list).unwrap();
continue;
} else {
let process = PostgresRedoProcess::launch(self.conf, &self.tenant_id, pg_version)?;
process_list.num_processes += 1;
return Ok(process);
}
}
}
@@ -224,15 +268,9 @@ impl PostgresRedoManager {
let start_time = Instant::now();
let mut process_guard = self.process.lock().unwrap();
let lock_time = Instant::now();
let mut process = self.get_process(pg_version)?;
// launch the WAL redo process on first use
if process_guard.is_none() {
let p = PostgresRedoProcess::launch(self.conf, &self.tenant_id, pg_version)?;
*process_guard = Some(p);
}
let process = process_guard.as_mut().unwrap();
let lock_time = Instant::now();
WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
@@ -266,8 +304,9 @@ impl PostgresRedoManager {
lsn
);
// If something went wrong, don't try to reuse the process. Kill it, and
// next request will launch a new one.
// If something went wrong, don't try to reuse the
// process. Kill it, and next request will launch a new one.
// Otherwise return the process to the pool.
if result.is_err() {
error!(
"error applying {} WAL records ({} bytes) to reconstruct page image at LSN {}",
@@ -275,8 +314,14 @@ impl PostgresRedoManager {
nbytes,
lsn
);
let process = process_guard.take().unwrap();
process.kill();
let mut process_list = self.process_list.lock().unwrap();
process_list.num_processes -= 1;
self.condvar.notify_one();
} else {
let mut process_list = self.process_list.lock().unwrap();
process_list.free_processes.push(process);
self.condvar.notify_one();
}
result
}
@@ -594,11 +639,10 @@ impl PostgresRedoProcess {
tenant_id: &TenantId,
pg_version: u32,
) -> Result<PostgresRedoProcess, Error> {
// FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
// just create one with constant name. That fails if you try to launch more than
// one WAL redo manager concurrently.
// We need a dummy Postgres cluster to run the process in.
let processno = WAL_REDO_PROCESS_COUNTER.fetch_add(1, Ordering::Relaxed);
let datadir = path_with_suffix_extension(
conf.tenant_path(tenant_id).join("wal-redo-datadir"),
conf.tenant_path(tenant_id).join(format!("wal-redo-datadir-{}", processno)),
TEMP_FILE_SUFFIX,
);
@@ -725,7 +769,11 @@ impl PostgresRedoProcess {
// This could be problematic if there are millions of records to replay,
// but in practice the number of records is usually so small that it doesn't
// matter, and it's better to keep this code simple.
let mut writebuf: Vec<u8> = Vec::new();
//
// Most requests start with a before-image with BLCKSZ bytes, followed by
// by some other WAL records. Start with a buffer that can hold that
// comfortably.
let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
build_begin_redo_for_block_msg(tag, &mut writebuf);
if let Some(img) = base_img {
build_push_page_msg(tag, &img, &mut writebuf);

View File

@@ -40,9 +40,16 @@
bool connected = false;
PGconn *pageserver_conn = NULL;
/*
* WaitEventSet containing:
* - WL_SOCKET_READABLE on pageserver_conn,
* - WL_LATCH_SET on MyLatch, and
* - WL_EXIT_ON_PM_DEATH.
*/
WaitEventSet *pageserver_conn_wes = NULL;
char *page_server_connstring_raw;
int n_unflushed_requests = 0;
int flush_every_n_requests = 8;
static void pageserver_flush(void);
@@ -63,6 +70,7 @@ pageserver_connect()
PQfinish(pageserver_conn);
pageserver_conn = NULL;
ereport(ERROR,
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
errmsg(NEON_TAG "could not establish connection to pageserver"),
@@ -78,22 +86,26 @@ pageserver_connect()
neon_log(ERROR, "could not send pagestream command to pageserver");
}
pageserver_conn_wes = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(pageserver_conn_wes, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(pageserver_conn_wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
AddWaitEventToSet(pageserver_conn_wes, WL_SOCKET_READABLE, PQsocket(pageserver_conn), NULL, NULL);
while (PQisBusy(pageserver_conn))
{
int wc;
WaitEvent event;
/* Sleep until there's something to do */
wc = WaitLatchOrSocket(MyLatch,
WL_LATCH_SET | WL_SOCKET_READABLE |
WL_EXIT_ON_PM_DEATH,
PQsocket(pageserver_conn),
-1L, PG_WAIT_EXTENSION);
wc = WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
/* Data available in socket? */
if (wc & WL_SOCKET_READABLE)
if (event.events & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(pageserver_conn))
{
@@ -101,6 +113,7 @@ pageserver_connect()
PQfinish(pageserver_conn);
pageserver_conn = NULL;
FreeWaitEventSet(pageserver_conn_wes);
neon_log(ERROR, "could not complete handshake with pageserver: %s",
msg);
@@ -117,33 +130,30 @@ pageserver_connect()
* A wrapper around PQgetCopyData that checks for interrupts while sleeping.
*/
static int
call_PQgetCopyData(PGconn *conn, char **buffer)
call_PQgetCopyData(char **buffer)
{
int ret;
retry:
ret = PQgetCopyData(conn, buffer, 1 /* async */ );
ret = PQgetCopyData(pageserver_conn, buffer, 1 /* async */ );
if (ret == 0)
{
int wc;
WaitEvent event;
/* Sleep until there's something to do */
wc = WaitLatchOrSocket(MyLatch,
WL_LATCH_SET | WL_SOCKET_READABLE |
WL_EXIT_ON_PM_DEATH,
PQsocket(conn),
-1L, PG_WAIT_EXTENSION);
wc = WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
/* Data available in socket? */
if (wc & WL_SOCKET_READABLE)
if (event.events & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(conn))
if (!PQconsumeInput(pageserver_conn))
neon_log(ERROR, "could not get response from pageserver: %s",
PQerrorMessage(conn));
PQerrorMessage(pageserver_conn));
}
goto retry;
@@ -172,6 +182,8 @@ pageserver_disconnect(void)
prefetch_on_ps_disconnect();
}
if (pageserver_conn_wes != NULL)
FreeWaitEventSet(pageserver_conn_wes);
}
static void
@@ -205,11 +217,6 @@ pageserver_send(NeonRequest * request)
}
pfree(req_buff.data);
n_unflushed_requests++;
if (flush_every_n_requests > 0 && n_unflushed_requests >= flush_every_n_requests)
pageserver_flush();
if (message_level_is_interesting(PageStoreTrace))
{
char *msg = nm_to_string((NeonMessage *) request);
@@ -228,7 +235,7 @@ pageserver_receive(void)
PG_TRY();
{
/* read response */
resp_buff.len = call_PQgetCopyData(pageserver_conn, &resp_buff.data);
resp_buff.len = call_PQgetCopyData(&resp_buff.data);
resp_buff.cursor = 0;
if (resp_buff.len < 0)
@@ -274,7 +281,6 @@ pageserver_flush(void)
pageserver_disconnect();
neon_log(ERROR, "failed to flush page requests: %s", msg);
}
n_unflushed_requests = 0;
}
page_server_api api = {
@@ -436,7 +442,7 @@ pg_init_libpagestore(void)
NULL,
&flush_every_n_requests,
8, -1, INT_MAX,
PGC_SIGHUP,
PGC_USERSET,
0, /* no flags required */
NULL, NULL, NULL);

View File

@@ -150,6 +150,7 @@ extern void prefetch_on_ps_disconnect(void);
extern page_server_api * page_server;
extern char *page_server_connstring;
extern int flush_every_n_requests;
extern bool seqscan_prefetch_enabled;
extern int seqscan_prefetch_distance;
extern char *neon_timeline;

View File

@@ -192,7 +192,7 @@ typedef struct PrfHashEntry {
* It maintains a (ring) buffer of in-flight requests and responses.
*
* We maintain several indexes into the ring buffer:
* ring_unused >= ring_receive >= ring_last >= 0
* ring_unused >= ring_flush >= ring_receive >= ring_last >= 0
*
* ring_unused points to the first unused slot of the buffer
* ring_receive is the next request that is to be received
@@ -208,6 +208,7 @@ typedef struct PrefetchState {
/* buffer indexes */
uint64 ring_unused; /* first unused slot */
uint64 ring_flush; /* next request to flush */
uint64 ring_receive; /* next slot that is to receive a response */
uint64 ring_last; /* min slot with a response value */
@@ -577,6 +578,13 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
prefetch_do_request(slot, force_latest, force_lsn);
Assert(slot->status == PRFS_REQUESTED);
Assert(ring_index < MyPState->ring_unused);
if (flush_every_n_requests > 0 && MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests)
{
page_server->flush();
MyPState->ring_flush = MyPState->ring_unused;
}
return ring_index;
}
@@ -585,6 +593,7 @@ page_server_request(void const *req)
{
page_server->send((NeonRequest *) req);
page_server->flush();
MyPState->ring_flush = MyPState->ring_unused;
consume_prefetch_responses();
return page_server->receive();
}
@@ -1581,6 +1590,7 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
if (entry->slot->status == PRFS_REQUESTED)
{
page_server->flush();
MyPState->ring_flush = MyPState->ring_unused;
prefetch_wait_for(entry->slot->my_ring_index);
}
/* drop caches */
@@ -1606,7 +1616,11 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
Assert(slot->status != PRFS_UNUSED);
Assert(&MyPState->prf_buffer[(ring_index % READ_BUFFER_SIZE)] == slot);
page_server->flush();
if (ring_index >= MyPState->ring_flush)
{
page_server->flush();
MyPState->ring_flush = MyPState->ring_unused;
}
prefetch_wait_for(ring_index);
Assert(slot->status == PRFS_RECEIVED);