Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-19 03:12:55 +00:00
Compare commits
5 Commits
hackathon/
...
parallel-g
| Author | SHA1 | Date |
|---|---|---|
| | c2d5cee943 | |
| | db7ca18c43 | |
| | a726b37bca | |
| | 5bd843551c | |
| | c5c4e47edd | |
@@ -13,6 +13,7 @@ use anyhow::{bail, ensure, Context, Result};
use bytes::Buf;
use bytes::Bytes;
use futures::{Stream, StreamExt};
use futures::stream::FuturesOrdered;
use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
@@ -39,10 +40,9 @@ use utils::{
};

use crate::basebackup;
use crate::config::{PageServerConf, ProfilingConfig};
use crate::config::PageServerConf;
use crate::import_datadir::import_wal_from_tar;
use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME};
use crate::profiling::profpoint_start;
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant::Timeline;
@@ -276,10 +276,10 @@ impl PageServerHandler {
pgb.write_message(&BeMessage::CopyBothResponse)?;
pgb.flush().await?;

let metrics = PageRequestMetrics::new(&tenant_id, &timeline_id);
let mut inprogress_requests = FuturesOrdered::new();

loop {
let msg = tokio::select! {
tokio::select! {
biased;

_ = task_mgr::shutdown_watcher() => {
@@ -288,51 +288,58 @@ impl PageServerHandler {
break;
}

msg = pgb.read_message() => { msg }
};

let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(m) => {
bail!("unexpected message: {m:?} during COPY");
response = inprogress_requests.next(), if !inprogress_requests.is_empty() => {
let response: Bytes = response.unwrap()?;
pgb.write_message(&BeMessage::CopyData(&response))?;
pgb.flush().await?;
continue;
}
None => break, // client disconnected
};

trace!("query: {copy_data_bytes:?}");
msg = pgb.read_message() => {
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(m) => {
bail!("unexpected message: {m:?} during COPY");
}
None => break, // client disconnected
};

let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
trace!("query: {copy_data_bytes:?}");

let response = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {
let _timer = metrics.get_rel_exists.start_timer();
self.handle_get_rel_exists_request(&timeline, &req).await
}
PagestreamFeMessage::Nblocks(req) => {
let _timer = metrics.get_rel_size.start_timer();
self.handle_get_nblocks_request(&timeline, &req).await
}
PagestreamFeMessage::GetPage(req) => {
let _timer = metrics.get_page_at_lsn.start_timer();
self.handle_get_page_at_lsn_request(&timeline, &req).await
}
PagestreamFeMessage::DbSize(req) => {
let _timer = metrics.get_db_size.start_timer();
self.handle_db_size_request(&timeline, &req).await
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;

let timeline = Arc::clone(&timeline);
let task = async move {
let response = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {
Self::handle_get_rel_exists_request(&timeline, &req).await
}
PagestreamFeMessage::Nblocks(req) => {
Self::handle_get_nblocks_request(&timeline, &req).await
}
PagestreamFeMessage::GetPage(req) => {
Self::handle_get_page_at_lsn_request(&timeline, &req).await
}
PagestreamFeMessage::DbSize(req) => {
Self::handle_db_size_request(&timeline, &req).await
}
};

let response = response.unwrap_or_else(|e| {
// print the all details to the log with {:#}, but for the client the
// error message is enough
error!("error reading relation or page version: {:?}", e);
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
});
let response: Bytes = response.serialize();
response
};
inprogress_requests.push_back(tokio::spawn(task));
continue;
}
};

let response = response.unwrap_or_else(|e| {
// print the all details to the log with {:#}, but for the client the
// error message is enough
error!("error reading relation or page version: {:?}", e);
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
});

pgb.write_message(&BeMessage::CopyData(&response.serialize()))?;
pgb.flush().await?;
}
Ok(())
}
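The hunk above is the core of the change: instead of handling each Pagestream request to completion before reading the next one, the handler spawns each request as its own task and keeps the `JoinHandle`s in a `FuturesOrdered`, so responses still go back to the client in request order while the work runs concurrently. A minimal, self-contained sketch of that pattern, with hypothetical channel-based I/O standing in for the `PostgresBackend` connection:

```rust
use futures::stream::{FuturesOrdered, StreamExt};
use tokio::sync::mpsc;

async fn pipeline(mut requests: mpsc::Receiver<u64>, responses: mpsc::Sender<String>) {
    let mut inprogress = FuturesOrdered::new();
    loop {
        tokio::select! {
            biased;
            // Drain finished work first; FuturesOrdered yields results in the
            // order the tasks were pushed, so replies stay ordered.
            Some(done) = inprogress.next(), if !inprogress.is_empty() => {
                let reply: String = done.expect("task panicked");
                if responses.send(reply).await.is_err() {
                    break; // client went away
                }
            }
            // Otherwise accept the next request and spawn it as its own task.
            maybe_req = requests.recv() => {
                match maybe_req {
                    Some(blkno) => inprogress.push_back(tokio::spawn(async move {
                        // stand-in for handle_get_page_at_lsn_request etc.
                        format!("page {blkno}")
                    })),
                    None => break, // client disconnected
                }
            }
        }
    }
    // Flush whatever is still in flight before returning.
    while let Some(done) = inprogress.next().await {
        let _ = responses.send(done.expect("task panicked")).await;
    }
}

#[tokio::main]
async fn main() {
    let (req_tx, req_rx) = mpsc::channel(8);
    let (resp_tx, mut resp_rx) = mpsc::channel(8);
    let worker = tokio::spawn(pipeline(req_rx, resp_tx));
    for blkno in 0..4u64 {
        req_tx.send(blkno).await.unwrap();
    }
    drop(req_tx); // "disconnect"
    while let Some(reply) = resp_rx.recv().await {
        println!("{reply}");
    }
    worker.await.unwrap();
}
```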
@@ -504,9 +511,8 @@ impl PageServerHandler {
Ok(lsn)
}

#[instrument(skip(self, timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
#[instrument(skip(timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
async fn handle_get_rel_exists_request(
&self,
timeline: &Timeline,
req: &PagestreamExistsRequest,
) -> Result<PagestreamBeMessage> {
@@ -521,9 +527,8 @@ impl PageServerHandler {
}))
}

#[instrument(skip(self, timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
#[instrument(skip(timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
async fn handle_get_nblocks_request(
&self,
timeline: &Timeline,
req: &PagestreamNblocksRequest,
) -> Result<PagestreamBeMessage> {
@@ -538,9 +543,8 @@ impl PageServerHandler {
}))
}

#[instrument(skip(self, timeline, req), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))]
#[instrument(skip(timeline, req), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))]
async fn handle_db_size_request(
&self,
timeline: &Timeline,
req: &PagestreamDbSizeRequest,
) -> Result<PagestreamBeMessage> {
@@ -558,9 +562,8 @@ impl PageServerHandler {
}))
}

#[instrument(skip(self, timeline, req), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))]
#[instrument(skip(timeline, req), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))]
async fn handle_get_page_at_lsn_request(
&self,
timeline: &Timeline,
req: &PagestreamGetPageRequest,
) -> Result<PagestreamBeMessage> {
@@ -579,7 +582,7 @@ impl PageServerHandler {
// FIXME: this profiling now happens at different place than it used to. The
// current profiling is based on a thread-local variable, so it doesn't work
// across awaits
let _profiling_guard = profpoint_start(self.conf, ProfilingConfig::PageRequests);
//let _profiling_guard = profpoint_start(self.conf, ProfilingConfig::PageRequests);
let page = timeline.get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest)?;

Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
@@ -587,9 +590,8 @@ impl PageServerHandler {
}))
}

#[instrument(skip(self, pgb))]
#[instrument(skip(pgb))]
async fn handle_basebackup_request(
&self,
pgb: &mut PostgresBackend,
tenant_id: TenantId,
timeline_id: TimelineId,
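The `#[instrument]` and signature changes above all drop the `&self` receiver: once a request runs inside a spawned task, the future handed to `tokio::spawn` must be `'static`, so it cannot borrow the `PageServerHandler`. The handlers become associated functions and the state they need (the `Timeline`) is moved in as a cloned `Arc`. A small illustrative sketch of that constraint, with hypothetical stand-in types:

```rust
use std::sync::Arc;

struct Timeline; // stand-in for the pageserver Timeline

impl Timeline {
    fn get_rel_size(&self) -> u32 {
        42 // dummy value
    }
}

// Associated function instead of a `&self` method: the spawned future below
// then owns everything it touches and satisfies tokio::spawn's 'static bound.
async fn handle_get_nblocks_request(timeline: &Timeline) -> u32 {
    timeline.get_rel_size()
}

#[tokio::main]
async fn main() {
    let timeline = Arc::new(Timeline);

    let timeline = Arc::clone(&timeline);
    let task = tokio::spawn(async move {
        handle_get_nblocks_request(&timeline).await
    });

    println!("nblocks = {}", task.await.unwrap());
}
```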
@@ -722,7 +724,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
};

// Check that the timeline exists
self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false)
Self::handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false)
.await?;
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
}
@@ -782,7 +784,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
self.check_permission(Some(tenant_id))?;

// Check that the timeline exists
self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, prev_lsn, true)
Self::handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, prev_lsn, true)
.await?;
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("import basebackup ") {

@@ -74,6 +74,7 @@ where
};

dstbuf.clear();
dstbuf.reserve(len);

// Read the payload
let mut remain = len;

@@ -260,8 +260,9 @@ impl Layer for DeltaLayer {

// Ok, 'offsets' now contains the offsets of all the entries we need to read
let mut cursor = file.block_cursor();
let mut buf = Vec::new();
for (entry_lsn, pos) in offsets {
let buf = cursor.read_blob(pos).with_context(|| {
cursor.read_blob_into_buf(pos, &mut buf).with_context(|| {
format!(
"Failed to read blob from virtual file {}",
file.file.path.display()

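The `read_blob` to `read_blob_into_buf` change above is a small allocation optimization: the loop reuses one caller-owned `Vec<u8>` instead of allocating a fresh buffer for every blob. A rough sketch of the idea with a plain `Read + Seek` source (the real `BlockCursor` API differs):

```rust
use std::io::{Cursor, Read, Result, Seek, SeekFrom};

// Clears and refills `buf` in place; the allocation is reused whenever the
// existing capacity is large enough, so a tight loop stops churning the heap.
fn read_blob_into_buf<R: Read + Seek>(
    reader: &mut R,
    pos: u64,
    len: usize,
    buf: &mut Vec<u8>,
) -> Result<()> {
    buf.clear();
    buf.resize(len, 0);
    reader.seek(SeekFrom::Start(pos))?;
    reader.read_exact(buf)
}

fn main() -> Result<()> {
    let mut reader = Cursor::new(vec![1u8, 2, 3, 4, 5, 6]);
    let mut buf = Vec::new();
    for &(pos, len) in &[(0u64, 2usize), (2, 4)] {
        read_blob_into_buf(&mut reader, pos, len, &mut buf)?;
        println!("blob at {pos}: {buf:?}");
    }
    Ok(())
}
```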
@@ -21,6 +21,7 @@
use byteorder::{ByteOrder, LittleEndian};
use bytes::{BufMut, Bytes, BytesMut};
use nix::poll::*;
use once_cell::sync::Lazy;
use serde::Serialize;
use std::fs;
use std::fs::OpenOptions;
@@ -31,7 +32,8 @@ use std::os::unix::prelude::CommandExt;
use std::path::PathBuf;
use std::process::Stdio;
use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Condvar, Mutex};
use std::time::Duration;
use std::time::Instant;
use tracing::*;
@@ -55,6 +57,9 @@ use postgres_ffi::v14::nonrelfile_utils::{
};
use postgres_ffi::BLCKSZ;

/// Maximum number of WAL redo processes to launch for a single tenant.
const MAX_PROCESSES: usize = 4;

///
/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
///
@@ -88,18 +93,32 @@ pub trait WalRedoManager: Send + Sync {
) -> Result<Bytes, WalRedoError>;
}

static WAL_REDO_PROCESS_COUNTER: Lazy<AtomicU64> = Lazy::new(|| { AtomicU64::new(0) });

///
/// This is the real implementation that uses a Postgres process to
/// perform WAL replay. Only one thread can use the process at a time,
/// that is controlled by the Mutex. In the future, we might want to
/// launch a pool of processes to allow concurrent replay of multiple
/// records.
/// This is the real implementation that uses a special Postgres
/// process to perform WAL replay. There is a pool of these processes.
///
pub struct PostgresRedoManager {
tenant_id: TenantId,
conf: &'static PageServerConf,

process: Mutex<Option<PostgresRedoProcess>>,
/// Pool of processes.
process_list: Mutex<ProcessList>,
/// Condition variable that can be used to sleep until a process
/// becomes available in the pool.
condvar: Condvar,
}

// A pool of WAL redo processes
#[derive(Default)]
struct ProcessList {
/// processes that are available for reuse
free_processes: Vec<PostgresRedoProcess>,

/// Total number of processes, including all the processes in
/// 'free_processes' list, and any processes that are in use.
num_processes: usize,
}

/// Can this request be served by neon redo functions
@@ -204,7 +223,32 @@ impl PostgresRedoManager {
PostgresRedoManager {
tenant_id,
conf,
process: Mutex::new(None),
process_list: Mutex::new(ProcessList::default()),
condvar: Condvar::new(),
}
}

// Get a handle to a redo process from the pool.
fn get_process(&self, pg_version: u32) -> Result<PostgresRedoProcess, WalRedoError> {
let mut process_list = self.process_list.lock().unwrap();

loop {
// If there's a free process immediately available, take it.
if let Some(process) = process_list.free_processes.pop() {
return Ok(process);
}

// All processes are in use. If the pool is at its maximum size
// already, wait for a process to become free. Otherwise launch
// a new process.
if process_list.num_processes >= MAX_PROCESSES {
process_list = self.condvar.wait(process_list).unwrap();
continue;
} else {
let process = PostgresRedoProcess::launch(self.conf, &self.tenant_id, pg_version)?;
process_list.num_processes += 1;
return Ok(process);
}
}
}

@@ -224,15 +268,9 @@ impl PostgresRedoManager {
let start_time = Instant::now();

let mut process_guard = self.process.lock().unwrap();
let lock_time = Instant::now();
let mut process = self.get_process(pg_version)?;

// launch the WAL redo process on first use
if process_guard.is_none() {
let p = PostgresRedoProcess::launch(self.conf, &self.tenant_id, pg_version)?;
*process_guard = Some(p);
}
let process = process_guard.as_mut().unwrap();
let lock_time = Instant::now();

WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());

@@ -266,8 +304,9 @@ impl PostgresRedoManager {
lsn
);

// If something went wrong, don't try to reuse the process. Kill it, and
// next request will launch a new one.
// If something went wrong, don't try to reuse the
// process. Kill it, and next request will launch a new one.
// Otherwise return the process to the pool.
if result.is_err() {
error!(
"error applying {} WAL records ({} bytes) to reconstruct page image at LSN {}",
@@ -275,8 +314,14 @@ impl PostgresRedoManager {
nbytes,
lsn
);
let process = process_guard.take().unwrap();
process.kill();
let mut process_list = self.process_list.lock().unwrap();
process_list.num_processes -= 1;
self.condvar.notify_one();
} else {
let mut process_list = self.process_list.lock().unwrap();
process_list.free_processes.push(process);
self.condvar.notify_one();
}
result
}
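The new `ProcessList` plus `Condvar` above is a classic bounded resource pool: take a free process if one exists, launch a new one while under `MAX_PROCESSES`, otherwise block until `notify_one` signals that a process was returned (or killed). A self-contained sketch of the same discipline with a dummy worker type (names are illustrative, not the pageserver's):

```rust
use std::sync::{Condvar, Mutex};

const MAX_WORKERS: usize = 4;

struct Worker(usize); // stand-in for PostgresRedoProcess

#[derive(Default)]
struct PoolInner {
    free: Vec<Worker>,
    total: usize, // free workers plus checked-out workers
}

#[derive(Default)]
struct Pool {
    inner: Mutex<PoolInner>,
    condvar: Condvar,
}

impl Pool {
    fn get(&self) -> Worker {
        let mut inner = self.inner.lock().unwrap();
        loop {
            // A free worker is immediately available: take it.
            if let Some(w) = inner.free.pop() {
                return w;
            }
            // Under the cap: "launch" a new worker.
            if inner.total < MAX_WORKERS {
                inner.total += 1;
                return Worker(inner.total);
            }
            // At the cap: sleep until put() or discard() wakes us up.
            inner = self.condvar.wait(inner).unwrap();
        }
    }

    // Return a healthy worker to the pool for reuse.
    fn put(&self, w: Worker) {
        self.inner.lock().unwrap().free.push(w);
        self.condvar.notify_one();
    }

    // Drop a broken worker; the next get() may launch a replacement.
    fn discard(&self, w: Worker) {
        drop(w);
        self.inner.lock().unwrap().total -= 1;
        self.condvar.notify_one();
    }
}

fn main() {
    let pool = Pool::default();
    let a = pool.get(); // launches worker #1
    let b = pool.get(); // launches worker #2
    pool.put(a);        // worker #1 goes back to the free list
    pool.discard(b);    // worker #2 is dropped for good
    let c = pool.get(); // reuses worker #1
    println!("got worker {}", c.0);
}
```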
@@ -594,11 +639,10 @@ impl PostgresRedoProcess {
tenant_id: &TenantId,
pg_version: u32,
) -> Result<PostgresRedoProcess, Error> {
// FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
// just create one with constant name. That fails if you try to launch more than
// one WAL redo manager concurrently.
// We need a dummy Postgres cluster to run the process in.
let processno = WAL_REDO_PROCESS_COUNTER.fetch_add(1, Ordering::Relaxed);
let datadir = path_with_suffix_extension(
conf.tenant_path(tenant_id).join("wal-redo-datadir"),
conf.tenant_path(tenant_id).join(format!("wal-redo-datadir-{}", processno)),
TEMP_FILE_SUFFIX,
);

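Because several redo processes can now exist at once, each one gets its own dummy data directory, with a process-wide atomic counter providing the unique suffix. The diff uses a `once_cell::sync::Lazy<AtomicU64>`; the sketch below uses a plain `static`, which is enough since `AtomicU64::new` is a `const fn`, and the helper name is hypothetical:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

static WAL_REDO_PROCESS_COUNTER: AtomicU64 = AtomicU64::new(0);

// Hypothetical helper: build a distinct datadir path per launched process.
fn next_wal_redo_datadir(tenant_path: &str) -> String {
    let processno = WAL_REDO_PROCESS_COUNTER.fetch_add(1, Ordering::Relaxed);
    format!("{tenant_path}/wal-redo-datadir-{processno}")
}

fn main() {
    println!("{}", next_wal_redo_datadir("tenants/abc")); // ...-0
    println!("{}", next_wal_redo_datadir("tenants/abc")); // ...-1
}
```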
@@ -725,7 +769,11 @@ impl PostgresRedoProcess {
// This could be problematic if there are millions of records to replay,
// but in practice the number of records is usually so small that it doesn't
// matter, and it's better to keep this code simple.
let mut writebuf: Vec<u8> = Vec::new();
//
// Most requests start with a before-image with BLCKSZ bytes, followed by
// by some other WAL records. Start with a buffer that can hold that
// comfortably.
let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
build_begin_redo_for_block_msg(tag, &mut writebuf);
if let Some(img) = base_img {
build_push_page_msg(tag, &img, &mut writebuf);

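The buffer-sizing tweak above is only about avoiding reallocation while the redo request is serialized: a request usually carries one `BLCKSZ`-byte page image plus a handful of WAL records, so pre-reserving roughly three blocks keeps the `Vec` from growing repeatedly. A tiny sketch of the effect:

```rust
const BLCKSZ: usize = 8192; // Postgres block size, as in postgres_ffi::BLCKSZ

fn main() {
    let mut writebuf: Vec<u8> = Vec::with_capacity(BLCKSZ * 3);
    let capacity_before = writebuf.capacity();

    // Roughly one page image plus some WAL record payload.
    writebuf.extend(std::iter::repeat(0u8).take(BLCKSZ + 1024));

    // The pre-reserved buffer absorbed the writes without reallocating.
    assert_eq!(writebuf.capacity(), capacity_before);
    println!("capacity stayed at {capacity_before} bytes");
}
```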
@@ -40,9 +40,16 @@
bool connected = false;
PGconn *pageserver_conn = NULL;

/*
* WaitEventSet containing:
* - WL_SOCKET_READABLE on pageserver_conn,
* - WL_LATCH_SET on MyLatch, and
* - WL_EXIT_ON_PM_DEATH.
*/
WaitEventSet *pageserver_conn_wes = NULL;

char *page_server_connstring_raw;

int n_unflushed_requests = 0;
int flush_every_n_requests = 8;

static void pageserver_flush(void);
@@ -63,6 +70,7 @@ pageserver_connect()

PQfinish(pageserver_conn);
pageserver_conn = NULL;

ereport(ERROR,
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
errmsg(NEON_TAG "could not establish connection to pageserver"),
@@ -78,22 +86,26 @@ pageserver_connect()
neon_log(ERROR, "could not send pagestream command to pageserver");
}

pageserver_conn_wes = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(pageserver_conn_wes, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(pageserver_conn_wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
AddWaitEventToSet(pageserver_conn_wes, WL_SOCKET_READABLE, PQsocket(pageserver_conn), NULL, NULL);

while (PQisBusy(pageserver_conn))
{
int wc;
WaitEvent event;

/* Sleep until there's something to do */
wc = WaitLatchOrSocket(MyLatch,
WL_LATCH_SET | WL_SOCKET_READABLE |
WL_EXIT_ON_PM_DEATH,
PQsocket(pageserver_conn),
-1L, PG_WAIT_EXTENSION);
wc = WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);

CHECK_FOR_INTERRUPTS();

/* Data available in socket? */
if (wc & WL_SOCKET_READABLE)
if (event.events & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(pageserver_conn))
{
@@ -101,6 +113,7 @@ pageserver_connect()

PQfinish(pageserver_conn);
pageserver_conn = NULL;
FreeWaitEventSet(pageserver_conn_wes);

neon_log(ERROR, "could not complete handshake with pageserver: %s",
msg);
@@ -117,33 +130,30 @@ pageserver_connect()
* A wrapper around PQgetCopyData that checks for interrupts while sleeping.
*/
static int
call_PQgetCopyData(PGconn *conn, char **buffer)
call_PQgetCopyData(char **buffer)
{
int ret;

retry:
ret = PQgetCopyData(conn, buffer, 1 /* async */ );
ret = PQgetCopyData(pageserver_conn, buffer, 1 /* async */ );

if (ret == 0)
{
int wc;
WaitEvent event;

/* Sleep until there's something to do */
wc = WaitLatchOrSocket(MyLatch,
WL_LATCH_SET | WL_SOCKET_READABLE |
WL_EXIT_ON_PM_DEATH,
PQsocket(conn),
-1L, PG_WAIT_EXTENSION);
wc = WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);

CHECK_FOR_INTERRUPTS();

/* Data available in socket? */
if (wc & WL_SOCKET_READABLE)
if (event.events & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(conn))
if (!PQconsumeInput(pageserver_conn))
neon_log(ERROR, "could not get response from pageserver: %s",
PQerrorMessage(conn));
PQerrorMessage(pageserver_conn));
}

goto retry;
@@ -172,6 +182,8 @@ pageserver_disconnect(void)

prefetch_on_ps_disconnect();
}
if (pageserver_conn_wes != NULL)
FreeWaitEventSet(pageserver_conn_wes);
}

static void
@@ -205,11 +217,6 @@ pageserver_send(NeonRequest * request)
}
pfree(req_buff.data);

n_unflushed_requests++;

if (flush_every_n_requests > 0 && n_unflushed_requests >= flush_every_n_requests)
pageserver_flush();

if (message_level_is_interesting(PageStoreTrace))
{
char *msg = nm_to_string((NeonMessage *) request);
@@ -228,7 +235,7 @@ pageserver_receive(void)
PG_TRY();
{
/* read response */
resp_buff.len = call_PQgetCopyData(pageserver_conn, &resp_buff.data);
resp_buff.len = call_PQgetCopyData(&resp_buff.data);
resp_buff.cursor = 0;

if (resp_buff.len < 0)
@@ -274,7 +281,6 @@ pageserver_flush(void)
pageserver_disconnect();
neon_log(ERROR, "failed to flush page requests: %s", msg);
}
n_unflushed_requests = 0;
}

page_server_api api = {
@@ -436,7 +442,7 @@ pg_init_libpagestore(void)
NULL,
&flush_every_n_requests,
8, -1, INT_MAX,
PGC_SIGHUP,
PGC_USERSET,
0, /* no flags required */
NULL, NULL, NULL);

@@ -150,6 +150,7 @@ extern void prefetch_on_ps_disconnect(void);
extern page_server_api * page_server;

extern char *page_server_connstring;
extern int flush_every_n_requests;
extern bool seqscan_prefetch_enabled;
extern int seqscan_prefetch_distance;
extern char *neon_timeline;

@@ -192,7 +192,7 @@ typedef struct PrfHashEntry {
* It maintains a (ring) buffer of in-flight requests and responses.
*
* We maintain several indexes into the ring buffer:
* ring_unused >= ring_receive >= ring_last >= 0
* ring_unused >= ring_flush >= ring_receive >= ring_last >= 0
*
* ring_unused points to the first unused slot of the buffer
* ring_receive is the next request that is to be received
@@ -208,6 +208,7 @@ typedef struct PrefetchState {

/* buffer indexes */
uint64 ring_unused; /* first unused slot */
uint64 ring_flush; /* next request to flush */
uint64 ring_receive; /* next slot that is to receive a response */
uint64 ring_last; /* min slot with a response value */

@@ -577,6 +578,13 @@ prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_ls
prefetch_do_request(slot, force_latest, force_lsn);
Assert(slot->status == PRFS_REQUESTED);
Assert(ring_index < MyPState->ring_unused);

if (flush_every_n_requests > 0 && MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests)
{
page_server->flush();
MyPState->ring_flush = MyPState->ring_unused;
}

return ring_index;
}

@@ -585,6 +593,7 @@ page_server_request(void const *req)
{
page_server->send((NeonRequest *) req);
page_server->flush();
MyPState->ring_flush = MyPState->ring_unused;
consume_prefetch_responses();
return page_server->receive();
}
@@ -1581,6 +1590,7 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
if (entry->slot->status == PRFS_REQUESTED)
{
page_server->flush();
MyPState->ring_flush = MyPState->ring_unused;
prefetch_wait_for(entry->slot->my_ring_index);
}
/* drop caches */
@@ -1606,7 +1616,11 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
Assert(slot->status != PRFS_UNUSED);
Assert(&MyPState->prf_buffer[(ring_index % READ_BUFFER_SIZE)] == slot);

page_server->flush();
if (ring_index >= MyPState->ring_flush)
{
page_server->flush();
MyPState->ring_flush = MyPState->ring_unused;
}
prefetch_wait_for(ring_index);

Assert(slot->status == PRFS_RECEIVED);
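The `ring_flush` bookkeeping added in the last few hunks batches Pagestream requests on the compute side: sends are buffered, the socket is flushed once every `flush_every_n_requests` sends, and any code path that needs a response first flushes whatever is still buffered so it never waits on an unsent request. A language-agnostic sketch of that bookkeeping (written in Rust here for consistency with the other examples, though the extension code above is C):

```rust
struct PrefetchState {
    ring_unused: u64, // next slot to fill with a request
    ring_flush: u64,  // first request that has not been flushed yet
    flush_every_n_requests: u64,
}

impl PrefetchState {
    fn send_request(&mut self) {
        self.ring_unused += 1; // buffered, not yet on the wire
        if self.flush_every_n_requests > 0
            && self.ring_unused - self.ring_flush >= self.flush_every_n_requests
        {
            self.flush();
        }
    }

    fn wait_for_response(&mut self, ring_index: u64) {
        // Never wait on a request that is still sitting in the send buffer.
        if ring_index >= self.ring_flush {
            self.flush();
        }
        // ... block until the response for ring_index arrives ...
    }

    fn flush(&mut self) {
        println!("flushing requests {}..{}", self.ring_flush, self.ring_unused);
        self.ring_flush = self.ring_unused;
    }
}

fn main() {
    let mut state = PrefetchState {
        ring_unused: 0,
        ring_flush: 0,
        flush_every_n_requests: 8,
    };
    for _ in 0..10 {
        state.send_request(); // one flush fires after the 8th request
    }
    state.wait_for_response(9); // forces a flush of the remaining two
}
```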