Compare commits


2 Commits

Author SHA1 Message Date
Heikki Linnakangas
004b6bbac7 Add timing 2022-11-07 13:27:58 +02:00
Heikki Linnakangas
d9a14c9521 Strip useless debug sections from binaries, to make them smaller.
The rust compiler or linker (I'm not sure which) emits .debug_pubnames
and .debug_pubtypes sections in the binaries. They are supposed to speed
up launching a debugger, but they're obsolete. They've been superseded
by the .debug_names section in more recent DWARF specs and debuggers, and
gdb and lldb just ignore them.

I could not find any way to prevent rustc / lld / mold from emitting
these sections in the first place. So this commit adds a hack to strip
them off afterwards. This makes the binaries about 30% smaller, with
the downside of adding about 30 s to the build time in CI. That seems
like a good tradeoff, as smaller binaries can speed up other steps.
2022-11-07 13:27:58 +02:00
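
The core of the change is running objcopy -R on every binary after the build. A minimal Python sketch of that single step, assuming GNU or LLVM objcopy is on PATH (illustrative only, not the scripts/strip-useless-debug.py added in this commit):

import subprocess

def strip_debug_sections(path: str) -> None:
    # -R removes the named section; -p preserves the file's timestamps.
    # Without an explicit output file, objcopy rewrites the binary in place.
    subprocess.run(
        ["objcopy", "-R", ".debug_pubnames", "-R", ".debug_pubtypes", "-p", path],
        check=True,
    )

# Example invocation (hypothetical path):
strip_debug_sections("target/release/pageserver")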
23 changed files with 620 additions and 1039 deletions

View File

@@ -168,6 +168,11 @@ jobs:
${cov_prefix} cargo test $CARGO_FLAGS
shell: bash -euxo pipefail {0}
- name: Slim binaries
run: |
scripts/strip-useless-debug.py -j$(nproc) target
shell: bash -euxo pipefail {0}
- name: Install rust binaries
run: |
# Install target binaries

View File

@@ -13,7 +13,6 @@ use anyhow::{bail, ensure, Context, Result};
use bytes::Buf;
use bytes::Bytes;
use futures::{Stream, StreamExt};
use futures::stream::FuturesOrdered;
use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
@@ -40,9 +39,10 @@ use utils::{
};
use crate::basebackup;
use crate::config::PageServerConf;
use crate::config::{PageServerConf, ProfilingConfig};
use crate::import_datadir::import_wal_from_tar;
use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME};
use crate::profiling::profpoint_start;
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant::Timeline;
@@ -276,10 +276,10 @@ impl PageServerHandler {
pgb.write_message(&BeMessage::CopyBothResponse)?;
pgb.flush().await?;
let mut inprogress_requests = FuturesOrdered::new();
let metrics = PageRequestMetrics::new(&tenant_id, &timeline_id);
loop {
tokio::select! {
let msg = tokio::select! {
biased;
_ = task_mgr::shutdown_watcher() => {
@@ -288,58 +288,51 @@ impl PageServerHandler {
break;
}
response = inprogress_requests.next(), if !inprogress_requests.is_empty() => {
let response: Bytes = response.unwrap()?;
pgb.write_message(&BeMessage::CopyData(&response))?;
pgb.flush().await?;
continue;
msg = pgb.read_message() => { msg }
};
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(m) => {
bail!("unexpected message: {m:?} during COPY");
}
None => break, // client disconnected
};
msg = pgb.read_message() => {
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(m) => {
bail!("unexpected message: {m:?} during COPY");
}
None => break, // client disconnected
};
trace!("query: {copy_data_bytes:?}");
trace!("query: {copy_data_bytes:?}");
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
let timeline = Arc::clone(&timeline);
let task = async move {
let response = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {
Self::handle_get_rel_exists_request(&timeline, &req).await
}
PagestreamFeMessage::Nblocks(req) => {
Self::handle_get_nblocks_request(&timeline, &req).await
}
PagestreamFeMessage::GetPage(req) => {
Self::handle_get_page_at_lsn_request(&timeline, &req).await
}
PagestreamFeMessage::DbSize(req) => {
Self::handle_db_size_request(&timeline, &req).await
}
};
let response = response.unwrap_or_else(|e| {
// print all the details to the log with {:#}, but for the client the
// error message is enough
error!("error reading relation or page version: {:?}", e);
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
});
let response: Bytes = response.serialize();
response
};
inprogress_requests.push_back(tokio::spawn(task));
continue;
let response = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {
let _timer = metrics.get_rel_exists.start_timer();
self.handle_get_rel_exists_request(&timeline, &req).await
}
PagestreamFeMessage::Nblocks(req) => {
let _timer = metrics.get_rel_size.start_timer();
self.handle_get_nblocks_request(&timeline, &req).await
}
PagestreamFeMessage::GetPage(req) => {
let _timer = metrics.get_page_at_lsn.start_timer();
self.handle_get_page_at_lsn_request(&timeline, &req).await
}
PagestreamFeMessage::DbSize(req) => {
let _timer = metrics.get_db_size.start_timer();
self.handle_db_size_request(&timeline, &req).await
}
};
let response = response.unwrap_or_else(|e| {
// print all the details to the log with {:#}, but for the client the
// error message is enough
error!("error reading relation or page version: {:?}", e);
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
});
pgb.write_message(&BeMessage::CopyData(&response.serialize()))?;
pgb.flush().await?;
}
Ok(())
}
@@ -511,8 +504,9 @@ impl PageServerHandler {
Ok(lsn)
}
#[instrument(skip(timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
#[instrument(skip(self, timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
async fn handle_get_rel_exists_request(
&self,
timeline: &Timeline,
req: &PagestreamExistsRequest,
) -> Result<PagestreamBeMessage> {
@@ -527,8 +521,9 @@ impl PageServerHandler {
}))
}
#[instrument(skip(timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
#[instrument(skip(self, timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
async fn handle_get_nblocks_request(
&self,
timeline: &Timeline,
req: &PagestreamNblocksRequest,
) -> Result<PagestreamBeMessage> {
@@ -543,8 +538,9 @@ impl PageServerHandler {
}))
}
#[instrument(skip(timeline, req), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))]
#[instrument(skip(self, timeline, req), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))]
async fn handle_db_size_request(
&self,
timeline: &Timeline,
req: &PagestreamDbSizeRequest,
) -> Result<PagestreamBeMessage> {
@@ -562,8 +558,9 @@ impl PageServerHandler {
}))
}
#[instrument(skip(timeline, req), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))]
#[instrument(skip(self, timeline, req), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))]
async fn handle_get_page_at_lsn_request(
&self,
timeline: &Timeline,
req: &PagestreamGetPageRequest,
) -> Result<PagestreamBeMessage> {
@@ -582,7 +579,7 @@ impl PageServerHandler {
// FIXME: this profiling now happens at different place than it used to. The
// current profiling is based on a thread-local variable, so it doesn't work
// across awaits
//let _profiling_guard = profpoint_start(self.conf, ProfilingConfig::PageRequests);
let _profiling_guard = profpoint_start(self.conf, ProfilingConfig::PageRequests);
let page = timeline.get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest)?;
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
@@ -590,8 +587,9 @@ impl PageServerHandler {
}))
}
#[instrument(skip(pgb))]
#[instrument(skip(self, pgb))]
async fn handle_basebackup_request(
&self,
pgb: &mut PostgresBackend,
tenant_id: TenantId,
timeline_id: TimelineId,
@@ -724,7 +722,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
};
// Check that the timeline exists
Self::handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false)
self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false)
.await?;
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
}
@@ -784,7 +782,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
self.check_permission(Some(tenant_id))?;
// Check that the timeline exists
Self::handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, prev_lsn, true)
self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, prev_lsn, true)
.await?;
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("import basebackup ") {

View File

@@ -74,7 +74,6 @@ where
};
dstbuf.clear();
dstbuf.reserve(len);
// Read the payload
let mut remain = len;

View File

@@ -260,9 +260,8 @@ impl Layer for DeltaLayer {
// Ok, 'offsets' now contains the offsets of all the entries we need to read
let mut cursor = file.block_cursor();
let mut buf = Vec::new();
for (entry_lsn, pos) in offsets {
cursor.read_blob_into_buf(pos, &mut buf).with_context(|| {
let buf = cursor.read_blob(pos).with_context(|| {
format!(
"Failed to read blob from virtual file {}",
file.file.path.display()

View File

@@ -21,7 +21,6 @@
use byteorder::{ByteOrder, LittleEndian};
use bytes::{BufMut, Bytes, BytesMut};
use nix::poll::*;
use once_cell::sync::Lazy;
use serde::Serialize;
use std::fs;
use std::fs::OpenOptions;
@@ -32,8 +31,7 @@ use std::os::unix::prelude::CommandExt;
use std::path::PathBuf;
use std::process::Stdio;
use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Condvar, Mutex};
use std::sync::Mutex;
use std::time::Duration;
use std::time::Instant;
use tracing::*;
@@ -57,9 +55,6 @@ use postgres_ffi::v14::nonrelfile_utils::{
};
use postgres_ffi::BLCKSZ;
/// Maximum number of WAL redo processes to launch for a single tenant.
const MAX_PROCESSES: usize = 4;
///
/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
///
@@ -93,32 +88,18 @@ pub trait WalRedoManager: Send + Sync {
) -> Result<Bytes, WalRedoError>;
}
static WAL_REDO_PROCESS_COUNTER: Lazy<AtomicU64> = Lazy::new(|| { AtomicU64::new(0) });
///
/// This is the real implementation that uses a special Postgres
/// process to perform WAL replay. There is a pool of these processes.
/// This is the real implementation that uses a Postgres process to
/// perform WAL replay. Only one thread can use the process at a time,
/// that is controlled by the Mutex. In the future, we might want to
/// launch a pool of processes to allow concurrent replay of multiple
/// records.
///
pub struct PostgresRedoManager {
tenant_id: TenantId,
conf: &'static PageServerConf,
/// Pool of processes.
process_list: Mutex<ProcessList>,
/// Condition variable that can be used to sleep until a process
/// becomes available in the pool.
condvar: Condvar,
}
// A pool of WAL redo processes
#[derive(Default)]
struct ProcessList {
/// processes that are available for reuse
free_processes: Vec<PostgresRedoProcess>,
/// Total number of processes, including all the processes in
/// 'free_processes' list, and any processes that are in use.
num_processes: usize,
process: Mutex<Option<PostgresRedoProcess>>,
}
/// Can this request be served by neon redo functions
@@ -223,32 +204,7 @@ impl PostgresRedoManager {
PostgresRedoManager {
tenant_id,
conf,
process_list: Mutex::new(ProcessList::default()),
condvar: Condvar::new(),
}
}
// Get a handle to a redo process from the pool.
fn get_process(&self, pg_version: u32) -> Result<PostgresRedoProcess, WalRedoError> {
let mut process_list = self.process_list.lock().unwrap();
loop {
// If there's a free process immediately available, take it.
if let Some(process) = process_list.free_processes.pop() {
return Ok(process);
}
// All processes are in use. If the pool is at its maximum size
// already, wait for a process to become free. Otherwise launch
// a new process.
if process_list.num_processes >= MAX_PROCESSES {
process_list = self.condvar.wait(process_list).unwrap();
continue;
} else {
let process = PostgresRedoProcess::launch(self.conf, &self.tenant_id, pg_version)?;
process_list.num_processes += 1;
return Ok(process);
}
process: Mutex::new(None),
}
}
@@ -268,10 +224,16 @@ impl PostgresRedoManager {
let start_time = Instant::now();
let mut process = self.get_process(pg_version)?;
let mut process_guard = self.process.lock().unwrap();
let lock_time = Instant::now();
// launch the WAL redo process on first use
if process_guard.is_none() {
let p = PostgresRedoProcess::launch(self.conf, &self.tenant_id, pg_version)?;
*process_guard = Some(p);
}
let process = process_guard.as_mut().unwrap();
WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
// Relational WAL records are applied using wal-redo-postgres
@@ -304,9 +266,8 @@ impl PostgresRedoManager {
lsn
);
// If something went wrong, don't try to reuse the
// process. Kill it, and next request will launch a new one.
// Otherwise return the process to the pool.
// If something went wrong, don't try to reuse the process. Kill it, and
// next request will launch a new one.
if result.is_err() {
error!(
"error applying {} WAL records ({} bytes) to reconstruct page image at LSN {}",
@@ -314,14 +275,8 @@ impl PostgresRedoManager {
nbytes,
lsn
);
let process = process_guard.take().unwrap();
process.kill();
let mut process_list = self.process_list.lock().unwrap();
process_list.num_processes -= 1;
self.condvar.notify_one();
} else {
let mut process_list = self.process_list.lock().unwrap();
process_list.free_processes.push(process);
self.condvar.notify_one();
}
result
}
@@ -639,10 +594,11 @@ impl PostgresRedoProcess {
tenant_id: &TenantId,
pg_version: u32,
) -> Result<PostgresRedoProcess, Error> {
// We need a dummy Postgres cluster to run the process in.
let processno = WAL_REDO_PROCESS_COUNTER.fetch_add(1, Ordering::Relaxed);
// FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
// just create one with a constant name. That fails if you try to launch more than
// one WAL redo manager concurrently.
let datadir = path_with_suffix_extension(
conf.tenant_path(tenant_id).join(format!("wal-redo-datadir-{}", processno)),
conf.tenant_path(tenant_id).join("wal-redo-datadir"),
TEMP_FILE_SUFFIX,
);
@@ -769,11 +725,7 @@ impl PostgresRedoProcess {
// This could be problematic if there are millions of records to replay,
// but in practice the number of records is usually so small that it doesn't
// matter, and it's better to keep this code simple.
//
// Most requests start with a before-image of BLCKSZ bytes, followed by
// some other WAL records. Start with a buffer that can hold that
// comfortably.
let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
let mut writebuf: Vec<u8> = Vec::new();
build_begin_redo_for_block_msg(tag, &mut writebuf);
if let Some(img) = base_img {
build_push_page_msg(tag, &img, &mut writebuf);

View File

@@ -40,20 +40,8 @@
bool connected = false;
PGconn *pageserver_conn = NULL;
/*
* WaitEventSet containing:
* - WL_SOCKET_READABLE on pageserver_conn,
* - WL_LATCH_SET on MyLatch, and
* - WL_EXIT_ON_PM_DEATH.
*/
WaitEventSet *pageserver_conn_wes = NULL;
char *page_server_connstring_raw;
int flush_every_n_requests = 8;
static void pageserver_flush(void);
static void
pageserver_connect()
{
@@ -70,7 +58,6 @@ pageserver_connect()
PQfinish(pageserver_conn);
pageserver_conn = NULL;
ereport(ERROR,
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
errmsg(NEON_TAG "could not establish connection to pageserver"),
@@ -86,26 +73,22 @@ pageserver_connect()
neon_log(ERROR, "could not send pagestream command to pageserver");
}
pageserver_conn_wes = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(pageserver_conn_wes, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(pageserver_conn_wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
AddWaitEventToSet(pageserver_conn_wes, WL_SOCKET_READABLE, PQsocket(pageserver_conn), NULL, NULL);
while (PQisBusy(pageserver_conn))
{
int wc;
WaitEvent event;
/* Sleep until there's something to do */
wc = WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION);
wc = WaitLatchOrSocket(MyLatch,
WL_LATCH_SET | WL_SOCKET_READABLE |
WL_EXIT_ON_PM_DEATH,
PQsocket(pageserver_conn),
-1L, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
/* Data available in socket? */
if (event.events & WL_SOCKET_READABLE)
if (wc & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(pageserver_conn))
{
@@ -113,7 +96,6 @@ pageserver_connect()
PQfinish(pageserver_conn);
pageserver_conn = NULL;
FreeWaitEventSet(pageserver_conn_wes);
neon_log(ERROR, "could not complete handshake with pageserver: %s",
msg);
@@ -130,30 +112,33 @@ pageserver_connect()
* A wrapper around PQgetCopyData that checks for interrupts while sleeping.
*/
static int
call_PQgetCopyData(char **buffer)
call_PQgetCopyData(PGconn *conn, char **buffer)
{
int ret;
retry:
ret = PQgetCopyData(pageserver_conn, buffer, 1 /* async */ );
ret = PQgetCopyData(conn, buffer, 1 /* async */ );
if (ret == 0)
{
int wc;
WaitEvent event;
/* Sleep until there's something to do */
wc = WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION);
wc = WaitLatchOrSocket(MyLatch,
WL_LATCH_SET | WL_SOCKET_READABLE |
WL_EXIT_ON_PM_DEATH,
PQsocket(conn),
-1L, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
/* Data available in socket? */
if (event.events & WL_SOCKET_READABLE)
if (wc & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(pageserver_conn))
if (!PQconsumeInput(conn))
neon_log(ERROR, "could not get response from pageserver: %s",
PQerrorMessage(pageserver_conn));
PQerrorMessage(conn));
}
goto retry;
@@ -179,11 +164,7 @@ pageserver_disconnect(void)
PQfinish(pageserver_conn);
pageserver_conn = NULL;
connected = false;
prefetch_on_ps_disconnect();
}
if (pageserver_conn_wes != NULL)
FreeWaitEventSet(pageserver_conn_wes);
}
static void
@@ -193,7 +174,11 @@ pageserver_send(NeonRequest * request)
/* If the connection was lost for some reason, reconnect */
if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD)
pageserver_disconnect();
{
PQfinish(pageserver_conn);
pageserver_conn = NULL;
connected = false;
}
if (!connected)
pageserver_connect();
@@ -235,7 +220,7 @@ pageserver_receive(void)
PG_TRY();
{
/* read response */
resp_buff.len = call_PQgetCopyData(&resp_buff.data);
resp_buff.len = call_PQgetCopyData(pageserver_conn, &resp_buff.data);
resp_buff.cursor = 0;
if (resp_buff.len < 0)
@@ -270,11 +255,7 @@ pageserver_receive(void)
static void
pageserver_flush(void)
{
if (!connected)
{
neon_log(WARNING, "Tried to flush while disconnected");
}
else if (PQflush(pageserver_conn))
if (PQflush(pageserver_conn))
{
char *msg = PQerrorMessage(pageserver_conn);
@@ -283,7 +264,16 @@ pageserver_flush(void)
}
}
static NeonResponse *
pageserver_call(NeonRequest * request)
{
pageserver_send(request);
pageserver_flush();
return pageserver_receive();
}
page_server_api api = {
.request = pageserver_call,
.send = pageserver_send,
.flush = pageserver_flush,
.receive = pageserver_receive
@@ -437,14 +427,6 @@ pg_init_libpagestore(void)
PGC_SIGHUP,
GUC_UNIT_MB,
NULL, NULL, NULL);
DefineCustomIntVariable("neon.flush_output_after",
"Flush the output buffer after every N unflushed requests",
NULL,
&flush_every_n_requests,
8, -1, INT_MAX,
PGC_USERSET,
0, /* no flags required */
NULL, NULL, NULL);
relsize_hash_init();

View File

@@ -115,8 +115,6 @@ typedef struct
char page[FLEXIBLE_ARRAY_MEMBER];
} NeonGetPageResponse;
#define PS_GETPAGERESPONSE_SIZE (MAXALIGN(offsetof(NeonGetPageResponse, page) + BLCKSZ))
typedef struct
{
NeonMessageTag tag;
@@ -140,19 +138,15 @@ extern char *nm_to_string(NeonMessage * msg);
typedef struct
{
NeonResponse *(*request) (NeonRequest * request);
void (*send) (NeonRequest * request);
NeonResponse *(*receive) (void);
void (*flush) (void);
} page_server_api;
extern void prefetch_on_ps_disconnect(void);
extern page_server_api * page_server;
extern char *page_server_connstring;
extern int flush_every_n_requests;
extern bool seqscan_prefetch_enabled;
extern int seqscan_prefetch_distance;
extern char *neon_timeline;
extern char *neon_tenant;
extern bool wal_redo;
@@ -173,6 +167,7 @@ extern void neon_extend(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer, bool skipFsync);
extern bool neon_prefetch(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum);
extern void neon_reset_prefetch(SMgrRelation reln);
extern void neon_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
char *buffer);

View File

@@ -49,20 +49,22 @@
#include "access/xlog.h"
#include "access/xloginsert.h"
#include "access/xlog_internal.h"
#include "access/xlogdefs.h"
#include "catalog/pg_class.h"
#include "common/hashfn.h"
#include "pagestore_client.h"
#include "pagestore_client.h"
#include "storage/smgr.h"
#include "access/xlogdefs.h"
#include "postmaster/interrupt.h"
#include "postmaster/autovacuum.h"
#include "replication/walsender.h"
#include "storage/bufmgr.h"
#include "storage/relfilenode.h"
#include "storage/buf_internals.h"
#include "storage/smgr.h"
#include "storage/md.h"
#include "fmgr.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "catalog/pg_tablespace_d.h"
#include "postmaster/autovacuum.h"
#if PG_VERSION_NUM >= 150000
#include "access/xlogutils.h"
@@ -111,491 +113,48 @@ typedef enum
static SMgrRelation unlogged_build_rel = NULL;
static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
/*
* Prefetch implementation:
*
* Prefetch is performed locally by each backend.
*
* There can be up to READ_BUFFER_SIZE active IO requests registered at any
* time. Requests using smgr_prefetch are sent to the pageserver, but we don't
* wait on the response. Requests using smgr_read are either read from the
* buffer, or (if that's not possible) we wait on the response to arrive -
* this also will allow us to receive other prefetched pages.
* Each request is immediately written to the output buffer of the pageserver
* connection, but may not be flushed if smgr_prefetch is used: pageserver
* flushes sent requests on manual flush, or every neon.flush_output_after
* unflushed requests; so a prefetch request is not necessarily flushed immediately.
*
* Once we have received a response, this value will be stored in the response
* buffer, indexed in a hash table. This allows us to retain our buffered
* prefetch responses even when we have cache misses.
*
* Reading of prefetch responses is delayed until they are actually needed
* (smgr_read). In case of prefetch miss or any other SMGR request other than
* smgr_read, all prefetch responses in the pipeline will need to be read from
* the connection; the responses are stored for later use.
*
* NOTE: The current implementation of the prefetch system implements a ring
* buffer of up to READ_BUFFER_SIZE requests. If there are more _read and
* _prefetch requests between the initial _prefetch and the _read of a buffer,
* the prefetch request will have been dropped from this prefetch buffer, and
* your prefetch was wasted.
* There can be up to MAX_PREFETCH_REQUESTS registered using smgr_prefetch
* before smgr_read. All these requests are appended to the primary smgr_read request.
* It is assumed that pages will be requested in prefetch order.
* Reading of prefetch responses is delayed until they are actually needed (smgr_read).
* This makes it possible to parallelize processing and receiving of prefetched pages.
* In case of a prefetch miss, or any SMGR request other than smgr_read,
* all prefetch responses have to be consumed.
*/
/* Max amount of tracked buffer reads */
#define READ_BUFFER_SIZE 128
#define MAX_PREFETCH_REQUESTS 128
typedef enum PrefetchStatus {
PRFS_UNUSED = 0, /* unused slot */
PRFS_REQUESTED, /* request was written to the sendbuffer to PS, but not
* necessarily flushed.
* all fields except response valid */
PRFS_RECEIVED, /* all fields valid */
PRFS_TAG_REMAINS, /* only buftag and my_ring_index are still valid */
} PrefetchStatus;
BufferTag prefetch_requests[MAX_PREFETCH_REQUESTS];
BufferTag prefetch_responses[MAX_PREFETCH_REQUESTS];
int n_prefetch_requests;
int n_prefetch_responses;
int n_prefetched_buffers;
int n_prefetch_hits;
int n_prefetch_misses;
XLogRecPtr prefetch_lsn;
typedef struct PrefetchRequest {
BufferTag buftag; /* must be first entry in the struct */
XLogRecPtr effective_request_lsn;
NeonResponse *response; /* may be null */
PrefetchStatus status;
uint64 my_ring_index;
} PrefetchRequest;
/* prefetch buffer lookup hash table */
typedef struct PrfHashEntry {
PrefetchRequest *slot;
uint32 status;
uint32 hash;
} PrfHashEntry;
#define SH_PREFIX prfh
#define SH_ELEMENT_TYPE PrfHashEntry
#define SH_KEY_TYPE PrefetchRequest *
#define SH_KEY slot
#define SH_STORE_HASH
#define SH_GET_HASH(tb, a) ((a)->hash)
#define SH_HASH_KEY(tb, key) hash_bytes( \
((const unsigned char *) &(key)->buftag), \
sizeof(BufferTag) \
)
#define SH_EQUAL(tb, a, b) (BUFFERTAGS_EQUAL((a)->buftag, (b)->buftag))
#define SH_SCOPE static inline
#define SH_DEFINE
#define SH_DECLARE
#include "lib/simplehash.h"
/*
* PrefetchState maintains the state of (prefetch) getPage@LSN requests.
* It maintains a (ring) buffer of in-flight requests and responses.
*
* We maintain several indexes into the ring buffer:
* ring_unused >= ring_flush >= ring_receive >= ring_last >= 0
*
* ring_unused points to the first unused slot of the buffer
* ring_receive is the next request that is to be received
* ring_last is the oldest received entry in the buffer
*
* Apart from being an entry in the ring buffer of prefetch requests, each
* PrefetchRequest that is not UNUSED is indexed in prf_hash by buftag.
*/
typedef struct PrefetchState {
MemoryContext bufctx; /* context for prf_buffer[].response allocations */
MemoryContext errctx; /* context for prf_buffer[].response allocations */
MemoryContext hashctx; /* context for prf_buffer */
/* buffer indexes */
uint64 ring_unused; /* first unused slot */
uint64 ring_flush; /* next request to flush */
uint64 ring_receive; /* next slot that is to receive a response */
uint64 ring_last; /* min slot with a response value */
/* metrics / statistics */
int n_responses_buffered; /* count of PS responses not yet in buffers */
int n_requests_inflight; /* count of PS requests considered in flight */
int n_unused; /* count of buffers < unused, > last, that are also unused */
/* the buffers */
prfh_hash *prf_hash;
PrefetchRequest prf_buffer[READ_BUFFER_SIZE]; /* prefetch buffers */
} PrefetchState;
PrefetchState *MyPState;
int n_prefetch_hits = 0;
int n_prefetch_misses = 0;
int n_prefetch_missed_caches = 0;
int n_prefetch_dupes = 0;
XLogRecPtr prefetch_lsn = 0;
static void consume_prefetch_responses(void);
static uint64 prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_lsn);
static void prefetch_read(PrefetchRequest *slot);
static void prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn);
static void prefetch_wait_for(uint64 ring_index);
static void prefetch_cleanup(void);
static inline void prefetch_set_unused(uint64 ring_index, bool hash_cleanup);
static XLogRecPtr neon_get_request_lsn(bool *latest, RelFileNode rnode,
ForkNumber forknum, BlockNumber blkno);
/*
* Make sure that there are no responses still in the buffer.
*/
static void
consume_prefetch_responses(void)
{
if (MyPState->ring_receive < MyPState->ring_unused)
prefetch_wait_for(MyPState->ring_unused - 1);
}
static void
prefetch_cleanup(void)
{
int index;
uint64 ring_index;
PrefetchRequest *slot;
while (MyPState->ring_last < MyPState->ring_receive) {
ring_index = MyPState->ring_last;
index = (ring_index % READ_BUFFER_SIZE);
slot = &MyPState->prf_buffer[index];
if (slot->status == PRFS_UNUSED)
MyPState->ring_last += 1;
else
break;
}
}
/*
* Wait for slot of ring_index to have received its response.
* The caller is responsible for making sure the request buffer is flushed.
*/
static void
prefetch_wait_for(uint64 ring_index)
{
int index;
PrefetchRequest *entry;
Assert(MyPState->ring_unused > ring_index);
while (MyPState->ring_receive <= ring_index)
for (int i = n_prefetched_buffers; i < n_prefetch_responses; i++)
{
index = (MyPState->ring_receive % READ_BUFFER_SIZE);
entry = &MyPState->prf_buffer[index];
NeonResponse *resp = page_server->receive();
Assert(entry->status == PRFS_REQUESTED);
prefetch_read(entry);
pfree(resp);
}
}
/*
* Read the response of a prefetch request into its slot.
*
* The caller is responsible for making sure that the request for this buffer
* was flushed to the PageServer.
*/
static void
prefetch_read(PrefetchRequest *slot)
{
NeonResponse *response;
MemoryContext old;
Assert(slot->status == PRFS_REQUESTED);
Assert(slot->response == NULL);
Assert(slot->my_ring_index == MyPState->ring_receive);
old = MemoryContextSwitchTo(MyPState->errctx);
response = (NeonResponse *) page_server->receive();
MemoryContextSwitchTo(old);
/* update prefetch state */
MyPState->n_responses_buffered += 1;
MyPState->n_requests_inflight -= 1;
MyPState->ring_receive += 1;
/* update slot state */
slot->status = PRFS_RECEIVED;
slot->response = response;
}
/*
* Disconnect hook - drop prefetches when the connection drops
*
* If we don't remove the failed prefetches, we'd be serving incorrect
* data to the smgr.
*/
void
prefetch_on_ps_disconnect(void)
{
for (; MyPState->ring_receive < MyPState->ring_unused; MyPState->ring_receive++)
{
PrefetchRequest *slot;
int index = MyPState->ring_receive % READ_BUFFER_SIZE;
slot = &MyPState->prf_buffer[index];
Assert(slot->status == PRFS_REQUESTED);
Assert(slot->my_ring_index == MyPState->ring_receive);
/* clean up the request */
slot->status = PRFS_TAG_REMAINS;
MyPState->n_requests_inflight--;
prefetch_set_unused(MyPState->ring_receive, true);
}
}
/*
* prefetch_set_unused() - clear a received prefetch slot
*
* The slot at ring_index must be a current member of the ring buffer,
* and may not be in the PRFS_REQUESTED state.
*/
static inline void
prefetch_set_unused(uint64 ring_index, bool hash_cleanup)
{
PrefetchRequest *slot = &MyPState->prf_buffer[ring_index % READ_BUFFER_SIZE];
Assert(MyPState->ring_last <= ring_index &&
MyPState->ring_unused > ring_index);
if (slot->status == PRFS_UNUSED)
return;
Assert(slot->status == PRFS_RECEIVED || slot->status == PRFS_TAG_REMAINS);
Assert(ring_index >= MyPState->ring_last &&
ring_index < MyPState->ring_unused);
if (slot->status == PRFS_RECEIVED)
{
pfree(slot->response);
slot->response = NULL;
MyPState->n_responses_buffered -= 1;
MyPState->n_unused += 1;
}
else
{
Assert(slot->response == NULL);
}
if (hash_cleanup)
prfh_delete(MyPState->prf_hash, slot);
/* clear all fields */
MemSet(slot, 0, sizeof(PrefetchRequest));
slot->status = PRFS_UNUSED;
/* run cleanup if we're holding back ring_last */
if (MyPState->ring_last == ring_index)
prefetch_cleanup();
}
static void
prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn)
{
NeonGetPageRequest request = {
.req.tag = T_NeonGetPageRequest,
.req.latest = false,
.req.lsn = 0,
.rnode = slot->buftag.rnode,
.forknum = slot->buftag.forkNum,
.blkno = slot->buftag.blockNum,
};
if (force_lsn && force_latest)
{
request.req.lsn = *force_lsn;
request.req.latest = *force_latest;
slot->effective_request_lsn = *force_lsn;
}
else
{
XLogRecPtr lsn = neon_get_request_lsn(
&request.req.latest,
slot->buftag.rnode,
slot->buftag.forkNum,
slot->buftag.blockNum
);
/*
* Note: effective_request_lsn is potentially higher than the requested
* LSN, but still correct:
*
* We know there are no changes between the actual requested LSN and
* the value of effective_request_lsn: If there were, the page would
* have been in cache and evicted between those LSN values, which
* then would have had to result in a larger request LSN for this page.
*
* It is possible that a concurrent backend loads the page, modifies
* it and then evicts it again, but the LSN of that eviction cannot be
* smaller than the current WAL insert/redo pointer, which is already
* larger than this prefetch_lsn. So in any case, that would
* invalidate this cache.
*
* The best LSN to use for effective_request_lsn would be
* XLogCtl->Insert.RedoRecPtr, but that's expensive to access.
*/
request.req.lsn = lsn;
prefetch_lsn = Max(prefetch_lsn, lsn);
slot->effective_request_lsn = prefetch_lsn;
}
Assert(slot->response == NULL);
Assert(slot->my_ring_index == MyPState->ring_unused);
page_server->send((NeonRequest *) &request);
/* update prefetch state */
MyPState->n_requests_inflight += 1;
MyPState->n_unused -= 1;
MyPState->ring_unused += 1;
/* update slot state */
slot->status = PRFS_REQUESTED;
}
/*
* prefetch_register_buffer() - register and prefetch buffer
*
* Register that we may want the contents of BufferTag in the near future.
*
* If force_latest and force_lsn are not NULL, those values are sent to the
* pageserver. If they are NULL, we use the lastWrittenLsn infrastructure
* to fill in these values manually.
*/
static uint64
prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_lsn)
{
int index;
bool found;
uint64 ring_index;
PrefetchRequest req;
PrefetchRequest *slot;
PrfHashEntry *entry;
/* use an intermediate PrefetchRequest struct to ensure correct alignment */
req.buftag = tag;
entry = prfh_lookup(MyPState->prf_hash, (PrefetchRequest *) &req);
if (entry != NULL)
{
slot = entry->slot;
ring_index = slot->my_ring_index;
index = (ring_index % READ_BUFFER_SIZE);
Assert(slot == &MyPState->prf_buffer[index]);
Assert(slot->status != PRFS_UNUSED);
Assert(BUFFERTAGS_EQUAL(slot->buftag, tag));
/*
* If we want a specific lsn, we do not accept requests that were made
* with a potentially different LSN.
*/
if (force_lsn && slot->effective_request_lsn != *force_lsn)
{
prefetch_wait_for(ring_index);
prefetch_set_unused(ring_index, true);
}
/*
* We received a prefetch for a page that was recently read and
* removed from the buffers. Remove that request from the buffers.
*/
else if (slot->status == PRFS_TAG_REMAINS)
{
prefetch_set_unused(ring_index, true);
}
else
{
/* The buffered request is good enough, return that index */
n_prefetch_dupes++;
return ring_index;
}
}
/*
* If the prefetch queue is full, we need to make room by clearing the
* oldest slot. If the oldest slot holds a buffer that was already
* received, we can just throw it away; we fetched the page unnecessarily
* in that case. If the oldest slot holds a request that we haven't
* received a response for yet, we have to wait for the response to that
* before we can continue. We might not have even flushed the request to
* the pageserver yet, it might be just sitting in the output buffer. In
* that case, we flush it and wait for the response. (We could decide not
* to send it, but it's hard to abort when the request is already in the
* output buffer, and 'not sending' a prefetch request kind of goes
* against the principles of prefetching)
*/
if (MyPState->ring_last + READ_BUFFER_SIZE - 1 == MyPState->ring_unused)
{
slot = &MyPState->prf_buffer[(MyPState->ring_last % READ_BUFFER_SIZE)];
Assert(slot->status != PRFS_UNUSED);
/* We have the slot for ring_last, so that must still be in progress */
switch (slot->status)
{
case PRFS_REQUESTED:
Assert(MyPState->ring_receive == MyPState->ring_last);
prefetch_wait_for(MyPState->ring_last);
prefetch_set_unused(MyPState->ring_last, true);
break;
case PRFS_RECEIVED:
case PRFS_TAG_REMAINS:
prefetch_set_unused(MyPState->ring_last, true);
break;
default:
pg_unreachable();
}
}
/*
* The next buffer pointed to by `ring_unused` is now unused, so we can insert
* the new request to it.
*/
ring_index = MyPState->ring_unused;
index = (ring_index % READ_BUFFER_SIZE);
slot = &MyPState->prf_buffer[index];
Assert(MyPState->ring_last <= ring_index);
Assert(slot->status == PRFS_UNUSED);
/*
* We must update the slot data before insertion, because the hash
* function reads the buffer tag from the slot.
*/
slot->buftag = tag;
slot->my_ring_index = ring_index;
prfh_insert(MyPState->prf_hash, slot, &found);
Assert(!found);
prefetch_do_request(slot, force_latest, force_lsn);
Assert(slot->status == PRFS_REQUESTED);
Assert(ring_index < MyPState->ring_unused);
if (flush_every_n_requests > 0 && MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests)
{
page_server->flush();
MyPState->ring_flush = MyPState->ring_unused;
}
return ring_index;
n_prefetched_buffers = 0;
n_prefetch_responses = 0;
}
static NeonResponse *
page_server_request(void const *req)
{
page_server->send((NeonRequest *) req);
page_server->flush();
MyPState->ring_flush = MyPState->ring_unused;
consume_prefetch_responses();
return page_server->receive();
return page_server->request((NeonRequest *) req);
}
@@ -709,15 +268,12 @@ nm_unpack_response(StringInfo s)
case T_NeonGetPageResponse:
{
NeonGetPageResponse *msg_resp;
NeonGetPageResponse *msg_resp = palloc0(offsetof(NeonGetPageResponse, page) + BLCKSZ);
msg_resp = MemoryContextAllocZero(MyPState->bufctx, PS_GETPAGERESPONSE_SIZE);
msg_resp->tag = tag;
/* XXX: should be varlena */
memcpy(msg_resp->page, pq_getmsgbytes(s, BLCKSZ), BLCKSZ);
pq_getmsgend(s);
Assert(msg_resp->tag == T_NeonGetPageResponse);
resp = (NeonResponse *) msg_resp;
break;
@@ -1061,32 +617,7 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, ch
void
neon_init(void)
{
HASHCTL info;
if (MyPState != NULL)
return;
MyPState = MemoryContextAllocZero(TopMemoryContext, sizeof(PrefetchState));
MyPState->n_unused = READ_BUFFER_SIZE;
MyPState->bufctx = SlabContextCreate(TopMemoryContext,
"NeonSMGR/prefetch",
SLAB_DEFAULT_BLOCK_SIZE * 17,
PS_GETPAGERESPONSE_SIZE);
MyPState->errctx = AllocSetContextCreate(TopMemoryContext,
"NeonSMGR/errors",
ALLOCSET_DEFAULT_SIZES);
MyPState->hashctx = AllocSetContextCreate(TopMemoryContext,
"NeonSMGR/prefetch",
ALLOCSET_DEFAULT_SIZES);
info.keysize = sizeof(BufferTag);
info.entrysize = sizeof(uint64);
MyPState->prf_hash = prfh_create(MyPState->hashctx,
READ_BUFFER_SIZE, NULL);
/* noop */
#ifdef DEBUG_COMPARE_LOCAL
mdinit();
#endif
@@ -1473,17 +1004,27 @@ neon_close(SMgrRelation reln, ForkNumber forknum)
}
/*
* neon_reset_prefetch() -- remove all previously registered prefetch requests
*/
void
neon_reset_prefetch(SMgrRelation reln)
{
n_prefetch_requests = 0;
}
/*
* neon_prefetch() -- Initiate asynchronous read of the specified block of a relation
*/
bool
neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
{
uint64 ring_index;
switch (reln->smgr_relpersistence)
{
case 0: /* probably shouldn't happen, but ignore it */
case 0:
/* probably shouldn't happen, but ignore it */
break;
case RELPERSISTENCE_PERMANENT:
break;
@@ -1495,17 +1036,14 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
BufferTag tag = (BufferTag) {
.rnode = reln->smgr_rnode.node,
.forkNum = forknum,
.blockNum = blocknum
};
ring_index = prefetch_register_buffer(tag, NULL, NULL);
Assert(ring_index < MyPState->ring_unused &&
MyPState->ring_last <= ring_index);
if (n_prefetch_requests < MAX_PREFETCH_REQUESTS)
{
prefetch_requests[n_prefetch_requests].rnode = reln->smgr_rnode.node;
prefetch_requests[n_prefetch_requests].forkNum = forknum;
prefetch_requests[n_prefetch_requests].blockNum = blocknum;
n_prefetch_requests += 1;
return true;
}
return false;
}
@@ -1556,77 +1094,81 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
XLogRecPtr request_lsn, bool request_latest, char *buffer)
{
NeonResponse *resp;
BufferTag buftag;
uint64 ring_index;
PrfHashEntry *entry;
PrefetchRequest *slot;
buftag = (BufferTag) {
.rnode = rnode,
.forkNum = forkNum,
.blockNum = blkno,
};
int i;
/*
* Try to find prefetched page in the list of received pages.
* Try to find the prefetched page. It is assumed that pages will be requested
* in the same order as they were prefetched, but some other backend may
* load a page into shared buffers, so some prefetch responses may need to be
* skipped.
*/
entry = prfh_lookup(MyPState->prf_hash, (PrefetchRequest *) &buftag);
if (entry != NULL)
for (i = n_prefetched_buffers; i < n_prefetch_responses; i++)
{
if (entry->slot->effective_request_lsn >= prefetch_lsn)
{
slot = entry->slot;
ring_index = slot->my_ring_index;
n_prefetch_hits += 1;
}
else /* the current prefetch LSN is not large enough, so drop the prefetch */
resp = page_server->receive();
if (resp->tag == T_NeonGetPageResponse &&
RelFileNodeEquals(prefetch_responses[i].rnode, rnode) &&
prefetch_responses[i].forkNum == forkNum &&
prefetch_responses[i].blockNum == blkno)
{
char *page = ((NeonGetPageResponse *) resp)->page;
/*
* We can't drop cache for not-yet-received requested items. It is
* unlikely this happens, but it can happen if prefetch distance is
* large enough and a backend didn't consume all prefetch requests.
* Check if the prefetched page is still relevant. If it has been updated by
* some other backend, it should not be requested from the smgr
* unless it has been evicted from shared buffers. In that case
* last_evicted_lsn is updated and request_lsn will be
* greater than prefetch_lsn. The maximum of prefetch_lsn and the page LSN is
* used because the page returned by the page server may have an LSN either
* greater or smaller than requested.
*/
if (entry->slot->status == PRFS_REQUESTED)
if (Max(prefetch_lsn, PageGetLSN(page)) >= request_lsn)
{
page_server->flush();
MyPState->ring_flush = MyPState->ring_unused;
prefetch_wait_for(entry->slot->my_ring_index);
n_prefetched_buffers = i + 1;
n_prefetch_hits += 1;
n_prefetch_requests = 0;
memcpy(buffer, page, BLCKSZ);
pfree(resp);
return;
}
/* drop caches */
prefetch_set_unused(entry->slot->my_ring_index, true);
n_prefetch_missed_caches += 1;
/* make it look like a prefetch cache miss */
entry = NULL;
}
pfree(resp);
}
n_prefetched_buffers = 0;
n_prefetch_responses = 0;
n_prefetch_misses += 1;
{
NeonGetPageRequest request = {
.req.tag = T_NeonGetPageRequest,
.req.latest = request_latest,
.req.lsn = request_lsn,
.rnode = rnode,
.forknum = forkNum,
.blkno = blkno
};
if (n_prefetch_requests > 0)
{
/* Combine all prefetch requests with primary request */
page_server->send((NeonRequest *) & request);
for (i = 0; i < n_prefetch_requests; i++)
{
request.rnode = prefetch_requests[i].rnode;
request.forknum = prefetch_requests[i].forkNum;
request.blkno = prefetch_requests[i].blockNum;
prefetch_responses[i] = prefetch_requests[i];
page_server->send((NeonRequest *) & request);
}
page_server->flush();
n_prefetch_responses = n_prefetch_requests;
n_prefetch_requests = 0;
prefetch_lsn = request_lsn;
resp = page_server->receive();
}
else
{
resp = page_server->request((NeonRequest *) & request);
}
}
if (entry == NULL)
{
n_prefetch_misses += 1;
ring_index = prefetch_register_buffer(buftag, &request_latest,
&request_lsn);
slot = &MyPState->prf_buffer[(ring_index % READ_BUFFER_SIZE)];
}
Assert(MyPState->ring_last <= ring_index &&
MyPState->ring_unused > ring_index);
Assert(slot->my_ring_index == ring_index);
Assert(slot->status != PRFS_UNUSED);
Assert(&MyPState->prf_buffer[(ring_index % READ_BUFFER_SIZE)] == slot);
if (ring_index >= MyPState->ring_flush)
{
page_server->flush();
MyPState->ring_flush = MyPState->ring_unused;
}
prefetch_wait_for(ring_index);
Assert(slot->status == PRFS_RECEIVED);
resp = slot->response;
switch (resp->tag)
{
case T_NeonGetPageResponse:
@@ -1646,13 +1188,12 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
errdetail("page server returned error: %s",
((NeonErrorResponse *) resp)->message)));
break;
default:
elog(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
}
/* buffer was used, clean up for later reuse */
prefetch_set_unused(ring_index, true);
prefetch_cleanup();
pfree(resp);
}
/*
@@ -2274,6 +1815,7 @@ static const struct f_smgr neon_smgr =
.smgr_unlink = neon_unlink,
.smgr_extend = neon_extend,
.smgr_prefetch = neon_prefetch,
.smgr_reset_prefetch = neon_reset_prefetch,
.smgr_read = neon_read,
.smgr_write = neon_write,
.smgr_writeback = neon_writeback,

scripts/strip-useless-debug.py (new executable file, 179 lines added)
View File

@@ -0,0 +1,179 @@
#!/usr/bin/env python3
# Strip useless .debug_pubnames and .debug_pubtypes from all binaries.
# They bloat the binaries, and are not used by modern debuggers anyway.
# This makes the resulting binaries about 30% smaller, and also makes
# the cargo cache smaller.
#
# See also https://github.com/rust-lang/rust/issues/46034
#
# Usage:
# ./scripts/strip-useless-debug.py target
#
#
# Why is this script needed?
# --------------------------
#
# The simplest way to do this would be just:
#
# find target -executable -type f -size +0 | \
# xargs -IPATH objcopy -R .debug_pubnames -R .debug_pubtypes -p PATH
#
# However, objcopy is not very fast, so we want to run it in parallel.
# That would be straightforward to do with the xargs -P option, except
# that the rust target directory contains hard links. Running objcopy
# on multiple paths that are hardlinked to the same underlying file
# doesn't work, because one objcopy could be overwriting the file while
# the other one is trying to read it.
#
# To work around that, this script scans the target directory and
# collects paths of all executables, except that when multiple paths
# point to the same underlying inode, i.e. if two paths are hard links
# to the same file, only one of the paths is collected. Then, it runs
# objcopy on each of the unique files.
#
# There's one more subtle problem with hardlinks. The GNU objcopy man
# page says that:
#
# If you do not specify outfile, objcopy creates a temporary file and
# destructively renames the result with the name of infile.
#
# That is a problem: renaming over the file will create a new inode
# for the path, and leave the other hardlinked paths unchanged. We
# want to modify all the hard linked copies, and we also don't want to
# remove the hard linking, as that saves a lot of space. In testing,
# at least some versions of GNU objcopy seem to actually behave
# differently if the file has hard links, copying over the file
# instead of renaming when it does. So that text in the man page isn't
# totally accurate. But that's hardly something we should rely on:
# llvm-objcopy for example always renames. To avoid that problem, we
# specify a temporary file as the destination, and copy it over the
# original file in this python script. That way, it is independent of
# objcopy's behavior.
import argparse
import asyncio
import os
import time
import shutil
import subprocess
import tempfile
from pathlib import Path
async def main():
parser = argparse.ArgumentParser(
description="Strip useless .debug_pubnames and .debug_pubtypes sections from binaries",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
"-j", metavar="NUM", default=os.cpu_count(), type=int, help="number of parallel processes"
)
parser.add_argument("target", type=Path, help="target directory")
args = parser.parse_args()
max_parallel_processes = args.j
target_dir = args.target
# Collect list of executables in the target dir. Make note of the inode of
# each path, and only record one path with the same inode. This ensures that
# if there are hard links in the directory tree, we only remember one path
# for each underlying file.
inode_paths = {} # inode -> path dictionary
def onerror(err):
raise err
for currentpath, folders, files in os.walk(target_dir, onerror=onerror):
for file in files:
path = os.path.join(currentpath, file)
if os.access(path, os.X_OK):
stat = os.stat(path)
# If multiple paths are hardlinked to the same underlying file,
# we only remember the first one that we see. It's arbitrary
# which one we will see first, but that's ok.
#
# Skip empty files while we're at it. There are some .lock files
# in the target directory that are marked as executable but aren't
# binaries, so objcopy would complain about them.
if stat.st_size > 0:
prev = inode_paths.get(stat.st_ino)
if prev:
print(f"{path} is a hard link to {prev}, skipping")
else:
inode_paths[stat.st_ino] = path
# This function runs "objcopy -R .debug_pubnames -R .debug_pubtypes" on a file.
#
# Returns (original size, new size)
async def run_objcopy(path) -> (int, int):
stat = os.stat(path)
orig_size = stat.st_size
if orig_size == 0:
return (0, 0)
# Write the output to a temp file first, and then copy it over the original.
# objcopy could modify the file in place, but that's not reliable with hard
# links. (See comment at beginning of this file.)
with tempfile.NamedTemporaryFile() as tmpfile:
cmd = [
"objcopy",
"-R",
".debug_pubnames",
"-R",
".debug_pubtypes",
"-p",
path,
tmpfile.name,
]
proc = await asyncio.create_subprocess_exec(*cmd)
rc = await proc.wait()
if rc != 0:
raise subprocess.CalledProcessError(rc, cmd)
# If the file got smaller, copy it over the original.
# Otherwise keep the original
stat = os.stat(tmpfile.name)
new_size = stat.st_size
if new_size < orig_size:
with open(path, "wb") as orig_dst:
shutil.copyfileobj(tmpfile, orig_dst)
return (orig_size, new_size)
else:
return (orig_size, orig_size)
# convert the inode->path dictionary into plain list of paths.
paths = []
for path in inode_paths.values():
paths.append(path)
# Launch worker processes to process the list of files
before_total = 0
after_total = 0
async def runner_subproc():
nonlocal before_total
nonlocal after_total
while len(paths) > 0:
path = paths.pop()
start_time = time.perf_counter_ns()
(before_size, after_size) = await run_objcopy(path)
end_time = time.perf_counter_ns()
before_total += before_size
after_total += after_size
duration_ms = round((end_time-start_time) / 1000000)
print(f"{path}: {before_size} to {after_size} bytes ({duration_ms} ms)")
active_workers = []
for i in range(max_parallel_processes):
active_workers.append(asyncio.create_task(runner_subproc()))
done, () = await asyncio.wait(active_workers)
# all done!
print(f"total size before {before_total} after: {after_total}")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -65,8 +65,17 @@ BASE_PORT = 15000
WORKER_PORT_NUM = 1000
# These are set in pytest_configure()
base_dir = ""
neon_binpath = ""
pg_distrib_dir = ""
top_output_dir = ""
default_pg_version = ""
def pytest_configure(config):
"""
Ensure that no unwanted daemons are running before we start testing.
Check that we do not overflow available ports range.
"""
@@ -76,89 +85,67 @@ def pytest_configure(config):
): # do not use ephemeral ports
raise Exception("Too many workers configured. Cannot distribute ports for services.")
@pytest.fixture(scope="session")
def base_dir() -> Iterator[Path]:
# find the base directory (currently this is the git root)
base_dir = get_self_dir().parent.parent
global base_dir
base_dir = os.path.normpath(os.path.join(get_self_dir(), "../.."))
log.info(f"base_dir is {base_dir}")
yield base_dir
# Compute the top-level directory for all tests.
global top_output_dir
env_test_output = os.environ.get("TEST_OUTPUT")
if env_test_output is not None:
top_output_dir = env_test_output
else:
top_output_dir = os.path.join(base_dir, DEFAULT_OUTPUT_DIR)
Path(top_output_dir).mkdir(exist_ok=True)
# Find the postgres installation.
global default_pg_version
log.info(f"default_pg_version is {default_pg_version}")
env_default_pg_version = os.environ.get("DEFAULT_PG_VERSION")
if env_default_pg_version:
default_pg_version = env_default_pg_version
log.info(f"default_pg_version is set to {default_pg_version}")
else:
default_pg_version = DEFAULT_PG_VERSION_DEFAULT
global pg_distrib_dir
env_postgres_bin = os.environ.get("POSTGRES_DISTRIB_DIR")
if env_postgres_bin:
pg_distrib_dir = env_postgres_bin
else:
pg_distrib_dir = os.path.normpath(os.path.join(base_dir, "pg_install"))
log.info(f"pg_distrib_dir is {pg_distrib_dir}")
psql_bin_path = os.path.join(pg_distrib_dir, "v{}".format(default_pg_version), "bin/psql")
postgres_bin_path = os.path.join(
pg_distrib_dir, "v{}".format(default_pg_version), "bin/postgres"
)
if os.getenv("REMOTE_ENV"):
# When testing against a remote server, we only need the client binary.
if not os.path.exists(psql_bin_path):
raise Exception('psql not found at "{}"'.format(psql_bin_path))
else:
if not os.path.exists(postgres_bin_path):
raise Exception('postgres not found at "{}"'.format(postgres_bin_path))
@pytest.fixture(scope="session")
def neon_binpath(base_dir: Path) -> Iterator[Path]:
if os.getenv("REMOTE_ENV"):
# we are in remote env and do not have neon binaries locally
# this is the case for benchmarks run on self-hosted runner
return
# Find the neon binaries.
if env_neon_bin := os.environ.get("NEON_BIN"):
binpath = Path(env_neon_bin)
global neon_binpath
env_neon_bin = os.environ.get("NEON_BIN")
if env_neon_bin:
neon_binpath = env_neon_bin
else:
build_type = os.environ.get("BUILD_TYPE", "debug")
binpath = base_dir / "target" / build_type
log.info(f"neon_binpath is {binpath}")
if not (binpath / "pageserver").exists():
raise Exception(f"neon binaries not found at '{binpath}'")
yield binpath
@pytest.fixture(scope="session")
def pg_distrib_dir(base_dir: Path) -> Iterator[Path]:
if env_postgres_bin := os.environ.get("POSTGRES_DISTRIB_DIR"):
distrib_dir = Path(env_postgres_bin).resolve()
else:
distrib_dir = base_dir / "pg_install"
log.info(f"pg_distrib_dir is {distrib_dir}")
yield distrib_dir
@pytest.fixture(scope="session")
def top_output_dir(base_dir: Path) -> Iterator[Path]:
# Compute the top-level directory for all tests.
if env_test_output := os.environ.get("TEST_OUTPUT"):
output_dir = Path(env_test_output).resolve()
else:
output_dir = base_dir / DEFAULT_OUTPUT_DIR
output_dir.mkdir(exist_ok=True)
log.info(f"top_output_dir is {output_dir}")
yield output_dir
@pytest.fixture(scope="session")
def pg_version() -> Iterator[str]:
if env_default_pg_version := os.environ.get("DEFAULT_PG_VERSION"):
version = env_default_pg_version
else:
version = DEFAULT_PG_VERSION_DEFAULT
log.info(f"pg_version is {version}")
yield version
@pytest.fixture(scope="session")
def versioned_pg_distrib_dir(pg_distrib_dir: Path, pg_version: str) -> Iterator[Path]:
versioned_dir = pg_distrib_dir / f"v{pg_version}"
psql_bin_path = versioned_dir / "bin/psql"
postgres_bin_path = versioned_dir / "bin/postgres"
if os.getenv("REMOTE_ENV"):
# When testing against a remote server, we only need the client binary.
if not psql_bin_path.exists():
raise Exception(f"psql not found at '{psql_bin_path}'")
else:
if not postgres_bin_path.exists():
raise Exception(f"postgres not found at '{postgres_bin_path}'")
log.info(f"versioned_pg_distrib_dir is {versioned_dir}")
yield versioned_dir
neon_binpath = os.path.join(base_dir, "target", build_type)
log.info(f"neon_binpath is {neon_binpath}")
if not os.path.exists(os.path.join(neon_binpath, "pageserver")):
raise Exception('neon binaries not found at "{}"'.format(neon_binpath))
def shareable_scope(fixture_name, config) -> Literal["session", "function"]:
@@ -245,18 +232,16 @@ def port_distributor(worker_base_port):
@pytest.fixture(scope="session")
def default_broker(request: Any, port_distributor: PortDistributor, top_output_dir: Path):
def default_broker(request: Any, port_distributor: PortDistributor):
client_port = port_distributor.get_port()
# multiple pytest sessions could get launched in parallel, so give them different datadirs
etcd_datadir = get_test_output_dir(request, top_output_dir) / f"etcd_datadir_{client_port}"
etcd_datadir.mkdir(exist_ok=True, parents=True)
etcd_datadir = os.path.join(get_test_output_dir(request), f"etcd_datadir_{client_port}")
Path(etcd_datadir).mkdir(exist_ok=True, parents=True)
broker = Etcd(
datadir=str(etcd_datadir), port=client_port, peer_port=port_distributor.get_port()
)
broker = Etcd(datadir=etcd_datadir, port=client_port, peer_port=port_distributor.get_port())
yield broker
broker.stop()
allure_attach_from_dir(etcd_datadir)
allure_attach_from_dir(Path(etcd_datadir))
@pytest.fixture(scope="session")
@@ -536,9 +521,6 @@ class NeonEnvBuilder:
broker: Etcd,
run_id: uuid.UUID,
mock_s3_server: MockS3Server,
neon_binpath: Path,
pg_distrib_dir: Path,
pg_version: str,
remote_storage: Optional[RemoteStorage] = None,
remote_storage_users: RemoteStorageUsers = RemoteStorageUsers.PAGESERVER,
pageserver_config_override: Optional[str] = None,
@@ -568,9 +550,7 @@ class NeonEnvBuilder:
self.env: Optional[NeonEnv] = None
self.remote_storage_prefix: Optional[str] = None
self.keep_remote_storage_contents: bool = True
self.neon_binpath = neon_binpath
self.pg_distrib_dir = pg_distrib_dir
self.pg_version = pg_version
self.pg_version = default_pg_version
def init(self) -> NeonEnv:
# Cannot create more than one environment from one builder
@@ -786,8 +766,6 @@ class NeonEnv:
self.remote_storage = config.remote_storage
self.remote_storage_users = config.remote_storage_users
self.pg_version = config.pg_version
self.neon_binpath = config.neon_binpath
self.pg_distrib_dir = config.pg_distrib_dir
# generate initial tenant ID here instead of letting 'neon init' generate it,
# so that we don't need to dig it out of the config file afterwards.
@@ -883,7 +861,7 @@ class NeonEnv:
return self.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
def get_pageserver_version(self) -> str:
bin_pageserver = str(self.neon_binpath / "pageserver")
bin_pageserver = os.path.join(str(neon_binpath), "pageserver")
res = subprocess.run(
[bin_pageserver, "--version"],
check=True,
@@ -907,10 +885,6 @@ def _shared_simple_env(
mock_s3_server: MockS3Server,
default_broker: Etcd,
run_id: uuid.UUID,
top_output_dir: Path,
neon_binpath: Path,
pg_distrib_dir: Path,
pg_version: str,
) -> Iterator[NeonEnv]:
"""
# Internal fixture backing the `neon_simple_env` fixture. If TEST_SHARED_FIXTURES
@@ -919,20 +893,17 @@ def _shared_simple_env(
if os.environ.get("TEST_SHARED_FIXTURES") is None:
# Create the environment in the per-test output directory
repo_dir = get_test_output_dir(request, top_output_dir) / "repo"
repo_dir = os.path.join(get_test_output_dir(request), "repo")
else:
# We're running shared fixtures. Share a single directory.
repo_dir = top_output_dir / "shared_repo"
repo_dir = os.path.join(str(top_output_dir), "shared_repo")
shutil.rmtree(repo_dir, ignore_errors=True)
with NeonEnvBuilder(
repo_dir=repo_dir,
repo_dir=Path(repo_dir),
port_distributor=port_distributor,
broker=default_broker,
mock_s3_server=mock_s3_server,
neon_binpath=neon_binpath,
pg_distrib_dir=pg_distrib_dir,
pg_version=pg_version,
run_id=run_id,
) as builder:
env = builder.init_start()
@@ -963,9 +934,6 @@ def neon_env_builder(
test_output_dir,
port_distributor: PortDistributor,
mock_s3_server: MockS3Server,
neon_binpath: Path,
pg_distrib_dir: Path,
pg_version: str,
default_broker: Etcd,
run_id: uuid.UUID,
) -> Iterator[NeonEnvBuilder]:
@@ -990,9 +958,6 @@ def neon_env_builder(
repo_dir=Path(repo_dir),
port_distributor=port_distributor,
mock_s3_server=mock_s3_server,
neon_binpath=neon_binpath,
pg_distrib_dir=pg_distrib_dir,
pg_version=pg_version,
broker=default_broker,
run_id=run_id,
) as builder:
@@ -1275,7 +1240,7 @@ class AbstractNeonCli(abc.ABC):
assert type(arguments) == list
assert type(self.COMMAND) == str
bin_neon = str(self.env.neon_binpath / self.COMMAND)
bin_neon = os.path.join(str(neon_binpath), self.COMMAND)
args = [bin_neon] + arguments
log.info('Running command "{}"'.format(" ".join(args)))
@@ -1283,7 +1248,7 @@ class AbstractNeonCli(abc.ABC):
env_vars = os.environ.copy()
env_vars["NEON_REPO_DIR"] = str(self.env.repo_dir)
env_vars["POSTGRES_DISTRIB_DIR"] = str(self.env.pg_distrib_dir)
env_vars["POSTGRES_DISTRIB_DIR"] = str(pg_distrib_dir)
if self.env.rust_log_override is not None:
env_vars["RUST_LOG"] = self.env.rust_log_override
for (extra_env_key, extra_env_value) in (extra_env_vars or {}).items():
@@ -1758,17 +1723,17 @@ def append_pageserver_param_overrides(
class PgBin:
"""A helper class for executing postgres binaries"""
def __init__(self, log_dir: Path, pg_distrib_dir: Path, pg_version: str):
def __init__(self, log_dir: Path, pg_version: str):
self.log_dir = log_dir
self.pg_version = pg_version
self.pg_bin_path = pg_distrib_dir / f"v{pg_version}" / "bin"
self.pg_lib_dir = pg_distrib_dir / f"v{pg_version}" / "lib"
self.pg_bin_path = os.path.join(str(pg_distrib_dir), "v{}".format(pg_version), "bin")
self.pg_lib_dir = os.path.join(str(pg_distrib_dir), "v{}".format(pg_version), "lib")
self.env = os.environ.copy()
self.env["LD_LIBRARY_PATH"] = str(self.pg_lib_dir)
self.env["LD_LIBRARY_PATH"] = self.pg_lib_dir
def _fixpath(self, command: List[str]):
if "/" not in str(command[0]):
command[0] = str(self.pg_bin_path / command[0])
if "/" not in command[0]:
command[0] = os.path.join(self.pg_bin_path, command[0])
def _build_env(self, env_add: Optional[Env]) -> Env:
if env_add is None:
@@ -1792,7 +1757,7 @@ class PgBin:
"""
self._fixpath(command)
log.info(f"Running command '{' '.join(command)}'")
log.info('Running command "{}"'.format(" ".join(command)))
env = self._build_env(env)
subprocess.run(command, env=env, cwd=cwd, check=True)
@@ -1811,14 +1776,16 @@ class PgBin:
"""
self._fixpath(command)
log.info(f"Running command '{' '.join(command)}'")
log.info('Running command "{}"'.format(" ".join(command)))
env = self._build_env(env)
return subprocess_capture(self.log_dir, command, env=env, cwd=cwd, check=True, **kwargs)
return subprocess_capture(
str(self.log_dir), command, env=env, cwd=cwd, check=True, **kwargs
)
@pytest.fixture(scope="function")
def pg_bin(test_output_dir: Path, pg_distrib_dir: Path, pg_version: str) -> PgBin:
return PgBin(test_output_dir, pg_distrib_dir, pg_version)
def pg_bin(test_output_dir: Path, pg_version: str) -> PgBin:
return PgBin(test_output_dir, pg_version)
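PgBin resolves bare binary names against the versioned bin/ directory and sets LD_LIBRARY_PATH for the child process. A hedged usage sketch of the two entry points seen in this diff, assuming the fixtures module's imports; run_capture and the ".stdout" basepath convention appear below in test_fullbackup, run is assumed from the surrounding hunk, and the commands here are illustrative only:

def test_pg_bin_sketch(pg_bin: PgBin):
    # Bare command names are resolved to <pg_distrib_dir>/v<version>/bin/<name>.
    pg_bin.run(["pg_config", "--version"])
    # run_capture() writes stdout/stderr to files in the log dir and returns
    # the shared basepath of those files.
    basepath = pg_bin.run_capture(["pg_controldata", "--help"])
    assert "pg_controldata" in Path(f"{basepath}.stdout").read_text()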
class VanillaPostgres(PgProtocol):
@@ -1865,15 +1832,19 @@ class VanillaPostgres(PgProtocol):
self.stop()
@pytest.fixture(scope="session")
def pg_version() -> str:
return default_pg_version
@pytest.fixture(scope="function")
def vanilla_pg(
test_output_dir: Path,
port_distributor: PortDistributor,
pg_distrib_dir: Path,
pg_version: str,
) -> Iterator[VanillaPostgres]:
pgdatadir = test_output_dir / "pgdata-vanilla"
pg_bin = PgBin(test_output_dir, pg_distrib_dir, pg_version)
pg_bin = PgBin(test_output_dir, pg_version)
port = port_distributor.get_port()
with VanillaPostgres(pgdatadir, pg_bin, port) as vanilla_pg:
yield vanilla_pg
@@ -1909,10 +1880,8 @@ class RemotePostgres(PgProtocol):
@pytest.fixture(scope="function")
def remote_pg(
test_output_dir: Path, pg_distrib_dir: Path, pg_version: str
) -> Iterator[RemotePostgres]:
pg_bin = PgBin(test_output_dir, pg_distrib_dir, pg_version)
def remote_pg(test_output_dir: Path, pg_version: str) -> Iterator[RemotePostgres]:
pg_bin = PgBin(test_output_dir, pg_version)
connstr = os.getenv("BENCHMARK_CONNSTR")
if connstr is None:
@@ -1957,18 +1926,10 @@ class PSQL:
class NeonProxy(PgProtocol):
def __init__(
self,
proxy_port: int,
http_port: int,
neon_binpath: Path,
auth_endpoint=None,
mgmt_port=None,
):
def __init__(self, proxy_port: int, http_port: int, auth_endpoint=None, mgmt_port=None):
super().__init__(dsn=auth_endpoint, port=proxy_port)
self.host = "127.0.0.1"
self.http_port = http_port
self.neon_binpath = neon_binpath
self.proxy_port = proxy_port
self.mgmt_port = mgmt_port
self.auth_endpoint = auth_endpoint
@@ -1984,7 +1945,7 @@ class NeonProxy(PgProtocol):
# Start proxy
args = [
str(self.neon_binpath / "proxy"),
os.path.join(neon_binpath, "proxy"),
*["--http", f"{self.host}:{self.http_port}"],
*["--proxy", f"{self.host}:{self.proxy_port}"],
*["--auth-backend", "postgres"],
@@ -2000,7 +1961,7 @@ class NeonProxy(PgProtocol):
assert self._popen is None
# Start proxy
bin_proxy = str(self.neon_binpath / "proxy")
bin_proxy = os.path.join(str(neon_binpath), "proxy")
args = [bin_proxy]
args.extend(["--http", f"{self.host}:{self.http_port}"])
args.extend(["--proxy", f"{self.host}:{self.proxy_port}"])
@@ -2032,18 +1993,18 @@ class NeonProxy(PgProtocol):
@pytest.fixture(scope="function")
def link_proxy(port_distributor, neon_binpath: Path) -> Iterator[NeonProxy]:
def link_proxy(port_distributor) -> Iterator[NeonProxy]:
"""Neon proxy that routes through link auth."""
http_port = port_distributor.get_port()
proxy_port = port_distributor.get_port()
mgmt_port = port_distributor.get_port()
with NeonProxy(proxy_port, http_port, neon_binpath=neon_binpath, mgmt_port=mgmt_port) as proxy:
with NeonProxy(proxy_port, http_port, mgmt_port=mgmt_port) as proxy:
proxy.start_with_link_auth()
yield proxy
@pytest.fixture(scope="function")
def static_proxy(vanilla_pg, port_distributor, neon_binpath: Path) -> Iterator[NeonProxy]:
def static_proxy(vanilla_pg, port_distributor) -> Iterator[NeonProxy]:
"""Neon proxy that routes directly to vanilla postgres."""
# For simplicity, we use the same user for both `--auth-endpoint` and `safe_psql`
@@ -2059,10 +2020,7 @@ def static_proxy(vanilla_pg, port_distributor, neon_binpath: Path) -> Iterator[NeonProxy]:
http_port = port_distributor.get_port()
with NeonProxy(
proxy_port=proxy_port,
http_port=http_port,
neon_binpath=neon_binpath,
auth_endpoint=auth_endpoint,
proxy_port=proxy_port, http_port=http_port, auth_endpoint=auth_endpoint
) as proxy:
proxy.start()
yield proxy
@@ -2565,10 +2523,10 @@ class Etcd:
self.handle.wait()
def get_test_output_dir(request: Any, top_output_dir: Path) -> Path:
def get_test_output_dir(request: Any) -> Path:
"""Compute the working directory for an individual test."""
test_name = request.node.name
test_dir = top_output_dir / test_name.replace("/", "-")
test_dir = Path(top_output_dir) / test_name.replace("/", "-")
log.info(f"get_test_output_dir is {test_dir}")
# make mypy happy
assert isinstance(test_dir, Path)
@@ -2585,11 +2543,11 @@ def get_test_output_dir(request: Any, top_output_dir: Path) -> Path:
# this fixture ensures that the directory exists. That works because
# 'autouse' fixtures are run before other fixtures.
@pytest.fixture(scope="function", autouse=True)
def test_output_dir(request: Any, top_output_dir: Path) -> Iterator[Path]:
def test_output_dir(request: Any) -> Iterator[Path]:
"""Create the working directory for an individual test."""
# one directory per test
test_dir = get_test_output_dir(request, top_output_dir)
test_dir = get_test_output_dir(request)
log.info(f"test_output_dir is {test_dir}")
shutil.rmtree(test_dir, ignore_errors=True)
test_dir.mkdir()
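The comment above leans on pytest's rule that, within the same scope, autouse fixtures are set up before explicitly requested ones. A small self-contained illustration of that ordering; the fixture names are made up:

import pytest

setup_order = []

@pytest.fixture(autouse=True)
def output_dir_autouse():
    # instantiated first, even though no test requests it by name
    setup_order.append("autouse")

@pytest.fixture
def needs_dir():
    setup_order.append("requested")

def test_autouse_runs_first(needs_dir):
    assert setup_order == ["autouse", "requested"]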
@@ -2681,7 +2639,7 @@ def check_restored_datadir_content(
restored_dir_path = env.repo_dir / f"{pg.node_name}_restored_datadir"
restored_dir_path.mkdir(exist_ok=True)
pg_bin = PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version)
pg_bin = PgBin(test_output_dir, env.pg_version)
psql_path = os.path.join(pg_bin.pg_bin_path, "psql")
cmd = rf"""

View File

@@ -15,13 +15,12 @@ from psycopg2.extensions import cursor
Fn = TypeVar("Fn", bound=Callable[..., Any])
def get_self_dir() -> Path:
def get_self_dir() -> str:
"""Get the path to the directory where this script lives."""
# return os.path.dirname(os.path.abspath(__file__))
return Path(__file__).resolve().parent
return os.path.dirname(os.path.abspath(__file__))
def subprocess_capture(capture_dir: Path, cmd: List[str], **kwargs: Any) -> str:
def subprocess_capture(capture_dir: str, cmd: List[str], **kwargs: Any) -> str:
"""Run a process and capture its output
Output will go to files named "cmd_NNN.stdout" and "cmd_NNN.stderr"
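A hedged usage sketch of the str-based subprocess_capture, mirroring how the client test further below consumes its return value; the command is illustrative only:

# stdout/stderr land in files sharing the returned basepath
basepath = subprocess_capture(str(test_output_dir), ["echo", "1"], check=True)
assert Path(f"{basepath}.stdout").read_text().strip() == "1"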

View File

@@ -46,9 +46,9 @@ def test_pg_clients(test_output_dir: Path, remote_pg: RemotePostgres, client: st
raise RuntimeError("docker is required for running this test")
build_cmd = [docker_bin, "build", "--tag", image_tag, f"{Path(__file__).parent / client}"]
subprocess_capture(test_output_dir, build_cmd, check=True)
subprocess_capture(str(test_output_dir), build_cmd, check=True)
run_cmd = [docker_bin, "run", "--rm", "--env-file", env_file, image_tag]
basepath = subprocess_capture(test_output_dir, run_cmd, check=True)
basepath = subprocess_capture(str(test_output_dir), run_cmd, check=True)
assert Path(f"{basepath}.stdout").read_text().strip() == "1"

View File

@@ -80,12 +80,7 @@ class PortReplacer(object):
@pytest.mark.order(after="test_prepare_snapshot")
def test_backward_compatibility(
pg_bin: PgBin,
port_distributor: PortDistributor,
test_output_dir: Path,
request: FixtureRequest,
neon_binpath: Path,
pg_distrib_dir: Path,
pg_bin: PgBin, port_distributor: PortDistributor, test_output_dir: Path, request: FixtureRequest
):
compatibility_snapshot_dir = Path(
os.environ.get("COMPATIBILITY_SNAPSHOT_DIR", DEFAILT_LOCAL_SNAPSHOT_DIR)
@@ -175,8 +170,6 @@ def test_backward_compatibility(
config.repo_dir = repo_dir
config.pg_version = "14" # Note: `pg_dumpall` (from pg_bin) version is set by DEFAULT_PG_VERSION_DEFAULT and can be overriden by DEFAULT_PG_VERSION env var
config.initial_tenant = snapshot_config["default_tenant_id"]
config.neon_binpath = neon_binpath
config.pg_distrib_dir = pg_distrib_dir
# Check that we can start the project
cli = NeonCli(config)

View File

@@ -1,8 +1,13 @@
import os
from pathlib import Path
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, PortDistributor, VanillaPostgres
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
PortDistributor,
VanillaPostgres,
pg_distrib_dir,
)
from fixtures.types import Lsn, TimelineId
from fixtures.utils import query_scalar, subprocess_capture
@@ -11,10 +16,7 @@ num_rows = 1000
# Ensure that regular postgres can start from fullbackup
def test_fullbackup(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
port_distributor: PortDistributor,
pg_distrib_dir: Path,
neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, port_distributor: PortDistributor
):
env = neon_env_builder.init_start()
@@ -38,7 +40,7 @@ def test_fullbackup(
# Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq.
# PgBin sets it automatically, but here we need to pipe psql output to the tar command.
psql_env = {"LD_LIBRARY_PATH": str(pg_distrib_dir / "lib")}
psql_env = {"LD_LIBRARY_PATH": os.path.join(str(pg_distrib_dir), "lib")}
# Get and unpack fullbackup from pageserver
restored_dir_path = env.repo_dir / "restored_datadir"
@@ -47,7 +49,9 @@ def test_fullbackup(
cmd = ["psql", "--no-psqlrc", env.pageserver.connstr(), "-c", query]
result_basepath = pg_bin.run_capture(cmd, env=psql_env)
tar_output_file = result_basepath + ".stdout"
subprocess_capture(env.repo_dir, ["tar", "-xf", tar_output_file, "-C", str(restored_dir_path)])
subprocess_capture(
str(env.repo_dir), ["tar", "-xf", tar_output_file, "-C", str(restored_dir_path)]
)
# HACK
# fullbackup returns neon specific pg_control and first WAL segment

View File

@@ -13,6 +13,7 @@ from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
Postgres,
pg_distrib_dir,
wait_for_last_record_lsn,
wait_for_upload,
)
@@ -127,7 +128,7 @@ def test_import_from_pageserver_small(pg_bin: PgBin, neon_env_builder: NeonEnvBu
num_rows = 3000
lsn = _generate_data(num_rows, pg)
_import(num_rows, lsn, env, pg_bin, timeline, env.pg_distrib_dir)
_import(num_rows, lsn, env, pg_bin, timeline)
@pytest.mark.timeout(1800)
@@ -155,7 +156,7 @@ def test_import_from_pageserver_multisegment(pg_bin: PgBin, neon_env_builder: Ne
log.info(f"timeline logical size = {logical_size / (1024 ** 2)}MB")
assert logical_size > 1024**3 # = 1GB
tar_output_file = _import(num_rows, lsn, env, pg_bin, timeline, env.pg_distrib_dir)
tar_output_file = _import(num_rows, lsn, env, pg_bin, timeline)
# Check if the backup data contains multiple segment files
cnt_seg_files = 0
@@ -190,12 +191,7 @@ def _generate_data(num_rows: int, pg: Postgres) -> Lsn:
def _import(
expected_num_rows: int,
lsn: Lsn,
env: NeonEnv,
pg_bin: PgBin,
timeline: TimelineId,
pg_distrib_dir: Path,
expected_num_rows: int, lsn: Lsn, env: NeonEnv, pg_bin: PgBin, timeline: TimelineId
) -> str:
"""Test importing backup data to the pageserver.
@@ -209,7 +205,7 @@ def _import(
# Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq.
# PgBin sets it automatically, but here we need to pipe psql output to the tar command.
psql_env = {"LD_LIBRARY_PATH": str(pg_distrib_dir / "lib")}
psql_env = {"LD_LIBRARY_PATH": os.path.join(str(pg_distrib_dir), "lib")}
# Get a fullbackup from pageserver
query = f"fullbackup { env.initial_tenant} {timeline} {lsn}"

View File

@@ -1,5 +1,5 @@
import pathlib
import subprocess
from pathlib import Path
from typing import Optional
from fixtures.neon_fixtures import (
@@ -7,18 +7,18 @@ from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
PageserverHttpClient,
neon_binpath,
pg_distrib_dir,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import wait_until
# test that we cannot override node id after init
def test_pageserver_init_node_id(
neon_simple_env: NeonEnv, neon_binpath: Path, pg_distrib_dir: Path
):
def test_pageserver_init_node_id(neon_simple_env: NeonEnv):
repo_dir = neon_simple_env.repo_dir
pageserver_config = repo_dir / "pageserver.toml"
pageserver_bin = neon_binpath / "pageserver"
pageserver_bin = pathlib.Path(neon_binpath) / "pageserver"
def run_pageserver(args):
return subprocess.run(

View File

@@ -1,10 +1,11 @@
#
# This file runs pg_regress-based tests.
#
import os
from pathlib import Path
import pytest
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
from fixtures.neon_fixtures import NeonEnv, base_dir, check_restored_datadir_content, pg_distrib_dir
# Run the main PostgreSQL regression tests, in src/test/regress.
@@ -12,14 +13,7 @@ from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
# This runs for a long time, especially in debug mode, so use a larger-than-default
# timeout.
@pytest.mark.timeout(1800)
def test_pg_regress(
neon_simple_env: NeonEnv,
test_output_dir: Path,
pg_bin,
capsys,
base_dir: Path,
pg_distrib_dir: Path,
):
def test_pg_regress(neon_simple_env: NeonEnv, test_output_dir: Path, pg_bin, capsys):
env = neon_simple_env
env.neon_cli.create_branch("test_pg_regress", "empty")
@@ -32,20 +26,20 @@ def test_pg_regress(
(runpath / "testtablespace").mkdir(parents=True)
# Compute all the file locations that pg_regress will need.
build_path = pg_distrib_dir / f"build/v{env.pg_version}/src/test/regress"
src_path = base_dir / f"vendor/postgres-v{env.pg_version}/src/test/regress"
bindir = pg_distrib_dir / f"v{env.pg_version}/bin"
schedule = src_path / "parallel_schedule"
pg_regress = build_path / "pg_regress"
build_path = os.path.join(pg_distrib_dir, "build/v{}/src/test/regress").format(env.pg_version)
src_path = os.path.join(base_dir, "vendor/postgres-v{}/src/test/regress").format(env.pg_version)
bindir = os.path.join(pg_distrib_dir, "v{}".format(env.pg_version), "bin")
schedule = os.path.join(src_path, "parallel_schedule")
pg_regress = os.path.join(build_path, "pg_regress")
pg_regress_command = [
str(pg_regress),
pg_regress,
'--bindir=""',
"--use-existing",
f"--bindir={bindir}",
f"--dlpath={build_path}",
f"--schedule={schedule}",
f"--inputdir={src_path}",
"--bindir={}".format(bindir),
"--dlpath={}".format(build_path),
"--schedule={}".format(schedule),
"--inputdir={}".format(src_path),
]
env_vars = {
@@ -72,14 +66,7 @@ def test_pg_regress(
# This runs for a long time, especially in debug mode, so use a larger-than-default
# timeout.
@pytest.mark.timeout(1800)
def test_isolation(
neon_simple_env: NeonEnv,
test_output_dir: Path,
pg_bin,
capsys,
base_dir: Path,
pg_distrib_dir: Path,
):
def test_isolation(neon_simple_env: NeonEnv, test_output_dir: Path, pg_bin, capsys):
env = neon_simple_env
env.neon_cli.create_branch("test_isolation", "empty")
@@ -93,19 +80,21 @@ def test_isolation(
(runpath / "testtablespace").mkdir(parents=True)
# Compute all the file locations that pg_isolation_regress will need.
build_path = pg_distrib_dir / f"build/v{env.pg_version}/src/test/isolation"
src_path = base_dir / f"vendor/postgres-v{env.pg_version}/src/test/isolation"
bindir = pg_distrib_dir / f"v{env.pg_version}/bin"
schedule = src_path / "isolation_schedule"
pg_isolation_regress = build_path / "pg_isolation_regress"
build_path = os.path.join(pg_distrib_dir, "build/v{}/src/test/isolation".format(env.pg_version))
src_path = os.path.join(
base_dir, "vendor/postgres-v{}/src/test/isolation".format(env.pg_version)
)
bindir = os.path.join(pg_distrib_dir, "v{}".format(env.pg_version), "bin")
schedule = os.path.join(src_path, "isolation_schedule")
pg_isolation_regress = os.path.join(build_path, "pg_isolation_regress")
pg_isolation_regress_command = [
str(pg_isolation_regress),
pg_isolation_regress,
"--use-existing",
f"--bindir={bindir}",
f"--dlpath={build_path}",
f"--inputdir={src_path}",
f"--schedule={schedule}",
"--bindir={}".format(bindir),
"--dlpath={}".format(build_path),
"--inputdir={}".format(src_path),
"--schedule={}".format(schedule),
]
env_vars = {
@@ -123,14 +112,7 @@ def test_isolation(
# Run extra Neon-specific pg_regress-based tests. The tests and their
# schedule file are in the sql_regress/ directory.
def test_sql_regress(
neon_simple_env: NeonEnv,
test_output_dir: Path,
pg_bin,
capsys,
base_dir: Path,
pg_distrib_dir: Path,
):
def test_sql_regress(neon_simple_env: NeonEnv, test_output_dir: Path, pg_bin, capsys):
env = neon_simple_env
env.neon_cli.create_branch("test_sql_regress", "empty")
@@ -144,19 +126,19 @@ def test_sql_regress(
# Compute all the file locations that pg_regress will need.
# This test runs neon specific tests
build_path = pg_distrib_dir / f"build/v{env.pg_version}/src/test/regress"
src_path = base_dir / "test_runner/sql_regress"
bindir = pg_distrib_dir / f"v{env.pg_version}/bin"
schedule = src_path / "parallel_schedule"
pg_regress = build_path / "pg_regress"
build_path = os.path.join(pg_distrib_dir, "build/v{}/src/test/regress").format(env.pg_version)
src_path = os.path.join(base_dir, "test_runner/sql_regress")
bindir = os.path.join(pg_distrib_dir, "v{}".format(env.pg_version), "bin")
schedule = os.path.join(src_path, "parallel_schedule")
pg_regress = os.path.join(build_path, "pg_regress")
pg_regress_command = [
str(pg_regress),
pg_regress,
"--use-existing",
f"--bindir={bindir}",
f"--dlpath={build_path}",
f"--schedule={schedule}",
f"--inputdir={src_path}",
"--bindir={}".format(bindir),
"--dlpath={}".format(build_path),
"--schedule={}".format(schedule),
"--inputdir={}".format(src_path),
]
env_vars = {

View File

@@ -1,7 +1,7 @@
import os
import pathlib
import threading
from contextlib import closing, contextmanager
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
import pytest
@@ -14,6 +14,9 @@ from fixtures.neon_fixtures import (
PortDistributor,
Postgres,
assert_no_in_progress_downloads_for_tenant,
base_dir,
neon_binpath,
pg_distrib_dir,
wait_for_last_record_lsn,
wait_for_upload,
)
@@ -27,13 +30,12 @@ def assert_abs_margin_ratio(a: float, b: float, margin_ratio: float):
@contextmanager
def new_pageserver_service(
new_pageserver_dir: Path,
pageserver_bin: Path,
remote_storage_mock_path: Path,
new_pageserver_dir: pathlib.Path,
pageserver_bin: pathlib.Path,
remote_storage_mock_path: pathlib.Path,
pg_port: int,
http_port: int,
broker: Optional[Etcd],
pg_distrib_dir: Path,
):
"""
cannot use NeonPageserver yet because it depends on neon cli
@@ -191,10 +193,10 @@ def switch_pg_to_new_pageserver(
new_pageserver_port: int,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Path:
) -> pathlib.Path:
pg.stop()
pg_config_file_path = Path(pg.config_file_path())
pg_config_file_path = pathlib.Path(pg.config_file_path())
pg_config_file_path.open("a").write(
f"\nneon.pageserver_connstring = 'postgresql://no_user:@localhost:{new_pageserver_port}'"
)
@@ -217,7 +219,7 @@ def switch_pg_to_new_pageserver(
return timeline_to_detach_local_path
def post_migration_check(pg: Postgres, sum_before_migration: int, old_local_path: Path):
def post_migration_check(pg: Postgres, sum_before_migration: int, old_local_path: pathlib.Path):
with pg_cur(pg) as cur:
# check that data is still there
cur.execute("SELECT sum(key) FROM t")
@@ -249,9 +251,7 @@ def post_migration_check(pg: Postgres, sum_before_migration: int, old_local_path
def test_tenant_relocation(
neon_env_builder: NeonEnvBuilder,
port_distributor: PortDistributor,
test_output_dir: Path,
neon_binpath: Path,
base_dir: Path,
test_output_dir,
method: str,
with_load: str,
):
@@ -350,7 +350,7 @@ def test_tenant_relocation(
new_pageserver_pg_port = port_distributor.get_port()
new_pageserver_http_port = port_distributor.get_port()
log.info("new pageserver ports pg %s http %s", new_pageserver_pg_port, new_pageserver_http_port)
pageserver_bin = neon_binpath / "pageserver"
pageserver_bin = pathlib.Path(neon_binpath) / "pageserver"
new_pageserver_http = PageserverHttpClient(
port=new_pageserver_http_port,
@@ -365,7 +365,6 @@ def test_tenant_relocation(
new_pageserver_pg_port,
new_pageserver_http_port,
neon_env_builder.broker,
neon_env_builder.pg_distrib_dir,
):
# Migrate either by attaching from s3 or import/export basebackup
@@ -374,7 +373,7 @@ def test_tenant_relocation(
"poetry",
"run",
"python",
str(base_dir / "scripts/export_import_between_pageservers.py"),
os.path.join(base_dir, "scripts/export_import_between_pageservers.py"),
"--tenant-id",
str(tenant_id),
"--from-host",
@@ -390,9 +389,9 @@ def test_tenant_relocation(
"--to-pg-port",
str(new_pageserver_pg_port),
"--pg-distrib-dir",
str(neon_env_builder.pg_distrib_dir),
pg_distrib_dir,
"--work-dir",
str(test_output_dir),
os.path.join(test_output_dir),
"--tmp-pg-port",
str(port_distributor.get_port()),
]

View File

@@ -338,7 +338,6 @@ def test_timeline_size_metrics(
neon_simple_env: NeonEnv,
test_output_dir: Path,
port_distributor: PortDistributor,
pg_distrib_dir: Path,
pg_version: str,
):
env = neon_simple_env
@@ -383,7 +382,7 @@ def test_timeline_size_metrics(
tl_logical_size_metric = int(matches.group(1))
pgdatadir = test_output_dir / "pgdata-vanilla"
pg_bin = PgBin(test_output_dir, pg_distrib_dir, pg_version)
pg_bin = PgBin(test_output_dir, pg_version)
port = port_distributor.get_port()
with VanillaPostgres(pgdatadir, pg_bin, port) as vanilla_pg:
vanilla_pg.configure([f"port={port}"])

View File

@@ -30,6 +30,7 @@ from fixtures.neon_fixtures import (
SafekeeperHttpClient,
SafekeeperPort,
available_remote_storages,
neon_binpath,
wait_for_last_record_lsn,
wait_for_upload,
)
@@ -796,7 +797,6 @@ class SafekeeperEnv:
repo_dir: Path,
port_distributor: PortDistributor,
pg_bin: PgBin,
neon_binpath: Path,
num_safekeepers: int = 1,
):
self.repo_dir = repo_dir
@@ -808,7 +808,7 @@ class SafekeeperEnv:
)
self.pg_bin = pg_bin
self.num_safekeepers = num_safekeepers
self.bin_safekeeper = str(neon_binpath / "safekeeper")
self.bin_safekeeper = os.path.join(str(neon_binpath), "safekeeper")
self.safekeepers: Optional[List[subprocess.CompletedProcess[Any]]] = None
self.postgres: Optional[ProposerPostgres] = None
self.tenant_id: Optional[TenantId] = None
@@ -911,10 +911,7 @@ class SafekeeperEnv:
def test_safekeeper_without_pageserver(
test_output_dir: str,
port_distributor: PortDistributor,
pg_bin: PgBin,
neon_binpath: Path,
test_output_dir: str, port_distributor: PortDistributor, pg_bin: PgBin
):
# Create the environment in the test-specific output dir
repo_dir = Path(os.path.join(test_output_dir, "repo"))
@@ -923,7 +920,6 @@ def test_safekeeper_without_pageserver(
repo_dir,
port_distributor,
pg_bin,
neon_binpath,
)
with env:

View File

@@ -1,6 +1,14 @@
import os
from pathlib import Path
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, PortDistributor, VanillaPostgres
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
PortDistributor,
VanillaPostgres,
base_dir,
pg_distrib_dir,
)
from fixtures.types import TenantId
@@ -9,8 +17,6 @@ def test_wal_restore(
pg_bin: PgBin,
test_output_dir: Path,
port_distributor: PortDistributor,
base_dir: Path,
pg_distrib_dir: Path,
):
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_wal_restore")
@@ -20,13 +26,11 @@ def test_wal_restore(
env.neon_cli.pageserver_stop()
port = port_distributor.get_port()
data_dir = test_output_dir / "pgsql.restored"
with VanillaPostgres(
data_dir, PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version), port
) as restored:
with VanillaPostgres(data_dir, PgBin(test_output_dir, env.pg_version), port) as restored:
pg_bin.run_capture(
[
str(base_dir / "libs/utils/scripts/restore_from_wal.sh"),
str(pg_distrib_dir / f"v{env.pg_version}/bin"),
os.path.join(base_dir, "libs/utils/scripts/restore_from_wal.sh"),
os.path.join(pg_distrib_dir, "v{}".format(env.pg_version), "bin"),
str(test_output_dir / "repo" / "safekeepers" / "sk1" / str(tenant_id) / "*"),
str(data_dir),
str(port),