Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-29 08:10:38 +00:00

Compare commits: parallel-g ... strip-pubn (2 commits)

| Author | SHA1 | Date |
|---|---|---|
| | 004b6bbac7 | |
| | d9a14c9521 | |

5 .github/workflows/build_and_test.yml (vendored)
@@ -168,6 +168,11 @@ jobs:
|
||||
${cov_prefix} cargo test $CARGO_FLAGS
|
||||
shell: bash -euxo pipefail {0}
|
||||
|
||||
- name: Slim binaries
|
||||
run: |
|
||||
scripts/strip-useless-debug.py -j$(nproc) target
|
||||
shell: bash -euxo pipefail {0}
|
||||
|
||||
- name: Install rust binaries
|
||||
run: |
|
||||
# Install target binaries
|
||||
|
||||
@@ -13,7 +13,6 @@ use anyhow::{bail, ensure, Context, Result};
|
||||
use bytes::Buf;
|
||||
use bytes::Bytes;
|
||||
use futures::{Stream, StreamExt};
|
||||
use futures::stream::FuturesOrdered;
|
||||
use pageserver_api::models::{
|
||||
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
|
||||
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
|
||||
@@ -40,9 +39,10 @@ use utils::{
|
||||
};
|
||||
|
||||
use crate::basebackup;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::config::{PageServerConf, ProfilingConfig};
|
||||
use crate::import_datadir::import_wal_from_tar;
|
||||
use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME};
|
||||
use crate::profiling::profpoint_start;
|
||||
use crate::task_mgr;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::Timeline;
|
||||
@@ -276,10 +276,10 @@ impl PageServerHandler {
|
||||
pgb.write_message(&BeMessage::CopyBothResponse)?;
|
||||
pgb.flush().await?;
|
||||
|
||||
let mut inprogress_requests = FuturesOrdered::new();
|
||||
let metrics = PageRequestMetrics::new(&tenant_id, &timeline_id);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
let msg = tokio::select! {
|
||||
biased;
|
||||
|
||||
_ = task_mgr::shutdown_watcher() => {
|
||||
@@ -288,58 +288,51 @@ impl PageServerHandler {
|
||||
break;
|
||||
}
|
||||
|
||||
response = inprogress_requests.next(), if !inprogress_requests.is_empty() => {
|
||||
let response: Bytes = response.unwrap()?;
|
||||
pgb.write_message(&BeMessage::CopyData(&response))?;
|
||||
pgb.flush().await?;
|
||||
continue;
|
||||
msg = pgb.read_message() => { msg }
|
||||
};
|
||||
|
||||
let copy_data_bytes = match msg? {
|
||||
Some(FeMessage::CopyData(bytes)) => bytes,
|
||||
Some(m) => {
|
||||
bail!("unexpected message: {m:?} during COPY");
|
||||
}
|
||||
None => break, // client disconnected
|
||||
};
|
||||
|
||||
msg = pgb.read_message() => {
|
||||
let copy_data_bytes = match msg? {
|
||||
Some(FeMessage::CopyData(bytes)) => bytes,
|
||||
Some(m) => {
|
||||
bail!("unexpected message: {m:?} during COPY");
|
||||
}
|
||||
None => break, // client disconnected
|
||||
};
|
||||
trace!("query: {copy_data_bytes:?}");
|
||||
|
||||
trace!("query: {copy_data_bytes:?}");
|
||||
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
|
||||
|
||||
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
|
||||
|
||||
let timeline = Arc::clone(&timeline);
|
||||
let task = async move {
|
||||
let response = match neon_fe_msg {
|
||||
PagestreamFeMessage::Exists(req) => {
|
||||
Self::handle_get_rel_exists_request(&timeline, &req).await
|
||||
}
|
||||
PagestreamFeMessage::Nblocks(req) => {
|
||||
Self::handle_get_nblocks_request(&timeline, &req).await
|
||||
}
|
||||
PagestreamFeMessage::GetPage(req) => {
|
||||
Self::handle_get_page_at_lsn_request(&timeline, &req).await
|
||||
}
|
||||
PagestreamFeMessage::DbSize(req) => {
|
||||
Self::handle_db_size_request(&timeline, &req).await
|
||||
}
|
||||
};
|
||||
|
||||
let response = response.unwrap_or_else(|e| {
|
||||
// print all the details to the log with {:#}, but for the client the
// error message is enough
|
||||
error!("error reading relation or page version: {:?}", e);
|
||||
PagestreamBeMessage::Error(PagestreamErrorResponse {
|
||||
message: e.to_string(),
|
||||
})
|
||||
});
|
||||
let response: Bytes = response.serialize();
|
||||
response
|
||||
};
|
||||
inprogress_requests.push_back(tokio::spawn(task));
|
||||
continue;
|
||||
let response = match neon_fe_msg {
|
||||
PagestreamFeMessage::Exists(req) => {
|
||||
let _timer = metrics.get_rel_exists.start_timer();
|
||||
self.handle_get_rel_exists_request(&timeline, &req).await
|
||||
}
|
||||
PagestreamFeMessage::Nblocks(req) => {
|
||||
let _timer = metrics.get_rel_size.start_timer();
|
||||
self.handle_get_nblocks_request(&timeline, &req).await
|
||||
}
|
||||
PagestreamFeMessage::GetPage(req) => {
|
||||
let _timer = metrics.get_page_at_lsn.start_timer();
|
||||
self.handle_get_page_at_lsn_request(&timeline, &req).await
|
||||
}
|
||||
PagestreamFeMessage::DbSize(req) => {
|
||||
let _timer = metrics.get_db_size.start_timer();
|
||||
self.handle_db_size_request(&timeline, &req).await
|
||||
}
|
||||
};
|
||||
|
||||
let response = response.unwrap_or_else(|e| {
|
||||
// print all the details to the log with {:#}, but for the client the
// error message is enough
|
||||
error!("error reading relation or page version: {:?}", e);
|
||||
PagestreamBeMessage::Error(PagestreamErrorResponse {
|
||||
message: e.to_string(),
|
||||
})
|
||||
});
|
||||
|
||||
pgb.write_message(&BeMessage::CopyData(&response.serialize()))?;
|
||||
pgb.flush().await?;
|
||||
}
|
||||
Ok(())
|
||||
}
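The hunk above moves per-request handling into spawned tasks tracked by a `FuturesOrdered`, with a `biased` `tokio::select!` multiplexing between shutdown, completed responses, and new client messages. A minimal, self-contained sketch of that pattern (not the pageserver code itself; the channel and types are made up for illustration, and it assumes the `tokio` and `futures` crates):

```rust
use futures::stream::{FuturesOrdered, StreamExt};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(16);

    // Simulated client sending a few requests.
    tokio::spawn(async move {
        for i in 0..5u32 {
            tx.send(i).await.unwrap();
        }
    });

    // Completed responses come back in submission order.
    let mut inprogress = FuturesOrdered::new();

    loop {
        tokio::select! {
            biased;

            // Drain finished responses first, preserving request order.
            Some(res) = inprogress.next(), if !inprogress.is_empty() => {
                let response: String = res.unwrap(); // JoinHandle yields a Result
                println!("{response}");
            }

            // Accept a new request and spawn a task to handle it.
            msg = rx.recv() => {
                match msg {
                    Some(req) => inprogress.push_back(tokio::spawn(async move {
                        format!("handled request {req}")
                    })),
                    None => break, // client disconnected
                }
            }
        }
    }

    // Drain anything still in flight after the client disconnects.
    while let Some(res) = inprogress.next().await {
        println!("{}", res.unwrap());
    }
}
```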
|
||||
@@ -511,8 +504,9 @@ impl PageServerHandler {
|
||||
Ok(lsn)
|
||||
}
|
||||
|
||||
#[instrument(skip(timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
|
||||
#[instrument(skip(self, timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
|
||||
async fn handle_get_rel_exists_request(
|
||||
&self,
|
||||
timeline: &Timeline,
|
||||
req: &PagestreamExistsRequest,
|
||||
) -> Result<PagestreamBeMessage> {
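For context on the `#[instrument]` changes above: once these handlers become methods taking `&self`, the receiver has to be listed in `skip(...)` too, otherwise `tracing` would try to record it as a span field, which requires a `Debug` impl. A small sketch with hypothetical types, not the pageserver's:

```rust
use tracing::instrument;

struct Handler;                         // hypothetical stand-ins for the real types
struct Request { rel: u32, lsn: u64 }

impl Handler {
    // `self` and `req` are skipped so they don't need Debug; selected request
    // fields are still recorded on the span via `fields(...)`.
    #[instrument(skip(self, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
    async fn handle(&self, req: &Request) -> anyhow::Result<()> {
        tracing::info!("handling request");
        Ok(())
    }
}
```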
|
||||
@@ -527,8 +521,9 @@ impl PageServerHandler {
|
||||
}))
|
||||
}
|
||||
|
||||
#[instrument(skip(timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
|
||||
#[instrument(skip(self, timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
|
||||
async fn handle_get_nblocks_request(
|
||||
&self,
|
||||
timeline: &Timeline,
|
||||
req: &PagestreamNblocksRequest,
|
||||
) -> Result<PagestreamBeMessage> {
|
||||
@@ -543,8 +538,9 @@ impl PageServerHandler {
|
||||
}))
|
||||
}
|
||||
|
||||
#[instrument(skip(timeline, req), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))]
|
||||
#[instrument(skip(self, timeline, req), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))]
|
||||
async fn handle_db_size_request(
|
||||
&self,
|
||||
timeline: &Timeline,
|
||||
req: &PagestreamDbSizeRequest,
|
||||
) -> Result<PagestreamBeMessage> {
|
||||
@@ -562,8 +558,9 @@ impl PageServerHandler {
|
||||
}))
|
||||
}
|
||||
|
||||
#[instrument(skip(timeline, req), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))]
|
||||
#[instrument(skip(self, timeline, req), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))]
|
||||
async fn handle_get_page_at_lsn_request(
|
||||
&self,
|
||||
timeline: &Timeline,
|
||||
req: &PagestreamGetPageRequest,
|
||||
) -> Result<PagestreamBeMessage> {
|
||||
@@ -582,7 +579,7 @@ impl PageServerHandler {
|
||||
// FIXME: this profiling now happens at a different place than it used to. The
// current profiling is based on a thread-local variable, so it doesn't work
// across awaits
|
||||
//let _profiling_guard = profpoint_start(self.conf, ProfilingConfig::PageRequests);
|
||||
let _profiling_guard = profpoint_start(self.conf, ProfilingConfig::PageRequests);
|
||||
let page = timeline.get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest)?;
|
||||
|
||||
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
|
||||
@@ -590,8 +587,9 @@ impl PageServerHandler {
|
||||
}))
|
||||
}
|
||||
|
||||
#[instrument(skip(pgb))]
|
||||
#[instrument(skip(self, pgb))]
|
||||
async fn handle_basebackup_request(
|
||||
&self,
|
||||
pgb: &mut PostgresBackend,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
@@ -724,7 +722,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
|
||||
};
|
||||
|
||||
// Check that the timeline exists
|
||||
Self::handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false)
|
||||
self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false)
|
||||
.await?;
|
||||
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
}
|
||||
@@ -784,7 +782,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
|
||||
self.check_permission(Some(tenant_id))?;
|
||||
|
||||
// Check that the timeline exists
|
||||
Self::handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, prev_lsn, true)
|
||||
self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, prev_lsn, true)
|
||||
.await?;
|
||||
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
|
||||
} else if query_string.starts_with("import basebackup ") {
|
||||
|
||||
@@ -74,7 +74,6 @@ where
|
||||
};
|
||||
|
||||
dstbuf.clear();
|
||||
dstbuf.reserve(len);
|
||||
|
||||
// Read the payload
|
||||
let mut remain = len;
|
||||
|
||||
@@ -260,9 +260,8 @@ impl Layer for DeltaLayer {
|
||||
|
||||
// Ok, 'offsets' now contains the offsets of all the entries we need to read
|
||||
let mut cursor = file.block_cursor();
|
||||
let mut buf = Vec::new();
|
||||
for (entry_lsn, pos) in offsets {
|
||||
cursor.read_blob_into_buf(pos, &mut buf).with_context(|| {
|
||||
let buf = cursor.read_blob(pos).with_context(|| {
|
||||
format!(
|
||||
"Failed to read blob from virtual file {}",
|
||||
file.file.path.display()
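The delta-layer hunk above swaps between a reusable read buffer (`read_blob_into_buf`) and a per-read allocation (`read_blob`). A tiny sketch of the two calling conventions, using a hypothetical reader type rather than the pageserver's block cursor, to show why the reusable-buffer variant avoids an allocation per blob:

```rust
// Hypothetical blob reader illustrating the two APIs in the hunk above.
struct BlobReader {
    data: Vec<u8>,
}

impl BlobReader {
    // Allocates a fresh Vec for every blob read.
    fn read_blob(&self, pos: usize, len: usize) -> Vec<u8> {
        self.data[pos..pos + len].to_vec()
    }

    // Reuses the caller's buffer; no allocation once it has grown large enough.
    fn read_blob_into_buf(&self, pos: usize, len: usize, buf: &mut Vec<u8>) {
        buf.clear();
        buf.extend_from_slice(&self.data[pos..pos + len]);
    }
}

fn main() {
    let reader = BlobReader { data: (0..=255u8).collect() };

    // One buffer serves all iterations of the loop.
    let mut buf = Vec::new();
    for (pos, len) in [(0usize, 16usize), (16, 32), (64, 8)] {
        reader.read_blob_into_buf(pos, len, &mut buf);
        println!("read {} bytes", buf.len());
    }

    // Per-read allocation, one Vec per blob.
    let blob = reader.read_blob(0, 16);
    println!("read {} bytes", blob.len());
}
```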
|
||||
|
||||
@@ -21,7 +21,6 @@
|
||||
use byteorder::{ByteOrder, LittleEndian};
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use nix::poll::*;
|
||||
use once_cell::sync::Lazy;
|
||||
use serde::Serialize;
|
||||
use std::fs;
|
||||
use std::fs::OpenOptions;
|
||||
@@ -32,8 +31,7 @@ use std::os::unix::prelude::CommandExt;
|
||||
use std::path::PathBuf;
|
||||
use std::process::Stdio;
|
||||
use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::{Condvar, Mutex};
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use tracing::*;
|
||||
@@ -57,9 +55,6 @@ use postgres_ffi::v14::nonrelfile_utils::{
|
||||
};
|
||||
use postgres_ffi::BLCKSZ;
|
||||
|
||||
/// Maximum number of WAL redo processes to launch for a single tenant.
|
||||
const MAX_PROCESSES: usize = 4;
|
||||
|
||||
///
|
||||
/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
|
||||
///
|
||||
@@ -93,32 +88,18 @@ pub trait WalRedoManager: Send + Sync {
|
||||
) -> Result<Bytes, WalRedoError>;
|
||||
}
|
||||
|
||||
static WAL_REDO_PROCESS_COUNTER: Lazy<AtomicU64> = Lazy::new(|| { AtomicU64::new(0) });
|
||||
|
||||
///
|
||||
/// This is the real implementation that uses a special Postgres
|
||||
/// process to perform WAL replay. There is a pool of these processes.
|
||||
/// This is the real implementation that uses a Postgres process to
|
||||
/// perform WAL replay. Only one thread can use the process at a time,
|
||||
/// that is controlled by the Mutex. In the future, we might want to
|
||||
/// launch a pool of processes to allow concurrent replay of multiple
|
||||
/// records.
|
||||
///
|
||||
pub struct PostgresRedoManager {
|
||||
tenant_id: TenantId,
|
||||
conf: &'static PageServerConf,
|
||||
|
||||
/// Pool of processes.
|
||||
process_list: Mutex<ProcessList>,
|
||||
/// Condition variable that can be used to sleep until a process
|
||||
/// becomes available in the pool.
|
||||
condvar: Condvar,
|
||||
}
|
||||
|
||||
// A pool of WAL redo processes
|
||||
#[derive(Default)]
|
||||
struct ProcessList {
|
||||
/// processes that are available for reuse
|
||||
free_processes: Vec<PostgresRedoProcess>,
|
||||
|
||||
/// Total number of processes, including all the processes in
|
||||
/// 'free_processes' list, and any processes that are in use.
|
||||
num_processes: usize,
|
||||
process: Mutex<Option<PostgresRedoProcess>>,
|
||||
}
|
||||
|
||||
/// Can this request be served by neon redo functions
|
||||
@@ -223,32 +204,7 @@ impl PostgresRedoManager {
|
||||
PostgresRedoManager {
|
||||
tenant_id,
|
||||
conf,
|
||||
process_list: Mutex::new(ProcessList::default()),
|
||||
condvar: Condvar::new(),
|
||||
}
|
||||
}
|
||||
|
||||
// Get a handle to a redo process from the pool.
|
||||
fn get_process(&self, pg_version: u32) -> Result<PostgresRedoProcess, WalRedoError> {
|
||||
let mut process_list = self.process_list.lock().unwrap();
|
||||
|
||||
loop {
|
||||
// If there's a free process immediately available, take it.
|
||||
if let Some(process) = process_list.free_processes.pop() {
|
||||
return Ok(process);
|
||||
}
|
||||
|
||||
// All processes are in use. If the pool is at its maximum size
|
||||
// already, wait for a process to become free. Otherwise launch
|
||||
// a new process.
|
||||
if process_list.num_processes >= MAX_PROCESSES {
|
||||
process_list = self.condvar.wait(process_list).unwrap();
|
||||
continue;
|
||||
} else {
|
||||
let process = PostgresRedoProcess::launch(self.conf, &self.tenant_id, pg_version)?;
|
||||
process_list.num_processes += 1;
|
||||
return Ok(process);
|
||||
}
|
||||
process: Mutex::new(None),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -268,10 +224,16 @@ impl PostgresRedoManager {
|
||||
|
||||
let start_time = Instant::now();
|
||||
|
||||
let mut process = self.get_process(pg_version)?;
|
||||
|
||||
let mut process_guard = self.process.lock().unwrap();
|
||||
let lock_time = Instant::now();
|
||||
|
||||
// launch the WAL redo process on first use
|
||||
if process_guard.is_none() {
|
||||
let p = PostgresRedoProcess::launch(self.conf, &self.tenant_id, pg_version)?;
|
||||
*process_guard = Some(p);
|
||||
}
|
||||
let process = process_guard.as_mut().unwrap();
|
||||
|
||||
WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
|
||||
|
||||
// Relational WAL records are applied using wal-redo-postgres
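The hunk above replaces the process pool (`Mutex<ProcessList>` plus a condition variable) with a single WAL redo process stored in a `Mutex<Option<...>>` and launched lazily on first use. A condensed sketch of that pattern, with placeholder types rather than the real `PostgresRedoProcess`:

```rust
use std::sync::Mutex;

struct RedoProcess; // placeholder for the real wal-redo child-process handle

impl RedoProcess {
    fn launch() -> std::io::Result<Self> {
        // spawn the wal-redo helper process here
        Ok(RedoProcess)
    }
    fn apply(&mut self, record: &[u8]) -> std::io::Result<Vec<u8>> {
        Ok(record.to_vec()) // placeholder for the real replay
    }
}

struct RedoManager {
    process: Mutex<Option<RedoProcess>>,
}

impl RedoManager {
    fn request_redo(&self, record: &[u8]) -> std::io::Result<Vec<u8>> {
        // Only one caller at a time can use the process.
        let mut guard = self.process.lock().unwrap();

        // Launch the WAL redo process on first use.
        if guard.is_none() {
            *guard = Some(RedoProcess::launch()?);
        }
        let result = guard.as_mut().unwrap().apply(record);

        // If something went wrong, don't reuse the process: drop it so the
        // next request launches a fresh one.
        if result.is_err() {
            *guard = None;
        }
        result
    }
}
```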
|
||||
@@ -304,9 +266,8 @@ impl PostgresRedoManager {
|
||||
lsn
|
||||
);
|
||||
|
||||
// If something went wrong, don't try to reuse the
|
||||
// process. Kill it, and next request will launch a new one.
|
||||
// Otherwise return the process to the pool.
|
||||
// If something went wrong, don't try to reuse the process. Kill it, and
|
||||
// next request will launch a new one.
|
||||
if result.is_err() {
|
||||
error!(
|
||||
"error applying {} WAL records ({} bytes) to reconstruct page image at LSN {}",
|
||||
@@ -314,14 +275,8 @@ impl PostgresRedoManager {
|
||||
nbytes,
|
||||
lsn
|
||||
);
|
||||
let process = process_guard.take().unwrap();
|
||||
process.kill();
|
||||
let mut process_list = self.process_list.lock().unwrap();
|
||||
process_list.num_processes -= 1;
|
||||
self.condvar.notify_one();
|
||||
} else {
|
||||
let mut process_list = self.process_list.lock().unwrap();
|
||||
process_list.free_processes.push(process);
|
||||
self.condvar.notify_one();
|
||||
}
|
||||
result
|
||||
}
|
||||
@@ -639,10 +594,11 @@ impl PostgresRedoProcess {
|
||||
tenant_id: &TenantId,
|
||||
pg_version: u32,
|
||||
) -> Result<PostgresRedoProcess, Error> {
|
||||
// We need a dummy Postgres cluster to run the process in.
|
||||
let processno = WAL_REDO_PROCESS_COUNTER.fetch_add(1, Ordering::Relaxed);
|
||||
// FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
|
||||
// just create one with a constant name. That fails if you try to launch more than
|
||||
// one WAL redo manager concurrently.
|
||||
let datadir = path_with_suffix_extension(
|
||||
conf.tenant_path(tenant_id).join(format!("wal-redo-datadir-{}", processno)),
|
||||
conf.tenant_path(tenant_id).join("wal-redo-datadir"),
|
||||
TEMP_FILE_SUFFIX,
|
||||
);
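One branch in the hunk above gives each redo process a unique data directory by appending a value from a process-wide atomic counter (`WAL_REDO_PROCESS_COUNTER.fetch_add`), while the other uses a constant name. A minimal sketch of the counter-based naming; the paths and function names here are illustrative only:

```rust
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};

static WAL_REDO_PROCESS_COUNTER: AtomicU64 = AtomicU64::new(0);

// Each call gets a distinct suffix, so several redo processes can have their
// own datadirs under the same tenant path without colliding.
fn wal_redo_datadir(tenant_path: &Path) -> PathBuf {
    let processno = WAL_REDO_PROCESS_COUNTER.fetch_add(1, Ordering::Relaxed);
    tenant_path.join(format!("wal-redo-datadir-{}", processno))
}

fn main() {
    let tenant = PathBuf::from("/tmp/tenant-1234");
    println!("{}", wal_redo_datadir(&tenant).display()); // .../wal-redo-datadir-0
    println!("{}", wal_redo_datadir(&tenant).display()); // .../wal-redo-datadir-1
}
```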
|
||||
|
||||
@@ -769,11 +725,7 @@ impl PostgresRedoProcess {
|
||||
// This could be problematic if there are millions of records to replay,
|
||||
// but in practice the number of records is usually so small that it doesn't
|
||||
// matter, and it's better to keep this code simple.
|
||||
//
|
||||
// Most requests start with a before-image of BLCKSZ bytes, followed by
// some other WAL records. Start with a buffer that can hold that
|
||||
// comfortably.
|
||||
let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
|
||||
let mut writebuf: Vec<u8> = Vec::new();
|
||||
build_begin_redo_for_block_msg(tag, &mut writebuf);
|
||||
if let Some(img) = base_img {
|
||||
build_push_page_msg(tag, &img, &mut writebuf);
|
||||
|
||||
@@ -40,20 +40,8 @@
|
||||
bool connected = false;
|
||||
PGconn *pageserver_conn = NULL;
|
||||
|
||||
/*
|
||||
* WaitEventSet containing:
|
||||
* - WL_SOCKET_READABLE on pageserver_conn,
|
||||
* - WL_LATCH_SET on MyLatch, and
|
||||
* - WL_EXIT_ON_PM_DEATH.
|
||||
*/
|
||||
WaitEventSet *pageserver_conn_wes = NULL;
|
||||
|
||||
char *page_server_connstring_raw;
|
||||
|
||||
int flush_every_n_requests = 8;
|
||||
|
||||
static void pageserver_flush(void);
|
||||
|
||||
static void
|
||||
pageserver_connect()
|
||||
{
|
||||
@@ -70,7 +58,6 @@ pageserver_connect()
|
||||
|
||||
PQfinish(pageserver_conn);
|
||||
pageserver_conn = NULL;
|
||||
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
|
||||
errmsg(NEON_TAG "could not establish connection to pageserver"),
|
||||
@@ -86,26 +73,22 @@ pageserver_connect()
|
||||
neon_log(ERROR, "could not send pagestream command to pageserver");
|
||||
}
|
||||
|
||||
pageserver_conn_wes = CreateWaitEventSet(TopMemoryContext, 3);
|
||||
AddWaitEventToSet(pageserver_conn_wes, WL_LATCH_SET, PGINVALID_SOCKET,
|
||||
MyLatch, NULL);
|
||||
AddWaitEventToSet(pageserver_conn_wes, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
|
||||
NULL, NULL);
|
||||
AddWaitEventToSet(pageserver_conn_wes, WL_SOCKET_READABLE, PQsocket(pageserver_conn), NULL, NULL);
|
||||
|
||||
while (PQisBusy(pageserver_conn))
|
||||
{
|
||||
int wc;
|
||||
WaitEvent event;
|
||||
|
||||
/* Sleep until there's something to do */
|
||||
wc = WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION);
|
||||
wc = WaitLatchOrSocket(MyLatch,
|
||||
WL_LATCH_SET | WL_SOCKET_READABLE |
|
||||
WL_EXIT_ON_PM_DEATH,
|
||||
PQsocket(pageserver_conn),
|
||||
-1L, PG_WAIT_EXTENSION);
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
/* Data available in socket? */
|
||||
if (event.events & WL_SOCKET_READABLE)
|
||||
if (wc & WL_SOCKET_READABLE)
|
||||
{
|
||||
if (!PQconsumeInput(pageserver_conn))
|
||||
{
|
||||
@@ -113,7 +96,6 @@ pageserver_connect()
|
||||
|
||||
PQfinish(pageserver_conn);
|
||||
pageserver_conn = NULL;
|
||||
FreeWaitEventSet(pageserver_conn_wes);
|
||||
|
||||
neon_log(ERROR, "could not complete handshake with pageserver: %s",
|
||||
msg);
|
||||
@@ -130,30 +112,33 @@ pageserver_connect()
|
||||
* A wrapper around PQgetCopyData that checks for interrupts while sleeping.
|
||||
*/
|
||||
static int
|
||||
call_PQgetCopyData(char **buffer)
|
||||
call_PQgetCopyData(PGconn *conn, char **buffer)
|
||||
{
|
||||
int ret;
|
||||
|
||||
retry:
|
||||
ret = PQgetCopyData(pageserver_conn, buffer, 1 /* async */ );
|
||||
ret = PQgetCopyData(conn, buffer, 1 /* async */ );
|
||||
|
||||
if (ret == 0)
|
||||
{
|
||||
int wc;
|
||||
WaitEvent event;
|
||||
|
||||
/* Sleep until there's something to do */
|
||||
wc = WaitEventSetWait(pageserver_conn_wes, -1L, &event, 1, PG_WAIT_EXTENSION);
|
||||
wc = WaitLatchOrSocket(MyLatch,
|
||||
WL_LATCH_SET | WL_SOCKET_READABLE |
|
||||
WL_EXIT_ON_PM_DEATH,
|
||||
PQsocket(conn),
|
||||
-1L, PG_WAIT_EXTENSION);
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
/* Data available in socket? */
|
||||
if (event.events & WL_SOCKET_READABLE)
|
||||
if (wc & WL_SOCKET_READABLE)
|
||||
{
|
||||
if (!PQconsumeInput(pageserver_conn))
|
||||
if (!PQconsumeInput(conn))
|
||||
neon_log(ERROR, "could not get response from pageserver: %s",
|
||||
PQerrorMessage(pageserver_conn));
|
||||
PQerrorMessage(conn));
|
||||
}
|
||||
|
||||
goto retry;
|
||||
@@ -179,11 +164,7 @@ pageserver_disconnect(void)
|
||||
PQfinish(pageserver_conn);
|
||||
pageserver_conn = NULL;
|
||||
connected = false;
|
||||
|
||||
prefetch_on_ps_disconnect();
|
||||
}
|
||||
if (pageserver_conn_wes != NULL)
|
||||
FreeWaitEventSet(pageserver_conn_wes);
|
||||
}
|
||||
|
||||
static void
|
||||
@@ -193,7 +174,11 @@ pageserver_send(NeonRequest * request)
|
||||
|
||||
/* If the connection was lost for some reason, reconnect */
|
||||
if (connected && PQstatus(pageserver_conn) == CONNECTION_BAD)
|
||||
pageserver_disconnect();
|
||||
{
|
||||
PQfinish(pageserver_conn);
|
||||
pageserver_conn = NULL;
|
||||
connected = false;
|
||||
}
|
||||
|
||||
if (!connected)
|
||||
pageserver_connect();
|
||||
@@ -235,7 +220,7 @@ pageserver_receive(void)
|
||||
PG_TRY();
|
||||
{
|
||||
/* read response */
|
||||
resp_buff.len = call_PQgetCopyData(&resp_buff.data);
|
||||
resp_buff.len = call_PQgetCopyData(pageserver_conn, &resp_buff.data);
|
||||
resp_buff.cursor = 0;
|
||||
|
||||
if (resp_buff.len < 0)
|
||||
@@ -270,11 +255,7 @@ pageserver_receive(void)
|
||||
static void
|
||||
pageserver_flush(void)
|
||||
{
|
||||
if (!connected)
|
||||
{
|
||||
neon_log(WARNING, "Tried to flush while disconnected");
|
||||
}
|
||||
else if (PQflush(pageserver_conn))
|
||||
if (PQflush(pageserver_conn))
|
||||
{
|
||||
char *msg = PQerrorMessage(pageserver_conn);
|
||||
|
||||
@@ -283,7 +264,16 @@ pageserver_flush(void)
|
||||
}
|
||||
}
|
||||
|
||||
static NeonResponse *
|
||||
pageserver_call(NeonRequest * request)
|
||||
{
|
||||
pageserver_send(request);
|
||||
pageserver_flush();
|
||||
return pageserver_receive();
|
||||
}
|
||||
|
||||
page_server_api api = {
|
||||
.request = pageserver_call,
|
||||
.send = pageserver_send,
|
||||
.flush = pageserver_flush,
|
||||
.receive = pageserver_receive
|
||||
@@ -437,14 +427,6 @@ pg_init_libpagestore(void)
|
||||
PGC_SIGHUP,
|
||||
GUC_UNIT_MB,
|
||||
NULL, NULL, NULL);
|
||||
DefineCustomIntVariable("neon.flush_output_after",
|
||||
"Flush the output buffer after every N unflushed requests",
|
||||
NULL,
|
||||
&flush_every_n_requests,
|
||||
8, -1, INT_MAX,
|
||||
PGC_USERSET,
|
||||
0, /* no flags required */
|
||||
NULL, NULL, NULL);
|
||||
|
||||
relsize_hash_init();
|
||||
|
||||
|
||||
@@ -115,8 +115,6 @@ typedef struct
|
||||
char page[FLEXIBLE_ARRAY_MEMBER];
|
||||
} NeonGetPageResponse;
|
||||
|
||||
#define PS_GETPAGERESPONSE_SIZE (MAXALIGN(offsetof(NeonGetPageResponse, page) + BLCKSZ))
|
||||
|
||||
typedef struct
|
||||
{
|
||||
NeonMessageTag tag;
|
||||
@@ -140,19 +138,15 @@ extern char *nm_to_string(NeonMessage * msg);
|
||||
|
||||
typedef struct
|
||||
{
|
||||
NeonResponse *(*request) (NeonRequest * request);
|
||||
void (*send) (NeonRequest * request);
|
||||
NeonResponse *(*receive) (void);
|
||||
void (*flush) (void);
|
||||
} page_server_api;
|
||||
|
||||
extern void prefetch_on_ps_disconnect(void);
|
||||
|
||||
extern page_server_api * page_server;
|
||||
|
||||
extern char *page_server_connstring;
|
||||
extern int flush_every_n_requests;
|
||||
extern bool seqscan_prefetch_enabled;
|
||||
extern int seqscan_prefetch_distance;
|
||||
extern char *neon_timeline;
|
||||
extern char *neon_tenant;
|
||||
extern bool wal_redo;
|
||||
@@ -173,6 +167,7 @@ extern void neon_extend(SMgrRelation reln, ForkNumber forknum,
|
||||
BlockNumber blocknum, char *buffer, bool skipFsync);
|
||||
extern bool neon_prefetch(SMgrRelation reln, ForkNumber forknum,
|
||||
BlockNumber blocknum);
|
||||
extern void neon_reset_prefetch(SMgrRelation reln);
|
||||
extern void neon_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
char *buffer);
|
||||
|
||||
|
||||
@@ -49,20 +49,22 @@
|
||||
#include "access/xlog.h"
|
||||
#include "access/xloginsert.h"
|
||||
#include "access/xlog_internal.h"
|
||||
#include "access/xlogdefs.h"
|
||||
#include "catalog/pg_class.h"
|
||||
#include "common/hashfn.h"
|
||||
#include "pagestore_client.h"
|
||||
#include "pagestore_client.h"
|
||||
#include "storage/smgr.h"
|
||||
#include "access/xlogdefs.h"
|
||||
#include "postmaster/interrupt.h"
|
||||
#include "postmaster/autovacuum.h"
|
||||
#include "replication/walsender.h"
|
||||
#include "storage/bufmgr.h"
|
||||
#include "storage/relfilenode.h"
|
||||
#include "storage/buf_internals.h"
|
||||
#include "storage/smgr.h"
|
||||
#include "storage/md.h"
|
||||
#include "fmgr.h"
|
||||
#include "miscadmin.h"
|
||||
#include "pgstat.h"
|
||||
|
||||
#include "catalog/pg_tablespace_d.h"
|
||||
#include "postmaster/autovacuum.h"
|
||||
|
||||
#if PG_VERSION_NUM >= 150000
|
||||
#include "access/xlogutils.h"
|
||||
@@ -111,491 +113,48 @@ typedef enum
|
||||
static SMgrRelation unlogged_build_rel = NULL;
|
||||
static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
|
||||
|
||||
|
||||
/*
|
||||
* Prefetch implementation:
|
||||
*
|
||||
* Prefetch is performed locally by each backend.
|
||||
*
|
||||
* There can be up to READ_BUFFER_SIZE active IO requests registered at any
|
||||
* time. Requests using smgr_prefetch are sent to the pageserver, but we don't
|
||||
* wait on the response. Requests using smgr_read are either read from the
|
||||
* buffer, or (if that's not possible) we wait on the response to arrive -
|
||||
* this also will allow us to receive other prefetched pages.
|
||||
* Each request is immediately written to the output buffer of the pageserver
|
||||
* connection, but may not be flushed if smgr_prefetch is used: pageserver
|
||||
* flushes sent requests on manual flush, or every neon.flush_output_after
|
||||
* unflushed requests, which is not necessarily immediate.
|
||||
*
|
||||
* Once we have received a response, this value will be stored in the response
|
||||
* buffer, indexed in a hash table. This allows us to retain our buffered
|
||||
* prefetch responses even when we have cache misses.
|
||||
*
|
||||
* Reading of prefetch responses is delayed until they are actually needed
|
||||
* (smgr_read). In case of prefetch miss or any other SMGR request other than
|
||||
* smgr_read, all prefetch responses in the pipeline will need to be read from
|
||||
* the connection; the responses are stored for later use.
|
||||
*
|
||||
* NOTE: The current implementation of the prefetch system implements a ring
|
||||
* buffer of up to READ_BUFFER_SIZE requests. If there are more _read and
|
||||
* _prefetch requests between the initial _prefetch and the _read of a buffer,
|
||||
* the prefetch request will have been dropped from this prefetch buffer, and
|
||||
* your prefetch was wasted.
|
||||
* There can be up to MAX_PREFETCH_REQUESTS registered using smgr_prefetch
|
||||
* before smgr_read. All these requests are appended to the primary smgr_read request.
* It is assumed that pages will be requested in prefetch order.
* Reading of prefetch responses is delayed until they are actually needed (smgr_read).
* This makes it possible to parallelize processing and receiving of prefetched pages.
* In case of prefetch miss or any other SMGR request other than smgr_read,
* all prefetch responses have to be consumed.
|
||||
*/
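A compact sketch of the ring-buffer bookkeeping described in the longer comment above, written in Rust for brevity rather than the extension's C. The field names mirror the description; the real structures below additionally track `ring_flush`, a hash index, memory contexts, and request LSNs:

```rust
// Prefetch ring indexes: ring_unused >= ring_receive >= ring_last, with the
// slot for request i stored at i % READ_BUFFER_SIZE.
const READ_BUFFER_SIZE: u64 = 128;

#[derive(Default, Clone)]
struct Slot {
    requested: bool,
    response: Option<Vec<u8>>,
}

#[derive(Default)]
struct PrefetchRing {
    ring_unused: u64,  // first unused slot
    ring_receive: u64, // next slot that is to receive a response
    ring_last: u64,    // oldest slot that may still hold a buffered response
    slots: Vec<Slot>,
}

impl PrefetchRing {
    fn new() -> Self {
        PrefetchRing { slots: vec![Slot::default(); READ_BUFFER_SIZE as usize], ..Default::default() }
    }

    // Register a new request; the caller must first make room if the ring is full.
    fn register(&mut self) -> u64 {
        assert!(self.ring_unused - self.ring_last < READ_BUFFER_SIZE);
        let ring_index = self.ring_unused;
        self.slots[(ring_index % READ_BUFFER_SIZE) as usize] =
            Slot { requested: true, response: None };
        self.ring_unused += 1;
        ring_index
    }

    // Responses arrive in request order, so the next one always lands on ring_receive.
    fn receive(&mut self, response: Vec<u8>) {
        let idx = (self.ring_receive % READ_BUFFER_SIZE) as usize;
        assert!(self.slots[idx].requested);
        self.slots[idx].response = Some(response);
        self.ring_receive += 1;
    }

    // True once the response for ring_index has been received.
    fn is_received(&self, ring_index: u64) -> bool {
        ring_index < self.ring_receive
    }
}
```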
|
||||
|
||||
/* Max amount of tracked buffer reads */
|
||||
#define READ_BUFFER_SIZE 128
|
||||
#define MAX_PREFETCH_REQUESTS 128
|
||||
|
||||
typedef enum PrefetchStatus {
|
||||
PRFS_UNUSED = 0, /* unused slot */
|
||||
PRFS_REQUESTED, /* request was written to the sendbuffer to PS, but not
|
||||
* necessarily flushed.
|
||||
* all fields except response valid */
|
||||
PRFS_RECEIVED, /* all fields valid */
|
||||
PRFS_TAG_REMAINS, /* only buftag and my_ring_index are still valid */
|
||||
} PrefetchStatus;
|
||||
BufferTag prefetch_requests[MAX_PREFETCH_REQUESTS];
|
||||
BufferTag prefetch_responses[MAX_PREFETCH_REQUESTS];
|
||||
int n_prefetch_requests;
|
||||
int n_prefetch_responses;
|
||||
int n_prefetched_buffers;
|
||||
int n_prefetch_hits;
|
||||
int n_prefetch_misses;
|
||||
XLogRecPtr prefetch_lsn;
|
||||
|
||||
typedef struct PrefetchRequest {
|
||||
BufferTag buftag; /* must be first entry in the struct */
|
||||
XLogRecPtr effective_request_lsn;
|
||||
NeonResponse *response; /* may be null */
|
||||
PrefetchStatus status;
|
||||
uint64 my_ring_index;
|
||||
} PrefetchRequest;
|
||||
|
||||
/* prefetch buffer lookup hash table */
|
||||
|
||||
typedef struct PrfHashEntry {
|
||||
PrefetchRequest *slot;
|
||||
uint32 status;
|
||||
uint32 hash;
|
||||
} PrfHashEntry;
|
||||
|
||||
#define SH_PREFIX prfh
|
||||
#define SH_ELEMENT_TYPE PrfHashEntry
|
||||
#define SH_KEY_TYPE PrefetchRequest *
|
||||
#define SH_KEY slot
|
||||
#define SH_STORE_HASH
|
||||
#define SH_GET_HASH(tb, a) ((a)->hash)
|
||||
#define SH_HASH_KEY(tb, key) hash_bytes( \
|
||||
((const unsigned char *) &(key)->buftag), \
|
||||
sizeof(BufferTag) \
|
||||
)
|
||||
|
||||
#define SH_EQUAL(tb, a, b) (BUFFERTAGS_EQUAL((a)->buftag, (b)->buftag))
|
||||
#define SH_SCOPE static inline
|
||||
#define SH_DEFINE
|
||||
#define SH_DECLARE
|
||||
#include "lib/simplehash.h"
|
||||
|
||||
/*
|
||||
* PrefetchState maintains the state of (prefetch) getPage@LSN requests.
|
||||
* It maintains a (ring) buffer of in-flight requests and responses.
|
||||
*
|
||||
* We maintain several indexes into the ring buffer:
|
||||
* ring_unused >= ring_flush >= ring_receive >= ring_last >= 0
|
||||
*
|
||||
* ring_unused points to the first unused slot of the buffer
|
||||
* ring_receive is the next request that is to be received
|
||||
* ring_last is the oldest received entry in the buffer
|
||||
*
|
||||
* Apart from being an entry in the ring buffer of prefetch requests, each
|
||||
* PrefetchRequest that is not UNUSED is indexed in prf_hash by buftag.
|
||||
*/
|
||||
typedef struct PrefetchState {
|
||||
MemoryContext bufctx; /* context for prf_buffer[].response allocations */
|
||||
MemoryContext errctx; /* context for prf_buffer[].response allocations */
|
||||
MemoryContext hashctx; /* context for prf_buffer */
|
||||
|
||||
/* buffer indexes */
|
||||
uint64 ring_unused; /* first unused slot */
|
||||
uint64 ring_flush; /* next request to flush */
|
||||
uint64 ring_receive; /* next slot that is to receive a response */
|
||||
uint64 ring_last; /* min slot with a response value */
|
||||
|
||||
/* metrics / statistics */
|
||||
int n_responses_buffered; /* count of PS responses not yet in buffers */
|
||||
int n_requests_inflight; /* count of PS requests considered in flight */
|
||||
int n_unused; /* count of buffers < unused, > last, that are also unused */
|
||||
|
||||
/* the buffers */
|
||||
prfh_hash *prf_hash;
|
||||
PrefetchRequest prf_buffer[READ_BUFFER_SIZE]; /* prefetch buffers */
|
||||
} PrefetchState;
|
||||
|
||||
PrefetchState *MyPState;
|
||||
|
||||
int n_prefetch_hits = 0;
|
||||
int n_prefetch_misses = 0;
|
||||
int n_prefetch_missed_caches = 0;
|
||||
int n_prefetch_dupes = 0;
|
||||
|
||||
XLogRecPtr prefetch_lsn = 0;
|
||||
|
||||
static void consume_prefetch_responses(void);
|
||||
static uint64 prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_lsn);
|
||||
static void prefetch_read(PrefetchRequest *slot);
|
||||
static void prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn);
|
||||
static void prefetch_wait_for(uint64 ring_index);
|
||||
static void prefetch_cleanup(void);
|
||||
static inline void prefetch_set_unused(uint64 ring_index, bool hash_cleanup);
|
||||
|
||||
static XLogRecPtr neon_get_request_lsn(bool *latest, RelFileNode rnode,
|
||||
ForkNumber forknum, BlockNumber blkno);
|
||||
|
||||
|
||||
/*
|
||||
* Make sure that there are no responses still in the buffer.
|
||||
*/
|
||||
static void
|
||||
consume_prefetch_responses(void)
|
||||
{
|
||||
if (MyPState->ring_receive < MyPState->ring_unused)
|
||||
prefetch_wait_for(MyPState->ring_unused - 1);
|
||||
}
|
||||
|
||||
static void
|
||||
prefetch_cleanup(void)
|
||||
{
|
||||
int index;
|
||||
uint64 ring_index;
|
||||
PrefetchRequest *slot;
|
||||
|
||||
while (MyPState->ring_last < MyPState->ring_receive) {
|
||||
ring_index = MyPState->ring_last;
|
||||
index = (ring_index % READ_BUFFER_SIZE);
|
||||
slot = &MyPState->prf_buffer[index];
|
||||
|
||||
if (slot->status == PRFS_UNUSED)
|
||||
MyPState->ring_last += 1;
|
||||
else
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Wait for slot of ring_index to have received its response.
|
||||
* The caller is responsible for making sure the request buffer is flushed.
|
||||
*/
|
||||
static void
|
||||
prefetch_wait_for(uint64 ring_index)
|
||||
{
|
||||
int index;
|
||||
PrefetchRequest *entry;
|
||||
|
||||
Assert(MyPState->ring_unused > ring_index);
|
||||
|
||||
while (MyPState->ring_receive <= ring_index)
|
||||
for (int i = n_prefetched_buffers; i < n_prefetch_responses; i++)
|
||||
{
|
||||
index = (MyPState->ring_receive % READ_BUFFER_SIZE);
|
||||
entry = &MyPState->prf_buffer[index];
|
||||
NeonResponse *resp = page_server->receive();
|
||||
|
||||
Assert(entry->status == PRFS_REQUESTED);
|
||||
prefetch_read(entry);
|
||||
pfree(resp);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Read the response of a prefetch request into its slot.
|
||||
*
|
||||
* The caller is responsible for making sure that the request for this buffer
|
||||
* was flushed to the PageServer.
|
||||
*/
|
||||
static void
|
||||
prefetch_read(PrefetchRequest *slot)
|
||||
{
|
||||
NeonResponse *response;
|
||||
MemoryContext old;
|
||||
|
||||
Assert(slot->status == PRFS_REQUESTED);
|
||||
Assert(slot->response == NULL);
|
||||
Assert(slot->my_ring_index == MyPState->ring_receive);
|
||||
|
||||
old = MemoryContextSwitchTo(MyPState->errctx);
|
||||
response = (NeonResponse *) page_server->receive();
|
||||
MemoryContextSwitchTo(old);
|
||||
|
||||
/* update prefetch state */
|
||||
MyPState->n_responses_buffered += 1;
|
||||
MyPState->n_requests_inflight -= 1;
|
||||
MyPState->ring_receive += 1;
|
||||
|
||||
/* update slot state */
|
||||
slot->status = PRFS_RECEIVED;
|
||||
slot->response = response;
|
||||
}
|
||||
|
||||
/*
|
||||
* Disconnect hook - drop prefetches when the connection drops
|
||||
*
|
||||
* If we don't remove the failed prefetches, we'd be serving incorrect
|
||||
* data to the smgr.
|
||||
*/
|
||||
void
|
||||
prefetch_on_ps_disconnect(void)
|
||||
{
|
||||
for (; MyPState->ring_receive < MyPState->ring_unused; MyPState->ring_receive++)
|
||||
{
|
||||
PrefetchRequest *slot;
|
||||
int index = MyPState->ring_receive % READ_BUFFER_SIZE;
|
||||
|
||||
slot = &MyPState->prf_buffer[index];
|
||||
Assert(slot->status == PRFS_REQUESTED);
|
||||
Assert(slot->my_ring_index == MyPState->ring_receive);
|
||||
|
||||
/* clean up the request */
|
||||
slot->status = PRFS_TAG_REMAINS;
|
||||
MyPState->n_requests_inflight--;
|
||||
prefetch_set_unused(MyPState->ring_receive, true);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* prefetch_set_unused() - clear a received prefetch slot
|
||||
*
|
||||
* The slot at ring_index must be a current member of the ring buffer,
|
||||
* and may not be in the PRFS_REQUESTED state.
|
||||
*/
|
||||
static inline void
|
||||
prefetch_set_unused(uint64 ring_index, bool hash_cleanup)
|
||||
{
|
||||
PrefetchRequest *slot = &MyPState->prf_buffer[ring_index % READ_BUFFER_SIZE];
|
||||
|
||||
Assert(MyPState->ring_last <= ring_index &&
|
||||
MyPState->ring_unused > ring_index);
|
||||
|
||||
if (slot->status == PRFS_UNUSED)
|
||||
return;
|
||||
|
||||
Assert(slot->status == PRFS_RECEIVED || slot->status == PRFS_TAG_REMAINS);
|
||||
Assert(ring_index >= MyPState->ring_last &&
|
||||
ring_index < MyPState->ring_unused);
|
||||
|
||||
if (slot->status == PRFS_RECEIVED)
|
||||
{
|
||||
pfree(slot->response);
|
||||
slot->response = NULL;
|
||||
|
||||
MyPState->n_responses_buffered -= 1;
|
||||
MyPState->n_unused += 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
Assert(slot->response == NULL);
|
||||
}
|
||||
|
||||
if (hash_cleanup)
|
||||
prfh_delete(MyPState->prf_hash, slot);
|
||||
|
||||
/* clear all fields */
|
||||
MemSet(slot, 0, sizeof(PrefetchRequest));
|
||||
slot->status = PRFS_UNUSED;
|
||||
|
||||
/* run cleanup if we're holding back ring_last */
|
||||
if (MyPState->ring_last == ring_index)
|
||||
prefetch_cleanup();
|
||||
}
|
||||
|
||||
static void
|
||||
prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn)
|
||||
{
|
||||
NeonGetPageRequest request = {
|
||||
.req.tag = T_NeonGetPageRequest,
|
||||
.req.latest = false,
|
||||
.req.lsn = 0,
|
||||
.rnode = slot->buftag.rnode,
|
||||
.forknum = slot->buftag.forkNum,
|
||||
.blkno = slot->buftag.blockNum,
|
||||
};
|
||||
|
||||
if (force_lsn && force_latest)
|
||||
{
|
||||
request.req.lsn = *force_lsn;
|
||||
request.req.latest = *force_latest;
|
||||
slot->effective_request_lsn = *force_lsn;
|
||||
}
|
||||
else
|
||||
{
|
||||
XLogRecPtr lsn = neon_get_request_lsn(
|
||||
&request.req.latest,
|
||||
slot->buftag.rnode,
|
||||
slot->buftag.forkNum,
|
||||
slot->buftag.blockNum
|
||||
);
|
||||
/*
|
||||
* Note: effective_request_lsn is potentially higher than the requested
|
||||
* LSN, but still correct:
|
||||
*
|
||||
* We know there are no changes between the actual requested LSN and
|
||||
* the value of effective_request_lsn: If there were, the page would
|
||||
* have been in cache and evicted between those LSN values, which
|
||||
* then would have had to result in a larger request LSN for this page.
|
||||
*
|
||||
* It is possible that a concurrent backend loads the page, modifies
|
||||
* it and then evicts it again, but the LSN of that eviction cannot be
|
||||
* smaller than the current WAL insert/redo pointer, which is already
|
||||
* larger than this prefetch_lsn. So in any case, that would
|
||||
* invalidate this cache.
|
||||
*
|
||||
* The best LSN to use for effective_request_lsn would be
|
||||
* XLogCtl->Insert.RedoRecPtr, but that's expensive to access.
|
||||
*/
|
||||
request.req.lsn = lsn;
|
||||
prefetch_lsn = Max(prefetch_lsn, lsn);
|
||||
slot->effective_request_lsn = prefetch_lsn;
|
||||
}
|
||||
|
||||
Assert(slot->response == NULL);
|
||||
Assert(slot->my_ring_index == MyPState->ring_unused);
|
||||
page_server->send((NeonRequest *) &request);
|
||||
|
||||
/* update prefetch state */
|
||||
MyPState->n_requests_inflight += 1;
|
||||
MyPState->n_unused -= 1;
|
||||
MyPState->ring_unused += 1;
|
||||
|
||||
/* update slot state */
|
||||
slot->status = PRFS_REQUESTED;
|
||||
}
|
||||
|
||||
/*
|
||||
* prefetch_register_buffer() - register and prefetch buffer
|
||||
*
|
||||
* Register that we may want the contents of BufferTag in the near future.
|
||||
*
|
||||
* If force_latest and force_lsn are not NULL, those values are sent to the
|
||||
* pageserver. If they are NULL, we utilize the lastWrittenLsn infrastructure
|
||||
* to fill in these values manually.
|
||||
*/
|
||||
|
||||
static uint64
|
||||
prefetch_register_buffer(BufferTag tag, bool *force_latest, XLogRecPtr *force_lsn)
|
||||
{
|
||||
int index;
|
||||
bool found;
|
||||
uint64 ring_index;
|
||||
PrefetchRequest req;
|
||||
PrefetchRequest *slot;
|
||||
PrfHashEntry *entry;
|
||||
|
||||
/* use an intermediate PrefetchRequest struct to ensure correct alignment */
|
||||
req.buftag = tag;
|
||||
|
||||
entry = prfh_lookup(MyPState->prf_hash, (PrefetchRequest *) &req);
|
||||
|
||||
if (entry != NULL)
|
||||
{
|
||||
slot = entry->slot;
|
||||
ring_index = slot->my_ring_index;
|
||||
index = (ring_index % READ_BUFFER_SIZE);
|
||||
Assert(slot == &MyPState->prf_buffer[index]);
|
||||
|
||||
Assert(slot->status != PRFS_UNUSED);
|
||||
Assert(BUFFERTAGS_EQUAL(slot->buftag, tag));
|
||||
|
||||
/*
|
||||
* If we want a specific lsn, we do not accept requests that were made
|
||||
* with a potentially different LSN.
|
||||
*/
|
||||
if (force_lsn && slot->effective_request_lsn != *force_lsn)
|
||||
{
|
||||
prefetch_wait_for(ring_index);
|
||||
prefetch_set_unused(ring_index, true);
|
||||
}
|
||||
/*
|
||||
* We received a prefetch for a page that was recently read and
|
||||
* removed from the buffers. Remove that request from the buffers.
|
||||
*/
|
||||
else if (slot->status == PRFS_TAG_REMAINS)
|
||||
{
|
||||
prefetch_set_unused(ring_index, true);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* The buffered request is good enough, return that index */
|
||||
n_prefetch_dupes++;
|
||||
return ring_index;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* If the prefetch queue is full, we need to make room by clearing the
|
||||
* oldest slot. If the oldest slot holds a buffer that was already
|
||||
* received, we can just throw it away; we fetched the page unnecessarily
|
||||
* in that case. If the oldest slot holds a request that we haven't
|
||||
* received a response for yet, we have to wait for the response to that
|
||||
* before we can continue. We might not have even flushed the request to
|
||||
* the pageserver yet, it might be just sitting in the output buffer. In
|
||||
* that case, we flush it and wait for the response. (We could decide not
|
||||
* to send it, but it's hard to abort when the request is already in the
|
||||
* output buffer, and 'not sending' a prefetch request kind of goes
|
||||
* against the principles of prefetching)
|
||||
*/
|
||||
if (MyPState->ring_last + READ_BUFFER_SIZE - 1 == MyPState->ring_unused)
|
||||
{
|
||||
slot = &MyPState->prf_buffer[(MyPState->ring_last % READ_BUFFER_SIZE)];
|
||||
|
||||
Assert(slot->status != PRFS_UNUSED);
|
||||
|
||||
/* We have the slot for ring_last, so that must still be in progress */
|
||||
switch (slot->status)
|
||||
{
|
||||
case PRFS_REQUESTED:
|
||||
Assert(MyPState->ring_receive == MyPState->ring_last);
|
||||
prefetch_wait_for(MyPState->ring_last);
|
||||
prefetch_set_unused(MyPState->ring_last, true);
|
||||
break;
|
||||
case PRFS_RECEIVED:
|
||||
case PRFS_TAG_REMAINS:
|
||||
prefetch_set_unused(MyPState->ring_last, true);
|
||||
break;
|
||||
default:
|
||||
pg_unreachable();
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* The next buffer pointed to by `ring_unused` is now unused, so we can insert
|
||||
* the new request to it.
|
||||
*/
|
||||
ring_index = MyPState->ring_unused;
|
||||
index = (ring_index % READ_BUFFER_SIZE);
|
||||
slot = &MyPState->prf_buffer[index];
|
||||
|
||||
Assert(MyPState->ring_last <= ring_index);
|
||||
|
||||
Assert(slot->status == PRFS_UNUSED);
|
||||
|
||||
/*
|
||||
* We must update the slot data before insertion, because the hash
|
||||
* function reads the buffer tag from the slot.
|
||||
*/
|
||||
slot->buftag = tag;
|
||||
slot->my_ring_index = ring_index;
|
||||
|
||||
prfh_insert(MyPState->prf_hash, slot, &found);
|
||||
Assert(!found);
|
||||
|
||||
prefetch_do_request(slot, force_latest, force_lsn);
|
||||
Assert(slot->status == PRFS_REQUESTED);
|
||||
Assert(ring_index < MyPState->ring_unused);
|
||||
|
||||
if (flush_every_n_requests > 0 && MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests)
|
||||
{
|
||||
page_server->flush();
|
||||
MyPState->ring_flush = MyPState->ring_unused;
|
||||
}
|
||||
|
||||
return ring_index;
|
||||
n_prefetched_buffers = 0;
|
||||
n_prefetch_responses = 0;
|
||||
}
|
||||
|
||||
static NeonResponse *
|
||||
page_server_request(void const *req)
|
||||
{
|
||||
page_server->send((NeonRequest *) req);
|
||||
page_server->flush();
|
||||
MyPState->ring_flush = MyPState->ring_unused;
|
||||
consume_prefetch_responses();
|
||||
return page_server->receive();
|
||||
return page_server->request((NeonRequest *) req);
|
||||
}
|
||||
|
||||
|
||||
@@ -709,15 +268,12 @@ nm_unpack_response(StringInfo s)
|
||||
|
||||
case T_NeonGetPageResponse:
|
||||
{
|
||||
NeonGetPageResponse *msg_resp;
|
||||
NeonGetPageResponse *msg_resp = palloc0(offsetof(NeonGetPageResponse, page) + BLCKSZ);
|
||||
|
||||
msg_resp = MemoryContextAllocZero(MyPState->bufctx, PS_GETPAGERESPONSE_SIZE);
|
||||
msg_resp->tag = tag;
|
||||
/* XXX: should be varlena */
|
||||
memcpy(msg_resp->page, pq_getmsgbytes(s, BLCKSZ), BLCKSZ);
|
||||
pq_getmsgend(s);
|
||||
|
||||
Assert(msg_resp->tag == T_NeonGetPageResponse);
|
||||
|
||||
resp = (NeonResponse *) msg_resp;
|
||||
break;
|
||||
@@ -1061,32 +617,7 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, ch
|
||||
void
|
||||
neon_init(void)
|
||||
{
|
||||
HASHCTL info;
|
||||
|
||||
if (MyPState != NULL)
|
||||
return;
|
||||
|
||||
MyPState = MemoryContextAllocZero(TopMemoryContext, sizeof(PrefetchState));
|
||||
|
||||
MyPState->n_unused = READ_BUFFER_SIZE;
|
||||
|
||||
MyPState->bufctx = SlabContextCreate(TopMemoryContext,
|
||||
"NeonSMGR/prefetch",
|
||||
SLAB_DEFAULT_BLOCK_SIZE * 17,
|
||||
PS_GETPAGERESPONSE_SIZE);
|
||||
MyPState->errctx = AllocSetContextCreate(TopMemoryContext,
|
||||
"NeonSMGR/errors",
|
||||
ALLOCSET_DEFAULT_SIZES);
|
||||
MyPState->hashctx = AllocSetContextCreate(TopMemoryContext,
|
||||
"NeonSMGR/prefetch",
|
||||
ALLOCSET_DEFAULT_SIZES);
|
||||
|
||||
info.keysize = sizeof(BufferTag);
|
||||
info.entrysize = sizeof(uint64);
|
||||
|
||||
MyPState->prf_hash = prfh_create(MyPState->hashctx,
|
||||
READ_BUFFER_SIZE, NULL);
|
||||
|
||||
/* noop */
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
mdinit();
|
||||
#endif
|
||||
@@ -1473,17 +1004,27 @@ neon_close(SMgrRelation reln, ForkNumber forknum)
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* neon_reset_prefetch() -- remove all previously registered prefetch requests
|
||||
*/
|
||||
void
|
||||
neon_reset_prefetch(SMgrRelation reln)
|
||||
{
|
||||
n_prefetch_requests = 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* neon_prefetch() -- Initiate asynchronous read of the specified block of a relation
|
||||
*/
|
||||
bool
|
||||
neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
|
||||
{
|
||||
uint64 ring_index;
|
||||
|
||||
switch (reln->smgr_relpersistence)
|
||||
{
|
||||
case 0: /* probably shouldn't happen, but ignore it */
|
||||
case 0:
|
||||
/* probably shouldn't happen, but ignore it */
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_PERMANENT:
|
||||
break;
|
||||
|
||||
@@ -1495,17 +1036,14 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
|
||||
elog(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
}
|
||||
|
||||
BufferTag tag = (BufferTag) {
|
||||
.rnode = reln->smgr_rnode.node,
|
||||
.forkNum = forknum,
|
||||
.blockNum = blocknum
|
||||
};
|
||||
|
||||
ring_index = prefetch_register_buffer(tag, NULL, NULL);
|
||||
|
||||
Assert(ring_index < MyPState->ring_unused &&
|
||||
MyPState->ring_last <= ring_index);
|
||||
|
||||
if (n_prefetch_requests < MAX_PREFETCH_REQUESTS)
|
||||
{
|
||||
prefetch_requests[n_prefetch_requests].rnode = reln->smgr_rnode.node;
|
||||
prefetch_requests[n_prefetch_requests].forkNum = forknum;
|
||||
prefetch_requests[n_prefetch_requests].blockNum = blocknum;
|
||||
n_prefetch_requests += 1;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -1556,77 +1094,81 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
|
||||
XLogRecPtr request_lsn, bool request_latest, char *buffer)
|
||||
{
|
||||
NeonResponse *resp;
|
||||
BufferTag buftag;
|
||||
uint64 ring_index;
|
||||
PrfHashEntry *entry;
|
||||
PrefetchRequest *slot;
|
||||
|
||||
buftag = (BufferTag) {
|
||||
.rnode = rnode,
|
||||
.forkNum = forkNum,
|
||||
.blockNum = blkno,
|
||||
};
|
||||
int i;
|
||||
|
||||
/*
|
||||
* Try to find prefetched page in the list of received pages.
|
||||
* Try to find the prefetched page. It is assumed that pages will be requested
* in the same order as they are prefetched, but some other backend may
* load the page into shared buffers, so some prefetch responses should be
|
||||
* skipped.
|
||||
*/
|
||||
entry = prfh_lookup(MyPState->prf_hash, (PrefetchRequest *) &buftag);
|
||||
|
||||
if (entry != NULL)
|
||||
for (i = n_prefetched_buffers; i < n_prefetch_responses; i++)
|
||||
{
|
||||
if (entry->slot->effective_request_lsn >= prefetch_lsn)
|
||||
{
|
||||
slot = entry->slot;
|
||||
ring_index = slot->my_ring_index;
|
||||
n_prefetch_hits += 1;
|
||||
}
|
||||
else /* the current prefetch LSN is not large enough, so drop the prefetch */
|
||||
resp = page_server->receive();
|
||||
if (resp->tag == T_NeonGetPageResponse &&
|
||||
RelFileNodeEquals(prefetch_responses[i].rnode, rnode) &&
|
||||
prefetch_responses[i].forkNum == forkNum &&
|
||||
prefetch_responses[i].blockNum == blkno)
|
||||
{
|
||||
char *page = ((NeonGetPageResponse *) resp)->page;
|
||||
|
||||
/*
|
||||
* We can't drop cache for not-yet-received requested items. It is
|
||||
* unlikely this happens, but it can happen if prefetch distance is
|
||||
* large enough and a backend didn't consume all prefetch requests.
|
||||
* Check if prefetched page is still relevant. If it is updated by
|
||||
* some other backend, then it should not be requested from smgr
|
||||
* unless it is evicted from shared buffers. In the latter case
* last_evicted_lsn should be updated and request_lsn should be
* greater than prefetch_lsn. The maximum with the page LSN is used
* because the page returned by the page server may have an LSN either
* greater or smaller than the requested one.
|
||||
*/
|
||||
if (entry->slot->status == PRFS_REQUESTED)
|
||||
if (Max(prefetch_lsn, PageGetLSN(page)) >= request_lsn)
|
||||
{
|
||||
page_server->flush();
|
||||
MyPState->ring_flush = MyPState->ring_unused;
|
||||
prefetch_wait_for(entry->slot->my_ring_index);
|
||||
n_prefetched_buffers = i + 1;
|
||||
n_prefetch_hits += 1;
|
||||
n_prefetch_requests = 0;
|
||||
memcpy(buffer, page, BLCKSZ);
|
||||
pfree(resp);
|
||||
return;
|
||||
}
|
||||
/* drop caches */
|
||||
prefetch_set_unused(entry->slot->my_ring_index, true);
|
||||
n_prefetch_missed_caches += 1;
|
||||
/* make it look like a prefetch cache miss */
|
||||
entry = NULL;
|
||||
}
|
||||
pfree(resp);
|
||||
}
|
||||
n_prefetched_buffers = 0;
|
||||
n_prefetch_responses = 0;
|
||||
n_prefetch_misses += 1;
|
||||
{
|
||||
NeonGetPageRequest request = {
|
||||
.req.tag = T_NeonGetPageRequest,
|
||||
.req.latest = request_latest,
|
||||
.req.lsn = request_lsn,
|
||||
.rnode = rnode,
|
||||
.forknum = forkNum,
|
||||
.blkno = blkno
|
||||
};
|
||||
|
||||
if (n_prefetch_requests > 0)
|
||||
{
|
||||
/* Combine all prefetch requests with primary request */
|
||||
page_server->send((NeonRequest *) & request);
|
||||
for (i = 0; i < n_prefetch_requests; i++)
|
||||
{
|
||||
request.rnode = prefetch_requests[i].rnode;
|
||||
request.forknum = prefetch_requests[i].forkNum;
|
||||
request.blkno = prefetch_requests[i].blockNum;
|
||||
prefetch_responses[i] = prefetch_requests[i];
|
||||
page_server->send((NeonRequest *) & request);
|
||||
}
|
||||
page_server->flush();
|
||||
n_prefetch_responses = n_prefetch_requests;
|
||||
n_prefetch_requests = 0;
|
||||
prefetch_lsn = request_lsn;
|
||||
resp = page_server->receive();
|
||||
}
|
||||
else
|
||||
{
|
||||
resp = page_server->request((NeonRequest *) & request);
|
||||
}
|
||||
}
|
||||
|
||||
if (entry == NULL)
|
||||
{
|
||||
n_prefetch_misses += 1;
|
||||
|
||||
ring_index = prefetch_register_buffer(buftag, &request_latest,
|
||||
&request_lsn);
|
||||
slot = &MyPState->prf_buffer[(ring_index % READ_BUFFER_SIZE)];
|
||||
}
|
||||
|
||||
Assert(MyPState->ring_last <= ring_index &&
|
||||
MyPState->ring_unused > ring_index);
|
||||
Assert(slot->my_ring_index == ring_index);
|
||||
Assert(slot->status != PRFS_UNUSED);
|
||||
Assert(&MyPState->prf_buffer[(ring_index % READ_BUFFER_SIZE)] == slot);
|
||||
|
||||
if (ring_index >= MyPState->ring_flush)
|
||||
{
|
||||
page_server->flush();
|
||||
MyPState->ring_flush = MyPState->ring_unused;
|
||||
}
|
||||
prefetch_wait_for(ring_index);
|
||||
|
||||
Assert(slot->status == PRFS_RECEIVED);
|
||||
|
||||
resp = slot->response;
|
||||
|
||||
switch (resp->tag)
|
||||
{
|
||||
case T_NeonGetPageResponse:
|
||||
@@ -1646,13 +1188,12 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
|
||||
errdetail("page server returned error: %s",
|
||||
((NeonErrorResponse *) resp)->message)));
|
||||
break;
|
||||
|
||||
default:
|
||||
elog(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
|
||||
}
|
||||
|
||||
/* buffer was used, clean up for later reuse */
|
||||
prefetch_set_unused(ring_index, true);
|
||||
prefetch_cleanup();
|
||||
pfree(resp);
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -2274,6 +1815,7 @@ static const struct f_smgr neon_smgr =
|
||||
.smgr_unlink = neon_unlink,
|
||||
.smgr_extend = neon_extend,
|
||||
.smgr_prefetch = neon_prefetch,
|
||||
.smgr_reset_prefetch = neon_reset_prefetch,
|
||||
.smgr_read = neon_read,
|
||||
.smgr_write = neon_write,
|
||||
.smgr_writeback = neon_writeback,
|
||||
|
||||
179 scripts/strip-useless-debug.py (executable file)
@@ -0,0 +1,179 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# Strip useless .debug_pubnames and .debug_pubtypes from all binaries.
|
||||
# They bloat the binaries, and are not used by modern debuggers anyway.
|
||||
# This makes the resulting binaries about 30% smaller, and also makes
|
||||
# the cargo cache smaller.
|
||||
#
|
||||
# See also https://github.com/rust-lang/rust/issues/46034
|
||||
#
|
||||
# Usage:
|
||||
# ./scripts/strip-useless-debug.py target
|
||||
#
|
||||
#
|
||||
# Why is this script needed?
|
||||
# --------------------------
|
||||
#
|
||||
# The simplest way to do this would be just:
|
||||
#
|
||||
# find target -executable -type f -size +0 | \
|
||||
# xargs -IPATH objcopy -R .debug_pubnames -R .debug_pubtypes -p PATH
|
||||
#
|
||||
# However, objcopy is not very fast, so we want to run it in parallel.
|
||||
# That would be straightforward to do with the xargs -P option, except
|
||||
# that the rust target directory contains hard links. Running objcopy
|
||||
# on multiple paths that are hardlinked to the same underlying file
|
||||
# doesn't work, because one objcopy could be overwriting the file while
|
||||
# the other one is trying to read it.
|
||||
#
|
||||
# To work around that, this script scans the target directory and
|
||||
# collects paths of all executables, except that when multiple paths
|
||||
# point to the same underlying inode, i.e. if two paths are hard links
|
||||
# to the same file, only one of the paths is collected. Then, it runs
|
||||
# objcopy on each of the unique files.
|
||||
#
|
||||
# There's one more subtle problem with hardlinks. The GNU objcopy man
|
||||
# page says that:
|
||||
#
|
||||
# If you do not specify outfile, objcopy creates a temporary file and
|
||||
# destructively renames the result with the name of infile.
|
||||
#
|
||||
# That is a problem: renaming over the file will create a new inode
|
||||
# for the path, and leave the other hardlinked paths unchanged. We
|
||||
# want to modify all the hard linked copies, and we also don't want to
|
||||
# remove the hard linking, as that saves a lot of space. In testing,
|
||||
# at least some versions of GNU objcopy seem to actually behave
|
||||
# differently if the file has hard links, copying over the file
|
||||
# instead of renaming it. So that text in the man page isn't
|
||||
# totally accurate. But that's hardly something we should rely on:
|
||||
# llvm-objcopy for example always renames. To avoid that problem, we
|
||||
# specify a temporary file as the destination, and copy it over the
|
||||
# original file in this python script. That way, it is independent of
|
||||
# objcopy's behavior.
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import os
|
||||
import time
|
||||
import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
async def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Strip useless .debug_pubnames and .debug_putypes sections from binaries",
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
)
|
||||
parser.add_argument(
|
||||
"-j", metavar="NUM", default=os.cpu_count(), type=int, help="number of parallel processes"
|
||||
)
|
||||
parser.add_argument("target", type=Path, help="target directory")
|
||||
|
||||
args = parser.parse_args()
|
||||
max_parallel_processes = args.j
|
||||
target_dir = args.target
|
||||
|
||||
# Collect list of executables in the target dir. Make note of the inode of
|
||||
# each path, and only record one path per inode. This ensures that
|
||||
# if there are hard links in the directory tree, we only remember one path
|
||||
# for each underlying file.
|
||||
inode_paths = {} # inode -> path dictionary
|
||||
|
||||
def onerror(err):
|
||||
raise err
|
||||
|
||||
for currentpath, folders, files in os.walk(target_dir, onerror=onerror):
|
||||
for file in files:
|
||||
path = os.path.join(currentpath, file)
|
||||
if os.access(path, os.X_OK):
|
||||
stat = os.stat(path)
|
||||
# If multiple paths ar hardlinked to the same underlying file,
|
||||
# only we remember the first one that we see. It's arbitrary
|
||||
# which one we will see first, but that's ok.
|
||||
#
|
||||
# Skip empty files while we're at it. There are some .lock files
|
||||
# in the target directory that are marked as executable, but are
|
||||
# are binaries so objcopy would complain about them.
|
||||
if stat.st_size > 0:
|
||||
prev = inode_paths.get(stat.st_ino)
|
||||
if prev:
|
||||
print(f"{path} is a hard link to {prev}, skipping")
|
||||
else:
|
||||
inode_paths[stat.st_ino] = path
|
||||
|
||||
# This function runs "objcopy -R .debug_pubnames -R .debug_pubtypes" on a file.
|
||||
#
|
||||
# Returns (original size, new size)
|
||||
async def run_objcopy(path) -> (int, int):
|
||||
stat = os.stat(path)
|
||||
orig_size = stat.st_size
|
||||
if orig_size == 0:
|
||||
return (0, 0)
|
||||
|
||||
# Write the output to a temp file first, and then copy it over the original.
|
||||
# objcopy could modify the file in place, but that's not reliable with hard
|
||||
# links. (See comment at beginning of this file.)
|
||||
with tempfile.NamedTemporaryFile() as tmpfile:
|
||||
cmd = [
|
||||
"objcopy",
|
||||
"-R",
|
||||
".debug_pubnames",
|
||||
"-R",
|
||||
".debug_pubtypes",
|
||||
"-p",
|
||||
path,
|
||||
tmpfile.name,
|
||||
]
|
||||
proc = await asyncio.create_subprocess_exec(*cmd)
|
||||
rc = await proc.wait()
|
||||
if rc != 0:
|
||||
raise subprocess.CalledProcessError(rc, cmd)
|
||||
|
||||
# If the file got smaller, copy it over the original.
|
||||
# Otherwise keep the original
|
||||
stat = os.stat(tmpfile.name)
|
||||
new_size = stat.st_size
|
||||
if new_size < orig_size:
|
||||
with open(path, "wb") as orig_dst:
|
||||
shutil.copyfileobj(tmpfile, orig_dst)
|
||||
return (orig_size, new_size)
|
||||
else:
|
||||
return (orig_size, orig_size)
|
||||
|
||||
# convert the inode->path dictionary into plain list of paths.
|
||||
paths = []
|
||||
for path in inode_paths.values():
|
||||
paths.append(path)
|
||||
|
||||
# Launch worker processes to process the list of files
|
||||
before_total = 0
|
||||
after_total = 0
|
||||
|
||||
async def runner_subproc():
|
||||
nonlocal before_total
|
||||
nonlocal after_total
|
||||
while len(paths) > 0:
|
||||
path = paths.pop()
|
||||
|
||||
start_time = time.perf_counter_ns()
|
||||
(before_size, after_size) = await run_objcopy(path)
|
||||
end_time = time.perf_counter_ns()
|
||||
before_total += before_size
|
||||
after_total += after_size
|
||||
|
||||
duration_ms = round((end_time-start_time) / 1000000)
|
||||
print(f"{path}: {before_size} to {after_size} bytes ({duration_ms} ms)")
|
||||
|
||||
active_workers = []
|
||||
for i in range(max_parallel_processes):
|
||||
active_workers.append(asyncio.create_task(runner_subproc()))
|
||||
done, () = await asyncio.wait(active_workers)
|
||||
|
||||
# all done!
|
||||
print(f"total size before {before_total} after: {after_total}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
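A quick way to sanity-check the result after running the script (a sketch, not part of this commit; it assumes readelf from GNU binutils is on PATH, and target/release/pageserver is only an example path):

import subprocess

# List the section headers and confirm the stripped sections are gone.
sections = subprocess.run(
    ["readelf", "--section-headers", "target/release/pageserver"],
    check=True,
    capture_output=True,
    text=True,
).stdout
assert ".debug_pubnames" not in sections
assert ".debug_pubtypes" not in sections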
@@ -65,8 +65,17 @@ BASE_PORT = 15000
WORKER_PORT_NUM = 1000


# These are set in pytest_configure()
base_dir = ""
neon_binpath = ""
pg_distrib_dir = ""
top_output_dir = ""
default_pg_version = ""


def pytest_configure(config):
    """
    Ensure that no unwanted daemons are running before we start testing.
    Check that we do not overflow the available port range.
    """

@@ -76,89 +85,67 @@ def pytest_configure(config):
    ):  # do not use ephemeral ports
        raise Exception("Too many workers configured. Cannot distribute ports for services.")
||||
@pytest.fixture(scope="session")
|
||||
def base_dir() -> Iterator[Path]:
|
||||
# find the base directory (currently this is the git root)
|
||||
base_dir = get_self_dir().parent.parent
|
||||
global base_dir
|
||||
base_dir = os.path.normpath(os.path.join(get_self_dir(), "../.."))
|
||||
log.info(f"base_dir is {base_dir}")
|
||||
|
||||
yield base_dir
|
||||
# Compute the top-level directory for all tests.
|
||||
global top_output_dir
|
||||
env_test_output = os.environ.get("TEST_OUTPUT")
|
||||
if env_test_output is not None:
|
||||
top_output_dir = env_test_output
|
||||
else:
|
||||
top_output_dir = os.path.join(base_dir, DEFAULT_OUTPUT_DIR)
|
||||
Path(top_output_dir).mkdir(exist_ok=True)
|
||||
|
||||
# Find the postgres installation.
|
||||
global default_pg_version
|
||||
log.info(f"default_pg_version is {default_pg_version}")
|
||||
env_default_pg_version = os.environ.get("DEFAULT_PG_VERSION")
|
||||
if env_default_pg_version:
|
||||
default_pg_version = env_default_pg_version
|
||||
log.info(f"default_pg_version is set to {default_pg_version}")
|
||||
else:
|
||||
default_pg_version = DEFAULT_PG_VERSION_DEFAULT
|
||||
|
||||
global pg_distrib_dir
|
||||
|
||||
env_postgres_bin = os.environ.get("POSTGRES_DISTRIB_DIR")
|
||||
if env_postgres_bin:
|
||||
pg_distrib_dir = env_postgres_bin
|
||||
else:
|
||||
pg_distrib_dir = os.path.normpath(os.path.join(base_dir, "pg_install"))
|
||||
|
||||
log.info(f"pg_distrib_dir is {pg_distrib_dir}")
|
||||
psql_bin_path = os.path.join(pg_distrib_dir, "v{}".format(default_pg_version), "bin/psql")
|
||||
postgres_bin_path = os.path.join(
|
||||
pg_distrib_dir, "v{}".format(default_pg_version), "bin/postgres"
|
||||
)
|
||||
|
||||
if os.getenv("REMOTE_ENV"):
|
||||
# When testing against a remote server, we only need the client binary.
|
||||
if not os.path.exists(psql_bin_path):
|
||||
raise Exception('psql not found at "{}"'.format(psql_bin_path))
|
||||
else:
|
||||
if not os.path.exists(postgres_bin_path):
|
||||
raise Exception('postgres not found at "{}"'.format(postgres_bin_path))
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def neon_binpath(base_dir: Path) -> Iterator[Path]:
|
||||
if os.getenv("REMOTE_ENV"):
|
||||
# we are in remote env and do not have neon binaries locally
|
||||
# this is the case for benchmarks run on self-hosted runner
|
||||
return
|
||||
|
||||
# Find the neon binaries.
|
||||
if env_neon_bin := os.environ.get("NEON_BIN"):
|
||||
binpath = Path(env_neon_bin)
|
||||
global neon_binpath
|
||||
env_neon_bin = os.environ.get("NEON_BIN")
|
||||
if env_neon_bin:
|
||||
neon_binpath = env_neon_bin
|
||||
else:
|
||||
build_type = os.environ.get("BUILD_TYPE", "debug")
|
||||
binpath = base_dir / "target" / build_type
|
||||
log.info(f"neon_binpath is {binpath}")
|
||||
|
||||
if not (binpath / "pageserver").exists():
|
||||
raise Exception(f"neon binaries not found at '{binpath}'")
|
||||
|
||||
yield binpath
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def pg_distrib_dir(base_dir: Path) -> Iterator[Path]:
|
||||
if env_postgres_bin := os.environ.get("POSTGRES_DISTRIB_DIR"):
|
||||
distrib_dir = Path(env_postgres_bin).resolve()
|
||||
else:
|
||||
distrib_dir = base_dir / "pg_install"
|
||||
|
||||
log.info(f"pg_distrib_dir is {distrib_dir}")
|
||||
yield distrib_dir
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def top_output_dir(base_dir: Path) -> Iterator[Path]:
|
||||
# Compute the top-level directory for all tests.
|
||||
if env_test_output := os.environ.get("TEST_OUTPUT"):
|
||||
output_dir = Path(env_test_output).resolve()
|
||||
else:
|
||||
output_dir = base_dir / DEFAULT_OUTPUT_DIR
|
||||
output_dir.mkdir(exist_ok=True)
|
||||
|
||||
log.info(f"top_output_dir is {output_dir}")
|
||||
yield output_dir
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def pg_version() -> Iterator[str]:
|
||||
if env_default_pg_version := os.environ.get("DEFAULT_PG_VERSION"):
|
||||
version = env_default_pg_version
|
||||
else:
|
||||
version = DEFAULT_PG_VERSION_DEFAULT
|
||||
|
||||
log.info(f"pg_version is {version}")
|
||||
yield version
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def versioned_pg_distrib_dir(pg_distrib_dir: Path, pg_version: str) -> Iterator[Path]:
|
||||
versioned_dir = pg_distrib_dir / f"v{pg_version}"
|
||||
|
||||
psql_bin_path = versioned_dir / "bin/psql"
|
||||
postgres_bin_path = versioned_dir / "bin/postgres"
|
||||
|
||||
if os.getenv("REMOTE_ENV"):
|
||||
# When testing against a remote server, we only need the client binary.
|
||||
if not psql_bin_path.exists():
|
||||
raise Exception(f"psql not found at '{psql_bin_path}'")
|
||||
else:
|
||||
if not postgres_bin_path.exists():
|
||||
raise Exception(f"postgres not found at '{postgres_bin_path}'")
|
||||
|
||||
log.info(f"versioned_pg_distrib_dir is {versioned_dir}")
|
||||
yield versioned_dir
|
||||
neon_binpath = os.path.join(base_dir, "target", build_type)
|
||||
log.info(f"neon_binpath is {neon_binpath}")
|
||||
if not os.path.exists(os.path.join(neon_binpath, "pageserver")):
|
||||
raise Exception('neon binaries not found at "{}"'.format(neon_binpath))
|
||||
|
||||
|
||||
def shareable_scope(fixture_name, config) -> Literal["session", "function"]:
|
||||
@@ -245,18 +232,16 @@ def port_distributor(worker_base_port):
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def default_broker(request: Any, port_distributor: PortDistributor, top_output_dir: Path):
|
||||
def default_broker(request: Any, port_distributor: PortDistributor):
|
||||
client_port = port_distributor.get_port()
|
||||
# multiple pytest sessions could get launched in parallel, get them different datadirs
|
||||
etcd_datadir = get_test_output_dir(request, top_output_dir) / f"etcd_datadir_{client_port}"
|
||||
etcd_datadir.mkdir(exist_ok=True, parents=True)
|
||||
etcd_datadir = os.path.join(get_test_output_dir(request), f"etcd_datadir_{client_port}")
|
||||
Path(etcd_datadir).mkdir(exist_ok=True, parents=True)
|
||||
|
||||
broker = Etcd(
|
||||
datadir=str(etcd_datadir), port=client_port, peer_port=port_distributor.get_port()
|
||||
)
|
||||
broker = Etcd(datadir=etcd_datadir, port=client_port, peer_port=port_distributor.get_port())
|
||||
yield broker
|
||||
broker.stop()
|
||||
allure_attach_from_dir(etcd_datadir)
|
||||
allure_attach_from_dir(Path(etcd_datadir))
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
@@ -536,9 +521,6 @@ class NeonEnvBuilder:
|
||||
broker: Etcd,
|
||||
run_id: uuid.UUID,
|
||||
mock_s3_server: MockS3Server,
|
||||
neon_binpath: Path,
|
||||
pg_distrib_dir: Path,
|
||||
pg_version: str,
|
||||
remote_storage: Optional[RemoteStorage] = None,
|
||||
remote_storage_users: RemoteStorageUsers = RemoteStorageUsers.PAGESERVER,
|
||||
pageserver_config_override: Optional[str] = None,
|
||||
@@ -568,9 +550,7 @@ class NeonEnvBuilder:
|
||||
self.env: Optional[NeonEnv] = None
|
||||
self.remote_storage_prefix: Optional[str] = None
|
||||
self.keep_remote_storage_contents: bool = True
|
||||
self.neon_binpath = neon_binpath
|
||||
self.pg_distrib_dir = pg_distrib_dir
|
||||
self.pg_version = pg_version
|
||||
self.pg_version = default_pg_version
|
||||
|
||||
def init(self) -> NeonEnv:
|
||||
# Cannot create more than one environment from one builder
|
||||
@@ -786,8 +766,6 @@ class NeonEnv:
|
||||
self.remote_storage = config.remote_storage
|
||||
self.remote_storage_users = config.remote_storage_users
|
||||
self.pg_version = config.pg_version
|
||||
self.neon_binpath = config.neon_binpath
|
||||
self.pg_distrib_dir = config.pg_distrib_dir
|
||||
|
||||
# generate initial tenant ID here instead of letting 'neon init' generate it,
|
||||
# so that we don't need to dig it out of the config file afterwards.
|
||||
@@ -883,7 +861,7 @@ class NeonEnv:
|
||||
return self.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
|
||||
|
||||
def get_pageserver_version(self) -> str:
|
||||
bin_pageserver = str(self.neon_binpath / "pageserver")
|
||||
bin_pageserver = os.path.join(str(neon_binpath), "pageserver")
|
||||
res = subprocess.run(
|
||||
[bin_pageserver, "--version"],
|
||||
check=True,
|
||||
@@ -907,10 +885,6 @@ def _shared_simple_env(
|
||||
mock_s3_server: MockS3Server,
|
||||
default_broker: Etcd,
|
||||
run_id: uuid.UUID,
|
||||
top_output_dir: Path,
|
||||
neon_binpath: Path,
|
||||
pg_distrib_dir: Path,
|
||||
pg_version: str,
|
||||
) -> Iterator[NeonEnv]:
|
||||
"""
|
||||
# Internal fixture backing the `neon_simple_env` fixture. If TEST_SHARED_FIXTURES
|
||||
@@ -919,20 +893,17 @@ def _shared_simple_env(
|
||||
|
||||
if os.environ.get("TEST_SHARED_FIXTURES") is None:
|
||||
# Create the environment in the per-test output directory
|
||||
repo_dir = get_test_output_dir(request, top_output_dir) / "repo"
|
||||
repo_dir = os.path.join(get_test_output_dir(request), "repo")
|
||||
else:
|
||||
# We're running shared fixtures. Share a single directory.
|
||||
repo_dir = top_output_dir / "shared_repo"
|
||||
repo_dir = os.path.join(str(top_output_dir), "shared_repo")
|
||||
shutil.rmtree(repo_dir, ignore_errors=True)
|
||||
|
||||
with NeonEnvBuilder(
|
||||
repo_dir=repo_dir,
|
||||
repo_dir=Path(repo_dir),
|
||||
port_distributor=port_distributor,
|
||||
broker=default_broker,
|
||||
mock_s3_server=mock_s3_server,
|
||||
neon_binpath=neon_binpath,
|
||||
pg_distrib_dir=pg_distrib_dir,
|
||||
pg_version=pg_version,
|
||||
run_id=run_id,
|
||||
) as builder:
|
||||
env = builder.init_start()
|
||||
@@ -963,9 +934,6 @@ def neon_env_builder(
|
||||
test_output_dir,
|
||||
port_distributor: PortDistributor,
|
||||
mock_s3_server: MockS3Server,
|
||||
neon_binpath: Path,
|
||||
pg_distrib_dir: Path,
|
||||
pg_version: str,
|
||||
default_broker: Etcd,
|
||||
run_id: uuid.UUID,
|
||||
) -> Iterator[NeonEnvBuilder]:
|
||||
@@ -990,9 +958,6 @@ def neon_env_builder(
|
||||
repo_dir=Path(repo_dir),
|
||||
port_distributor=port_distributor,
|
||||
mock_s3_server=mock_s3_server,
|
||||
neon_binpath=neon_binpath,
|
||||
pg_distrib_dir=pg_distrib_dir,
|
||||
pg_version=pg_version,
|
||||
broker=default_broker,
|
||||
run_id=run_id,
|
||||
) as builder:
|
||||
@@ -1275,7 +1240,7 @@ class AbstractNeonCli(abc.ABC):
|
||||
assert type(arguments) == list
|
||||
assert type(self.COMMAND) == str
|
||||
|
||||
bin_neon = str(self.env.neon_binpath / self.COMMAND)
|
||||
bin_neon = os.path.join(str(neon_binpath), self.COMMAND)
|
||||
|
||||
args = [bin_neon] + arguments
|
||||
log.info('Running command "{}"'.format(" ".join(args)))
|
||||
@@ -1283,7 +1248,7 @@ class AbstractNeonCli(abc.ABC):
|
||||
|
||||
env_vars = os.environ.copy()
|
||||
env_vars["NEON_REPO_DIR"] = str(self.env.repo_dir)
|
||||
env_vars["POSTGRES_DISTRIB_DIR"] = str(self.env.pg_distrib_dir)
|
||||
env_vars["POSTGRES_DISTRIB_DIR"] = str(pg_distrib_dir)
|
||||
if self.env.rust_log_override is not None:
|
||||
env_vars["RUST_LOG"] = self.env.rust_log_override
|
||||
for (extra_env_key, extra_env_value) in (extra_env_vars or {}).items():
|
||||
@@ -1758,17 +1723,17 @@ def append_pageserver_param_overrides(
|
||||
class PgBin:
|
||||
"""A helper class for executing postgres binaries"""
|
||||
|
||||
def __init__(self, log_dir: Path, pg_distrib_dir: Path, pg_version: str):
|
||||
def __init__(self, log_dir: Path, pg_version: str):
|
||||
self.log_dir = log_dir
|
||||
self.pg_version = pg_version
|
||||
self.pg_bin_path = pg_distrib_dir / f"v{pg_version}" / "bin"
|
||||
self.pg_lib_dir = pg_distrib_dir / f"v{pg_version}" / "lib"
|
||||
self.pg_bin_path = os.path.join(str(pg_distrib_dir), "v{}".format(pg_version), "bin")
|
||||
self.pg_lib_dir = os.path.join(str(pg_distrib_dir), "v{}".format(pg_version), "lib")
|
||||
self.env = os.environ.copy()
|
||||
self.env["LD_LIBRARY_PATH"] = str(self.pg_lib_dir)
|
||||
self.env["LD_LIBRARY_PATH"] = self.pg_lib_dir
|
||||
|
||||
def _fixpath(self, command: List[str]):
|
||||
if "/" not in str(command[0]):
|
||||
command[0] = str(self.pg_bin_path / command[0])
|
||||
if "/" not in command[0]:
|
||||
command[0] = os.path.join(self.pg_bin_path, command[0])
|
||||
|
||||
def _build_env(self, env_add: Optional[Env]) -> Env:
|
||||
if env_add is None:
|
||||
@@ -1792,7 +1757,7 @@ class PgBin:
|
||||
"""
|
||||
|
||||
self._fixpath(command)
|
||||
log.info(f"Running command '{' '.join(command)}'")
|
||||
log.info('Running command "{}"'.format(" ".join(command)))
|
||||
env = self._build_env(env)
|
||||
subprocess.run(command, env=env, cwd=cwd, check=True)
|
||||
|
||||
@@ -1811,14 +1776,16 @@ class PgBin:
|
||||
"""
|
||||
|
||||
self._fixpath(command)
|
||||
log.info(f"Running command '{' '.join(command)}'")
|
||||
log.info('Running command "{}"'.format(" ".join(command)))
|
||||
env = self._build_env(env)
|
||||
return subprocess_capture(self.log_dir, command, env=env, cwd=cwd, check=True, **kwargs)
|
||||
return subprocess_capture(
|
||||
str(self.log_dir), command, env=env, cwd=cwd, check=True, **kwargs
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pg_bin(test_output_dir: Path, pg_distrib_dir: Path, pg_version: str) -> PgBin:
|
||||
return PgBin(test_output_dir, pg_distrib_dir, pg_version)
|
||||
def pg_bin(test_output_dir: Path, pg_version: str) -> PgBin:
|
||||
return PgBin(test_output_dir, pg_version)
|
||||
|
||||
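For orientation, a sketch of how the PgBin helper above is typically exercised (illustrative only, not code from this diff; "pg_config" stands in for any binary under the versioned bin directory):

def test_pg_bin_sketch(pg_bin: PgBin):
    # _fixpath() resolves the bare command name against
    # pg_distrib_dir/v{pg_version}/bin, and PgBin sets LD_LIBRARY_PATH so the
    # matching libpq is picked up.
    basepath = pg_bin.run_capture(["pg_config", "--version"])
    # run_capture writes stdout/stderr to files and returns their base path.
    with open(basepath + ".stdout") as f:
        assert f.read().startswith("PostgreSQL")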
|
||||
class VanillaPostgres(PgProtocol):
|
||||
@@ -1865,15 +1832,19 @@ class VanillaPostgres(PgProtocol):
|
||||
self.stop()
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def pg_version() -> str:
|
||||
return default_pg_version
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def vanilla_pg(
|
||||
test_output_dir: Path,
|
||||
port_distributor: PortDistributor,
|
||||
pg_distrib_dir: Path,
|
||||
pg_version: str,
|
||||
) -> Iterator[VanillaPostgres]:
|
||||
pgdatadir = test_output_dir / "pgdata-vanilla"
|
||||
pg_bin = PgBin(test_output_dir, pg_distrib_dir, pg_version)
|
||||
pg_bin = PgBin(test_output_dir, pg_version)
|
||||
port = port_distributor.get_port()
|
||||
with VanillaPostgres(pgdatadir, pg_bin, port) as vanilla_pg:
|
||||
yield vanilla_pg
|
||||
@@ -1909,10 +1880,8 @@ class RemotePostgres(PgProtocol):
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def remote_pg(
|
||||
test_output_dir: Path, pg_distrib_dir: Path, pg_version: str
|
||||
) -> Iterator[RemotePostgres]:
|
||||
pg_bin = PgBin(test_output_dir, pg_distrib_dir, pg_version)
|
||||
def remote_pg(test_output_dir: Path, pg_version: str) -> Iterator[RemotePostgres]:
|
||||
pg_bin = PgBin(test_output_dir, pg_version)
|
||||
|
||||
connstr = os.getenv("BENCHMARK_CONNSTR")
|
||||
if connstr is None:
|
||||
@@ -1957,18 +1926,10 @@ class PSQL:
|
||||
|
||||
|
||||
class NeonProxy(PgProtocol):
|
||||
def __init__(
|
||||
self,
|
||||
proxy_port: int,
|
||||
http_port: int,
|
||||
neon_binpath: Path,
|
||||
auth_endpoint=None,
|
||||
mgmt_port=None,
|
||||
):
|
||||
def __init__(self, proxy_port: int, http_port: int, auth_endpoint=None, mgmt_port=None):
|
||||
super().__init__(dsn=auth_endpoint, port=proxy_port)
|
||||
self.host = "127.0.0.1"
|
||||
self.http_port = http_port
|
||||
self.neon_binpath = neon_binpath
|
||||
self.proxy_port = proxy_port
|
||||
self.mgmt_port = mgmt_port
|
||||
self.auth_endpoint = auth_endpoint
|
||||
@@ -1984,7 +1945,7 @@ class NeonProxy(PgProtocol):
|
||||
|
||||
# Start proxy
|
||||
args = [
|
||||
str(self.neon_binpath / "proxy"),
|
||||
os.path.join(neon_binpath, "proxy"),
|
||||
*["--http", f"{self.host}:{self.http_port}"],
|
||||
*["--proxy", f"{self.host}:{self.proxy_port}"],
|
||||
*["--auth-backend", "postgres"],
|
||||
@@ -2000,7 +1961,7 @@ class NeonProxy(PgProtocol):
|
||||
assert self._popen is None
|
||||
|
||||
# Start proxy
|
||||
bin_proxy = str(self.neon_binpath / "proxy")
|
||||
bin_proxy = os.path.join(str(neon_binpath), "proxy")
|
||||
args = [bin_proxy]
|
||||
args.extend(["--http", f"{self.host}:{self.http_port}"])
|
||||
args.extend(["--proxy", f"{self.host}:{self.proxy_port}"])
|
||||
@@ -2032,18 +1993,18 @@ class NeonProxy(PgProtocol):
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def link_proxy(port_distributor, neon_binpath: Path) -> Iterator[NeonProxy]:
|
||||
def link_proxy(port_distributor) -> Iterator[NeonProxy]:
|
||||
"""Neon proxy that routes through link auth."""
|
||||
http_port = port_distributor.get_port()
|
||||
proxy_port = port_distributor.get_port()
|
||||
mgmt_port = port_distributor.get_port()
|
||||
with NeonProxy(proxy_port, http_port, neon_binpath=neon_binpath, mgmt_port=mgmt_port) as proxy:
|
||||
with NeonProxy(proxy_port, http_port, mgmt_port=mgmt_port) as proxy:
|
||||
proxy.start_with_link_auth()
|
||||
yield proxy
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def static_proxy(vanilla_pg, port_distributor, neon_binpath: Path) -> Iterator[NeonProxy]:
|
||||
def static_proxy(vanilla_pg, port_distributor) -> Iterator[NeonProxy]:
|
||||
"""Neon proxy that routes directly to vanilla postgres."""
|
||||
|
||||
# For simplicity, we use the same user for both `--auth-endpoint` and `safe_psql`
|
||||
@@ -2059,10 +2020,7 @@ def static_proxy(vanilla_pg, port_distributor, neon_binpath: Path) -> Iterator[N
|
||||
http_port = port_distributor.get_port()
|
||||
|
||||
with NeonProxy(
|
||||
proxy_port=proxy_port,
|
||||
http_port=http_port,
|
||||
neon_binpath=neon_binpath,
|
||||
auth_endpoint=auth_endpoint,
|
||||
proxy_port=proxy_port, http_port=http_port, auth_endpoint=auth_endpoint
|
||||
) as proxy:
|
||||
proxy.start()
|
||||
yield proxy
|
||||
@@ -2565,10 +2523,10 @@ class Etcd:
|
||||
self.handle.wait()
|
||||
|
||||
|
||||
def get_test_output_dir(request: Any, top_output_dir: Path) -> Path:
|
||||
def get_test_output_dir(request: Any) -> Path:
|
||||
"""Compute the working directory for an individual test."""
|
||||
test_name = request.node.name
|
||||
test_dir = top_output_dir / test_name.replace("/", "-")
|
||||
test_dir = Path(top_output_dir) / test_name.replace("/", "-")
|
||||
log.info(f"get_test_output_dir is {test_dir}")
|
||||
# make mypy happy
|
||||
assert isinstance(test_dir, Path)
|
||||
@@ -2585,11 +2543,11 @@ def get_test_output_dir(request: Any, top_output_dir: Path) -> Path:
|
||||
# this fixture ensures that the directory exists. That works because
|
||||
# 'autouse' fixtures are run before other fixtures.
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
def test_output_dir(request: Any, top_output_dir: Path) -> Iterator[Path]:
|
||||
def test_output_dir(request: Any) -> Iterator[Path]:
|
||||
"""Create the working directory for an individual test."""
|
||||
|
||||
# one directory per test
|
||||
test_dir = get_test_output_dir(request, top_output_dir)
|
||||
test_dir = get_test_output_dir(request)
|
||||
log.info(f"test_output_dir is {test_dir}")
|
||||
shutil.rmtree(test_dir, ignore_errors=True)
|
||||
test_dir.mkdir()
|
||||
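The comment above about 'autouse' ordering can be checked in isolation; a minimal, self-contained sketch (not code from this repository):

import pytest

order = []

# Within the same scope, pytest sets up autouse fixtures before fixtures a
# test requests explicitly, which is what lets test_output_dir create the
# directory before other fixtures rely on it.
@pytest.fixture(autouse=True)
def autouse_fixture():
    order.append("autouse")

@pytest.fixture
def requested_fixture():
    order.append("requested")
    return list(order)

def test_autouse_runs_first(requested_fixture):
    assert requested_fixture == ["autouse", "requested"]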
@@ -2681,7 +2639,7 @@ def check_restored_datadir_content(
|
||||
restored_dir_path = env.repo_dir / f"{pg.node_name}_restored_datadir"
|
||||
restored_dir_path.mkdir(exist_ok=True)
|
||||
|
||||
pg_bin = PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version)
|
||||
pg_bin = PgBin(test_output_dir, env.pg_version)
|
||||
psql_path = os.path.join(pg_bin.pg_bin_path, "psql")
|
||||
|
||||
cmd = rf"""
|
||||
|
||||
@@ -15,13 +15,12 @@ from psycopg2.extensions import cursor
|
||||
Fn = TypeVar("Fn", bound=Callable[..., Any])
|
||||
|
||||
|
||||
def get_self_dir() -> Path:
|
||||
def get_self_dir() -> str:
|
||||
"""Get the path to the directory where this script lives."""
|
||||
# return os.path.dirname(os.path.abspath(__file__))
|
||||
return Path(__file__).resolve().parent
|
||||
return os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
|
||||
def subprocess_capture(capture_dir: Path, cmd: List[str], **kwargs: Any) -> str:
|
||||
def subprocess_capture(capture_dir: str, cmd: List[str], **kwargs: Any) -> str:
|
||||
"""Run a process and capture its output
|
||||
|
||||
Output will go to files named "cmd_NNN.stdout" and "cmd_NNN.stderr"
|
||||
|
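For reference, the way this helper is used by the tests further down (a sketch; "echo" stands in for a real command, and test_output_dir is the per-test directory fixture):

from pathlib import Path

from fixtures.utils import subprocess_capture

def test_subprocess_capture_sketch(test_output_dir):
    # subprocess_capture runs the command, writes its output to
    # "<basepath>.stdout" / "<basepath>.stderr" under the given directory,
    # and returns the base path without the extension.
    basepath = subprocess_capture(test_output_dir, ["echo", "hello"], check=True)
    assert Path(f"{basepath}.stdout").read_text().strip() == "hello"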
||||
@@ -46,9 +46,9 @@ def test_pg_clients(test_output_dir: Path, remote_pg: RemotePostgres, client: st
|
||||
raise RuntimeError("docker is required for running this test")
|
||||
|
||||
build_cmd = [docker_bin, "build", "--tag", image_tag, f"{Path(__file__).parent / client}"]
|
||||
subprocess_capture(test_output_dir, build_cmd, check=True)
|
||||
subprocess_capture(str(test_output_dir), build_cmd, check=True)
|
||||
|
||||
run_cmd = [docker_bin, "run", "--rm", "--env-file", env_file, image_tag]
|
||||
basepath = subprocess_capture(test_output_dir, run_cmd, check=True)
|
||||
basepath = subprocess_capture(str(test_output_dir), run_cmd, check=True)
|
||||
|
||||
assert Path(f"{basepath}.stdout").read_text().strip() == "1"
|
||||
|
||||
@@ -80,12 +80,7 @@ class PortReplacer(object):
|
||||
|
||||
@pytest.mark.order(after="test_prepare_snapshot")
|
||||
def test_backward_compatibility(
|
||||
pg_bin: PgBin,
|
||||
port_distributor: PortDistributor,
|
||||
test_output_dir: Path,
|
||||
request: FixtureRequest,
|
||||
neon_binpath: Path,
|
||||
pg_distrib_dir: Path,
|
||||
pg_bin: PgBin, port_distributor: PortDistributor, test_output_dir: Path, request: FixtureRequest
|
||||
):
|
||||
compatibility_snapshot_dir = Path(
|
||||
os.environ.get("COMPATIBILITY_SNAPSHOT_DIR", DEFAILT_LOCAL_SNAPSHOT_DIR)
|
||||
@@ -175,8 +170,6 @@ def test_backward_compatibility(
|
||||
config.repo_dir = repo_dir
|
||||
config.pg_version = "14"  # Note: `pg_dumpall` (from pg_bin) version is set by DEFAULT_PG_VERSION_DEFAULT and can be overridden by the DEFAULT_PG_VERSION env var
|
||||
config.initial_tenant = snapshot_config["default_tenant_id"]
|
||||
config.neon_binpath = neon_binpath
|
||||
config.pg_distrib_dir = pg_distrib_dir
|
||||
|
||||
# Check that we can start the project
|
||||
cli = NeonCli(config)
|
||||
|
||||
@@ -1,8 +1,13 @@
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, PortDistributor, VanillaPostgres
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
PortDistributor,
|
||||
VanillaPostgres,
|
||||
pg_distrib_dir,
|
||||
)
|
||||
from fixtures.types import Lsn, TimelineId
|
||||
from fixtures.utils import query_scalar, subprocess_capture
|
||||
|
||||
@@ -11,10 +16,7 @@ num_rows = 1000
|
||||
|
||||
# Ensure that regular postgres can start from fullbackup
|
||||
def test_fullbackup(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
pg_bin: PgBin,
|
||||
port_distributor: PortDistributor,
|
||||
pg_distrib_dir: Path,
|
||||
neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, port_distributor: PortDistributor
|
||||
):
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
@@ -38,7 +40,7 @@ def test_fullbackup(
|
||||
|
||||
# Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq.
|
||||
# PgBin sets it automatically, but here we need to pipe psql output to the tar command.
|
||||
psql_env = {"LD_LIBRARY_PATH": str(pg_distrib_dir / "lib")}
|
||||
psql_env = {"LD_LIBRARY_PATH": os.path.join(str(pg_distrib_dir), "lib")}
|
||||
|
||||
# Get and unpack fullbackup from pageserver
|
||||
restored_dir_path = env.repo_dir / "restored_datadir"
|
||||
@@ -47,7 +49,9 @@ def test_fullbackup(
|
||||
cmd = ["psql", "--no-psqlrc", env.pageserver.connstr(), "-c", query]
|
||||
result_basepath = pg_bin.run_capture(cmd, env=psql_env)
|
||||
tar_output_file = result_basepath + ".stdout"
|
||||
subprocess_capture(env.repo_dir, ["tar", "-xf", tar_output_file, "-C", str(restored_dir_path)])
|
||||
subprocess_capture(
|
||||
str(env.repo_dir), ["tar", "-xf", tar_output_file, "-C", str(restored_dir_path)]
|
||||
)
|
||||
|
||||
# HACK
|
||||
# fullbackup returns neon specific pg_control and first WAL segment
|
||||
|
||||
@@ -13,6 +13,7 @@ from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
Postgres,
|
||||
pg_distrib_dir,
|
||||
wait_for_last_record_lsn,
|
||||
wait_for_upload,
|
||||
)
|
||||
@@ -127,7 +128,7 @@ def test_import_from_pageserver_small(pg_bin: PgBin, neon_env_builder: NeonEnvBu
|
||||
|
||||
num_rows = 3000
|
||||
lsn = _generate_data(num_rows, pg)
|
||||
_import(num_rows, lsn, env, pg_bin, timeline, env.pg_distrib_dir)
|
||||
_import(num_rows, lsn, env, pg_bin, timeline)
|
||||
|
||||
|
||||
@pytest.mark.timeout(1800)
|
||||
@@ -155,7 +156,7 @@ def test_import_from_pageserver_multisegment(pg_bin: PgBin, neon_env_builder: Ne
|
||||
log.info(f"timeline logical size = {logical_size / (1024 ** 2)}MB")
|
||||
assert logical_size > 1024**3 # = 1GB
|
||||
|
||||
tar_output_file = _import(num_rows, lsn, env, pg_bin, timeline, env.pg_distrib_dir)
|
||||
tar_output_file = _import(num_rows, lsn, env, pg_bin, timeline)
|
||||
|
||||
# Check if the backup data contains multiple segment files
|
||||
cnt_seg_files = 0
|
||||
@@ -190,12 +191,7 @@ def _generate_data(num_rows: int, pg: Postgres) -> Lsn:
|
||||
|
||||
|
||||
def _import(
|
||||
expected_num_rows: int,
|
||||
lsn: Lsn,
|
||||
env: NeonEnv,
|
||||
pg_bin: PgBin,
|
||||
timeline: TimelineId,
|
||||
pg_distrib_dir: Path,
|
||||
expected_num_rows: int, lsn: Lsn, env: NeonEnv, pg_bin: PgBin, timeline: TimelineId
|
||||
) -> str:
|
||||
"""Test importing backup data to the pageserver.
|
||||
|
||||
@@ -209,7 +205,7 @@ def _import(
|
||||
|
||||
# Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq.
|
||||
# PgBin sets it automatically, but here we need to pipe psql output to the tar command.
|
||||
psql_env = {"LD_LIBRARY_PATH": str(pg_distrib_dir / "lib")}
|
||||
psql_env = {"LD_LIBRARY_PATH": os.path.join(str(pg_distrib_dir), "lib")}
|
||||
|
||||
# Get a fullbackup from pageserver
|
||||
query = f"fullbackup { env.initial_tenant} {timeline} {lsn}"
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import pathlib
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from fixtures.neon_fixtures import (
|
||||
@@ -7,18 +7,18 @@ from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
PageserverHttpClient,
|
||||
neon_binpath,
|
||||
pg_distrib_dir,
|
||||
)
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
from fixtures.utils import wait_until
|
||||
|
||||
|
||||
# test that we cannot override node id after init
|
||||
def test_pageserver_init_node_id(
|
||||
neon_simple_env: NeonEnv, neon_binpath: Path, pg_distrib_dir: Path
|
||||
):
|
||||
def test_pageserver_init_node_id(neon_simple_env: NeonEnv):
|
||||
repo_dir = neon_simple_env.repo_dir
|
||||
pageserver_config = repo_dir / "pageserver.toml"
|
||||
pageserver_bin = neon_binpath / "pageserver"
|
||||
pageserver_bin = pathlib.Path(neon_binpath) / "pageserver"
|
||||
|
||||
def run_pageserver(args):
|
||||
return subprocess.run(
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
#
|
||||
# This file runs pg_regress-based tests.
|
||||
#
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
|
||||
from fixtures.neon_fixtures import NeonEnv, base_dir, check_restored_datadir_content, pg_distrib_dir
|
||||
|
||||
|
||||
# Run the main PostgreSQL regression tests, in src/test/regress.
|
||||
@@ -12,14 +13,7 @@ from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
|
||||
# This runs for a long time, especially in debug mode, so use a larger-than-default
|
||||
# timeout.
|
||||
@pytest.mark.timeout(1800)
|
||||
def test_pg_regress(
|
||||
neon_simple_env: NeonEnv,
|
||||
test_output_dir: Path,
|
||||
pg_bin,
|
||||
capsys,
|
||||
base_dir: Path,
|
||||
pg_distrib_dir: Path,
|
||||
):
|
||||
def test_pg_regress(neon_simple_env: NeonEnv, test_output_dir: Path, pg_bin, capsys):
|
||||
env = neon_simple_env
|
||||
|
||||
env.neon_cli.create_branch("test_pg_regress", "empty")
|
||||
@@ -32,20 +26,20 @@ def test_pg_regress(
|
||||
(runpath / "testtablespace").mkdir(parents=True)
|
||||
|
||||
# Compute all the file locations that pg_regress will need.
|
||||
build_path = pg_distrib_dir / f"build/v{env.pg_version}/src/test/regress"
|
||||
src_path = base_dir / f"vendor/postgres-v{env.pg_version}/src/test/regress"
|
||||
bindir = pg_distrib_dir / f"v{env.pg_version}/bin"
|
||||
schedule = src_path / "parallel_schedule"
|
||||
pg_regress = build_path / "pg_regress"
|
||||
build_path = os.path.join(pg_distrib_dir, "build/v{}/src/test/regress").format(env.pg_version)
|
||||
src_path = os.path.join(base_dir, "vendor/postgres-v{}/src/test/regress").format(env.pg_version)
|
||||
bindir = os.path.join(pg_distrib_dir, "v{}".format(env.pg_version), "bin")
|
||||
schedule = os.path.join(src_path, "parallel_schedule")
|
||||
pg_regress = os.path.join(build_path, "pg_regress")
|
||||
|
||||
pg_regress_command = [
|
||||
str(pg_regress),
|
||||
pg_regress,
|
||||
'--bindir=""',
|
||||
"--use-existing",
|
||||
f"--bindir={bindir}",
|
||||
f"--dlpath={build_path}",
|
||||
f"--schedule={schedule}",
|
||||
f"--inputdir={src_path}",
|
||||
"--bindir={}".format(bindir),
|
||||
"--dlpath={}".format(build_path),
|
||||
"--schedule={}".format(schedule),
|
||||
"--inputdir={}".format(src_path),
|
||||
]
|
||||
|
||||
env_vars = {
|
||||
@@ -72,14 +66,7 @@ def test_pg_regress(
|
||||
# This runs for a long time, especially in debug mode, so use a larger-than-default
|
||||
# timeout.
|
||||
@pytest.mark.timeout(1800)
|
||||
def test_isolation(
|
||||
neon_simple_env: NeonEnv,
|
||||
test_output_dir: Path,
|
||||
pg_bin,
|
||||
capsys,
|
||||
base_dir: Path,
|
||||
pg_distrib_dir: Path,
|
||||
):
|
||||
def test_isolation(neon_simple_env: NeonEnv, test_output_dir: Path, pg_bin, capsys):
|
||||
env = neon_simple_env
|
||||
|
||||
env.neon_cli.create_branch("test_isolation", "empty")
|
||||
@@ -93,19 +80,21 @@ def test_isolation(
|
||||
(runpath / "testtablespace").mkdir(parents=True)
|
||||
|
||||
# Compute all the file locations that pg_isolation_regress will need.
|
||||
build_path = pg_distrib_dir / f"build/v{env.pg_version}/src/test/isolation"
|
||||
src_path = base_dir / f"vendor/postgres-v{env.pg_version}/src/test/isolation"
|
||||
bindir = pg_distrib_dir / f"v{env.pg_version}/bin"
|
||||
schedule = src_path / "isolation_schedule"
|
||||
pg_isolation_regress = build_path / "pg_isolation_regress"
|
||||
build_path = os.path.join(pg_distrib_dir, "build/v{}/src/test/isolation".format(env.pg_version))
|
||||
src_path = os.path.join(
|
||||
base_dir, "vendor/postgres-v{}/src/test/isolation".format(env.pg_version)
|
||||
)
|
||||
bindir = os.path.join(pg_distrib_dir, "v{}".format(env.pg_version), "bin")
|
||||
schedule = os.path.join(src_path, "isolation_schedule")
|
||||
pg_isolation_regress = os.path.join(build_path, "pg_isolation_regress")
|
||||
|
||||
pg_isolation_regress_command = [
|
||||
str(pg_isolation_regress),
|
||||
pg_isolation_regress,
|
||||
"--use-existing",
|
||||
f"--bindir={bindir}",
|
||||
f"--dlpath={build_path}",
|
||||
f"--inputdir={src_path}",
|
||||
f"--schedule={schedule}",
|
||||
"--bindir={}".format(bindir),
|
||||
"--dlpath={}".format(build_path),
|
||||
"--inputdir={}".format(src_path),
|
||||
"--schedule={}".format(schedule),
|
||||
]
|
||||
|
||||
env_vars = {
|
||||
@@ -123,14 +112,7 @@ def test_isolation(
|
||||
|
||||
# Run extra Neon-specific pg_regress-based tests. The tests and their
|
||||
# schedule file are in the sql_regress/ directory.
|
||||
def test_sql_regress(
|
||||
neon_simple_env: NeonEnv,
|
||||
test_output_dir: Path,
|
||||
pg_bin,
|
||||
capsys,
|
||||
base_dir: Path,
|
||||
pg_distrib_dir: Path,
|
||||
):
|
||||
def test_sql_regress(neon_simple_env: NeonEnv, test_output_dir: Path, pg_bin, capsys):
|
||||
env = neon_simple_env
|
||||
|
||||
env.neon_cli.create_branch("test_sql_regress", "empty")
|
||||
@@ -144,19 +126,19 @@ def test_sql_regress(
|
||||
|
||||
# Compute all the file locations that pg_regress will need.
|
||||
# This test runs neon specific tests
|
||||
build_path = pg_distrib_dir / f"build/v{env.pg_version}/src/test/regress"
|
||||
src_path = base_dir / "test_runner/sql_regress"
|
||||
bindir = pg_distrib_dir / f"v{env.pg_version}/bin"
|
||||
schedule = src_path / "parallel_schedule"
|
||||
pg_regress = build_path / "pg_regress"
|
||||
build_path = os.path.join(pg_distrib_dir, "build/v{}/src/test/regress").format(env.pg_version)
|
||||
src_path = os.path.join(base_dir, "test_runner/sql_regress")
|
||||
bindir = os.path.join(pg_distrib_dir, "v{}".format(env.pg_version), "bin")
|
||||
schedule = os.path.join(src_path, "parallel_schedule")
|
||||
pg_regress = os.path.join(build_path, "pg_regress")
|
||||
|
||||
pg_regress_command = [
|
||||
str(pg_regress),
|
||||
pg_regress,
|
||||
"--use-existing",
|
||||
f"--bindir={bindir}",
|
||||
f"--dlpath={build_path}",
|
||||
f"--schedule={schedule}",
|
||||
f"--inputdir={src_path}",
|
||||
"--bindir={}".format(bindir),
|
||||
"--dlpath={}".format(build_path),
|
||||
"--schedule={}".format(schedule),
|
||||
"--inputdir={}".format(src_path),
|
||||
]
|
||||
|
||||
env_vars = {
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import os
|
||||
import pathlib
|
||||
import threading
|
||||
from contextlib import closing, contextmanager
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Optional, Tuple
|
||||
|
||||
import pytest
|
||||
@@ -14,6 +14,9 @@ from fixtures.neon_fixtures import (
|
||||
PortDistributor,
|
||||
Postgres,
|
||||
assert_no_in_progress_downloads_for_tenant,
|
||||
base_dir,
|
||||
neon_binpath,
|
||||
pg_distrib_dir,
|
||||
wait_for_last_record_lsn,
|
||||
wait_for_upload,
|
||||
)
|
||||
@@ -27,13 +30,12 @@ def assert_abs_margin_ratio(a: float, b: float, margin_ratio: float):
|
||||
|
||||
@contextmanager
|
||||
def new_pageserver_service(
|
||||
new_pageserver_dir: Path,
|
||||
pageserver_bin: Path,
|
||||
remote_storage_mock_path: Path,
|
||||
new_pageserver_dir: pathlib.Path,
|
||||
pageserver_bin: pathlib.Path,
|
||||
remote_storage_mock_path: pathlib.Path,
|
||||
pg_port: int,
|
||||
http_port: int,
|
||||
broker: Optional[Etcd],
|
||||
pg_distrib_dir: Path,
|
||||
):
|
||||
"""
|
||||
cannot use NeonPageserver yet because it depends on neon cli
|
||||
@@ -191,10 +193,10 @@ def switch_pg_to_new_pageserver(
|
||||
new_pageserver_port: int,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> Path:
|
||||
) -> pathlib.Path:
|
||||
pg.stop()
|
||||
|
||||
pg_config_file_path = Path(pg.config_file_path())
|
||||
pg_config_file_path = pathlib.Path(pg.config_file_path())
|
||||
pg_config_file_path.open("a").write(
|
||||
f"\nneon.pageserver_connstring = 'postgresql://no_user:@localhost:{new_pageserver_port}'"
|
||||
)
|
||||
@@ -217,7 +219,7 @@ def switch_pg_to_new_pageserver(
|
||||
return timeline_to_detach_local_path
|
||||
|
||||
|
||||
def post_migration_check(pg: Postgres, sum_before_migration: int, old_local_path: Path):
|
||||
def post_migration_check(pg: Postgres, sum_before_migration: int, old_local_path: pathlib.Path):
|
||||
with pg_cur(pg) as cur:
|
||||
# check that data is still there
|
||||
cur.execute("SELECT sum(key) FROM t")
|
||||
@@ -249,9 +251,7 @@ def post_migration_check(pg: Postgres, sum_before_migration: int, old_local_path
|
||||
def test_tenant_relocation(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
port_distributor: PortDistributor,
|
||||
test_output_dir: Path,
|
||||
neon_binpath: Path,
|
||||
base_dir: Path,
|
||||
test_output_dir,
|
||||
method: str,
|
||||
with_load: str,
|
||||
):
|
||||
@@ -350,7 +350,7 @@ def test_tenant_relocation(
|
||||
new_pageserver_pg_port = port_distributor.get_port()
|
||||
new_pageserver_http_port = port_distributor.get_port()
|
||||
log.info("new pageserver ports pg %s http %s", new_pageserver_pg_port, new_pageserver_http_port)
|
||||
pageserver_bin = neon_binpath / "pageserver"
|
||||
pageserver_bin = pathlib.Path(neon_binpath) / "pageserver"
|
||||
|
||||
new_pageserver_http = PageserverHttpClient(
|
||||
port=new_pageserver_http_port,
|
||||
@@ -365,7 +365,6 @@ def test_tenant_relocation(
|
||||
new_pageserver_pg_port,
|
||||
new_pageserver_http_port,
|
||||
neon_env_builder.broker,
|
||||
neon_env_builder.pg_distrib_dir,
|
||||
):
|
||||
|
||||
# Migrate either by attaching from s3 or import/export basebackup
|
||||
@@ -374,7 +373,7 @@ def test_tenant_relocation(
|
||||
"poetry",
|
||||
"run",
|
||||
"python",
|
||||
str(base_dir / "scripts/export_import_between_pageservers.py"),
|
||||
os.path.join(base_dir, "scripts/export_import_between_pageservers.py"),
|
||||
"--tenant-id",
|
||||
str(tenant_id),
|
||||
"--from-host",
|
||||
@@ -390,9 +389,9 @@ def test_tenant_relocation(
|
||||
"--to-pg-port",
|
||||
str(new_pageserver_pg_port),
|
||||
"--pg-distrib-dir",
|
||||
str(neon_env_builder.pg_distrib_dir),
|
||||
pg_distrib_dir,
|
||||
"--work-dir",
|
||||
str(test_output_dir),
|
||||
os.path.join(test_output_dir),
|
||||
"--tmp-pg-port",
|
||||
str(port_distributor.get_port()),
|
||||
]
|
||||
|
||||
@@ -338,7 +338,6 @@ def test_timeline_size_metrics(
|
||||
neon_simple_env: NeonEnv,
|
||||
test_output_dir: Path,
|
||||
port_distributor: PortDistributor,
|
||||
pg_distrib_dir: Path,
|
||||
pg_version: str,
|
||||
):
|
||||
env = neon_simple_env
|
||||
@@ -383,7 +382,7 @@ def test_timeline_size_metrics(
|
||||
tl_logical_size_metric = int(matches.group(1))
|
||||
|
||||
pgdatadir = test_output_dir / "pgdata-vanilla"
|
||||
pg_bin = PgBin(test_output_dir, pg_distrib_dir, pg_version)
|
||||
pg_bin = PgBin(test_output_dir, pg_version)
|
||||
port = port_distributor.get_port()
|
||||
with VanillaPostgres(pgdatadir, pg_bin, port) as vanilla_pg:
|
||||
vanilla_pg.configure([f"port={port}"])
|
||||
|
||||
@@ -30,6 +30,7 @@ from fixtures.neon_fixtures import (
|
||||
SafekeeperHttpClient,
|
||||
SafekeeperPort,
|
||||
available_remote_storages,
|
||||
neon_binpath,
|
||||
wait_for_last_record_lsn,
|
||||
wait_for_upload,
|
||||
)
|
||||
@@ -796,7 +797,6 @@ class SafekeeperEnv:
|
||||
repo_dir: Path,
|
||||
port_distributor: PortDistributor,
|
||||
pg_bin: PgBin,
|
||||
neon_binpath: Path,
|
||||
num_safekeepers: int = 1,
|
||||
):
|
||||
self.repo_dir = repo_dir
|
||||
@@ -808,7 +808,7 @@ class SafekeeperEnv:
|
||||
)
|
||||
self.pg_bin = pg_bin
|
||||
self.num_safekeepers = num_safekeepers
|
||||
self.bin_safekeeper = str(neon_binpath / "safekeeper")
|
||||
self.bin_safekeeper = os.path.join(str(neon_binpath), "safekeeper")
|
||||
self.safekeepers: Optional[List[subprocess.CompletedProcess[Any]]] = None
|
||||
self.postgres: Optional[ProposerPostgres] = None
|
||||
self.tenant_id: Optional[TenantId] = None
|
||||
@@ -911,10 +911,7 @@ class SafekeeperEnv:
|
||||
|
||||
|
||||
def test_safekeeper_without_pageserver(
|
||||
test_output_dir: str,
|
||||
port_distributor: PortDistributor,
|
||||
pg_bin: PgBin,
|
||||
neon_binpath: Path,
|
||||
test_output_dir: str, port_distributor: PortDistributor, pg_bin: PgBin
|
||||
):
|
||||
# Create the environment in the test-specific output dir
|
||||
repo_dir = Path(os.path.join(test_output_dir, "repo"))
|
||||
@@ -923,7 +920,6 @@ def test_safekeeper_without_pageserver(
|
||||
repo_dir,
|
||||
port_distributor,
|
||||
pg_bin,
|
||||
neon_binpath,
|
||||
)
|
||||
|
||||
with env:
|
||||
|
||||
@@ -1,6 +1,14 @@
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, PortDistributor, VanillaPostgres
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
PortDistributor,
|
||||
VanillaPostgres,
|
||||
base_dir,
|
||||
pg_distrib_dir,
|
||||
)
|
||||
from fixtures.types import TenantId
|
||||
|
||||
|
||||
@@ -9,8 +17,6 @@ def test_wal_restore(
|
||||
pg_bin: PgBin,
|
||||
test_output_dir: Path,
|
||||
port_distributor: PortDistributor,
|
||||
base_dir: Path,
|
||||
pg_distrib_dir: Path,
|
||||
):
|
||||
env = neon_env_builder.init_start()
|
||||
env.neon_cli.create_branch("test_wal_restore")
|
||||
@@ -20,13 +26,11 @@ def test_wal_restore(
|
||||
env.neon_cli.pageserver_stop()
|
||||
port = port_distributor.get_port()
|
||||
data_dir = test_output_dir / "pgsql.restored"
|
||||
with VanillaPostgres(
|
||||
data_dir, PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version), port
|
||||
) as restored:
|
||||
with VanillaPostgres(data_dir, PgBin(test_output_dir, env.pg_version), port) as restored:
|
||||
pg_bin.run_capture(
|
||||
[
|
||||
str(base_dir / "libs/utils/scripts/restore_from_wal.sh"),
|
||||
str(pg_distrib_dir / f"v{env.pg_version}/bin"),
|
||||
os.path.join(base_dir, "libs/utils/scripts/restore_from_wal.sh"),
|
||||
os.path.join(pg_distrib_dir, "v{}".format(env.pg_version), "bin"),
|
||||
str(test_output_dir / "repo" / "safekeepers" / "sk1" / str(tenant_id) / "*"),
|
||||
str(data_dir),
|
||||
str(port),
|
||||
|
||||
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: e56b812dd8...c0284ce58e
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 39e3d745b3...e5cc262697