Support backpressure for sharding (#7100)

Add shard_number to PageserverFeedback and parse it on the compute side.
When the compute receives new ps_feedback, it calculates the minimum LSNs
among the feedback from all shards and uses those LSNs for backpressure.

Add `test_sharding_backpressure` to verify that backpressure slows the
compute down to wait for the slowest shard.
Author: Arthur Petukhovsky
Date: 2024-03-18 22:54:44 +01:00
Committed by: GitHub
Parent: 2bc2fd9cfd
Commit: ad5efb49ee
11 changed files with 336 additions and 149 deletions
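
In outline, the aggregation works like the following Rust sketch (illustrative types and names, not the actual implementation; the real logic is the new record_pageserver_feedback() in the walproposer diff further down):

// Per-shard feedback as the compute tracks it (illustrative subset).
#[derive(Clone, Copy, Default)]
struct ShardFeedback {
    present: bool,
    last_received_lsn: u64,
    disk_consistent_lsn: u64,
    remote_consistent_lsn: u64,
}

// Fold the per-shard feedback into per-field minimums. Backpressure then
// throttles the compute against these minimums, i.e. against the slowest
// shard. (The real code also special-cases invalid/zero LSNs.)
fn min_feedback(shards: &[ShardFeedback]) -> ShardFeedback {
    let mut min = ShardFeedback {
        present: true,
        last_received_lsn: u64::MAX,
        disk_consistent_lsn: u64::MAX,
        remote_consistent_lsn: u64::MAX,
    };
    for f in shards.iter().filter(|f| f.present) {
        min.last_received_lsn = min.last_received_lsn.min(f.last_received_lsn);
        min.disk_consistent_lsn = min.disk_consistent_lsn.min(f.disk_consistent_lsn);
        min.remote_consistent_lsn = min.remote_consistent_lsn.min(f.remote_consistent_lsn);
    }
    min
}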

View File

@@ -29,12 +29,10 @@ pub struct PageserverFeedback {
// Serialize with RFC3339 format.
#[serde(with = "serde_systemtime")]
pub replytime: SystemTime,
/// Used to track feedback from different shards. Always zero for unsharded tenants.
pub shard_number: u32,
}
// NOTE: Do not forget to increment this number when adding new fields to PageserverFeedback.
// Do not remove previously available fields because this might be backwards incompatible.
pub const PAGESERVER_FEEDBACK_FIELDS_NUMBER: u8 = 5;
impl PageserverFeedback {
pub fn empty() -> PageserverFeedback {
PageserverFeedback {
@@ -43,6 +41,7 @@ impl PageserverFeedback {
remote_consistent_lsn: Lsn::INVALID,
disk_consistent_lsn: Lsn::INVALID,
replytime: *PG_EPOCH,
shard_number: 0,
}
}
@@ -59,17 +58,26 @@ impl PageserverFeedback {
//
// TODO: change serialized field names once all computes migrate to the rename.
pub fn serialize(&self, buf: &mut BytesMut) {
buf.put_u8(PAGESERVER_FEEDBACK_FIELDS_NUMBER); // # of keys
let buf_ptr = buf.len();
buf.put_u8(0); // # of keys, will be filled later
let mut nkeys = 0;
nkeys += 1;
buf.put_slice(b"current_timeline_size\0");
buf.put_i32(8);
buf.put_u64(self.current_timeline_size);
nkeys += 1;
buf.put_slice(b"ps_writelsn\0");
buf.put_i32(8);
buf.put_u64(self.last_received_lsn.0);
nkeys += 1;
buf.put_slice(b"ps_flushlsn\0");
buf.put_i32(8);
buf.put_u64(self.disk_consistent_lsn.0);
nkeys += 1;
buf.put_slice(b"ps_applylsn\0");
buf.put_i32(8);
buf.put_u64(self.remote_consistent_lsn.0);
@@ -80,9 +88,19 @@ impl PageserverFeedback {
.expect("failed to serialize pg_replytime earlier than PG_EPOCH")
.as_micros() as i64;
nkeys += 1;
buf.put_slice(b"ps_replytime\0");
buf.put_i32(8);
buf.put_i64(timestamp);
if self.shard_number > 0 {
nkeys += 1;
buf.put_slice(b"shard_number\0");
buf.put_i32(4);
buf.put_u32(self.shard_number);
}
buf[buf_ptr] = nkeys;
}
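Note the change of technique here: the key-count byte is written as a placeholder and backfilled at the end (buf[buf_ptr] = nkeys), because shard_number is only emitted for sharded tenants and the number of keys is no longer a compile-time constant. For unsharded tenants the message stays byte-identical to the old five-field format.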
// Deserialize PageserverFeedback message
@@ -125,9 +143,8 @@ impl PageserverFeedback {
}
b"shard_number" => {
let len = buf.get_i32();
// TODO: this will be implemented in the next update,
// for now, we just skip the value.
buf.advance(len as usize);
assert_eq!(len, 4);
rf.shard_number = buf.get_u32();
}
_ => {
let len = buf.get_i32();
@@ -200,10 +217,7 @@ mod tests {
rf.serialize(&mut data);
// Add an extra field to the buffer and adjust number of keys
if let Some(first) = data.first_mut() {
*first = PAGESERVER_FEEDBACK_FIELDS_NUMBER + 1;
}
data[0] += 1;
data.put_slice(b"new_field_one\0");
data.put_i32(8);
data.put_u64(42);
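
Both sides skip keys they do not recognize, which is what this test exercises and what keeps the format forward compatible. A minimal sketch of such a reader loop, using the bytes crate (illustrative only; the production deserializer above is the authority):

use bytes::{Buf, Bytes};

// Read the key count, then (NUL-terminated key, i32 length, value) triples;
// values of unknown keys are skipped by their declared length.
fn parse_shard_number(mut buf: Bytes) -> u32 {
    let mut shard_number = 0;
    let nkeys = buf.get_u8();
    for _ in 0..nkeys {
        let nul = buf.iter().position(|&b| b == 0).expect("unterminated key");
        let key = buf.split_to(nul);
        buf.advance(1); // skip the NUL terminator
        let len = buf.get_i32() as usize;
        match key.as_ref() {
            b"shard_number" => shard_number = buf.get_u32(), // len is 4 here
            _ => buf.advance(len), // unknown key: skip the value
        }
    }
    shard_number
}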

View File

@@ -324,11 +324,11 @@ extern "C" fn finish_sync_safekeepers(wp: *mut WalProposer, lsn: XLogRecPtr) {
}
}
extern "C" fn process_safekeeper_feedback(wp: *mut WalProposer) {
extern "C" fn process_safekeeper_feedback(wp: *mut WalProposer, sk: *mut Safekeeper) {
unsafe {
let callback_data = (*(*wp).config).callback_data;
let api = callback_data as *mut Box<dyn ApiImpl>;
(*api).process_safekeeper_feedback(&mut (*wp))
(*api).process_safekeeper_feedback(&mut (*wp), &mut (*sk));
}
}

View File

@@ -142,7 +142,7 @@ pub trait ApiImpl {
todo!()
}
fn process_safekeeper_feedback(&mut self, _wp: &mut WalProposer) {
fn process_safekeeper_feedback(&mut self, _wp: &mut WalProposer, _sk: &mut Safekeeper) {
todo!()
}

View File

@@ -448,6 +448,7 @@ pub(super) async fn handle_walreceiver_connection(
disk_consistent_lsn,
remote_consistent_lsn,
replytime: ts,
shard_number: timeline.tenant_shard_id.shard_number.0 as u32,
};
debug!("neon_status_update {status_update:?}");

View File

@@ -109,6 +109,8 @@ impl WalIngest {
self.checkpoint_modified = true;
}
failpoint_support::sleep_millis_async!("wal-ingest-record-sleep");
match decoded.xl_rmid {
pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
// Heap AM records need some special handling, because they modify VM pages
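
The wal-ingest-record-sleep failpoint added here is the hook that test_sharding_backpressure uses to slow WAL ingestion on a single pageserver and force the other shards to wait (see the test at the end of this commit).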

View File

@@ -70,7 +70,7 @@ static bool SendAppendRequests(Safekeeper *sk);
static bool RecvAppendResponses(Safekeeper *sk);
static XLogRecPtr CalculateMinFlushLsn(WalProposer *wp);
static XLogRecPtr GetAcknowledgedByQuorumWALPosition(WalProposer *wp);
static void HandleSafekeeperResponse(WalProposer *wp);
static void HandleSafekeeperResponse(WalProposer *wp, Safekeeper *sk);
static bool AsyncRead(Safekeeper *sk, char **buf, int *buf_size);
static bool AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg);
static bool BlockingWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperState success_state);
@@ -1405,7 +1405,6 @@ static bool
RecvAppendResponses(Safekeeper *sk)
{
WalProposer *wp = sk->wp;
XLogRecPtr newCommitLsn;
bool readAnything = false;
while (true)
@@ -1425,6 +1424,8 @@ RecvAppendResponses(Safekeeper *sk)
LSN_FORMAT_ARGS(sk->appendResponse.commitLsn),
sk->host, sk->port);
readAnything = true;
if (sk->appendResponse.term > wp->propTerm)
{
/*
@@ -1438,35 +1439,28 @@ RecvAppendResponses(Safekeeper *sk)
sk->appendResponse.term, wp->propTerm);
}
readAnything = true;
HandleSafekeeperResponse(wp, sk);
}
if (!readAnything)
return sk->state == SS_ACTIVE;
/* update commit_lsn */
newCommitLsn = GetAcknowledgedByQuorumWALPosition(wp);
/*
* Send the new value to all safekeepers.
*/
if (newCommitLsn > wp->commitLsn)
{
wp->commitLsn = newCommitLsn;
BroadcastAppendRequest(wp);
}
HandleSafekeeperResponse(wp);
return sk->state == SS_ACTIVE;
}
#define psfeedback_log(fmt, key, ...) \
wp_log(DEBUG2, "ParsePageserverFeedbackMessage: %s " fmt, key, __VA_ARGS__)
/* Parse a PageserverFeedback message, or the PageserverFeedback part of an AppendResponse */
static void
ParsePageserverFeedbackMessage(WalProposer *wp, StringInfo reply_message, PageserverFeedback *rf)
ParsePageserverFeedbackMessage(WalProposer *wp, StringInfo reply_message, PageserverFeedback *ps_feedback)
{
uint8 nkeys;
int i;
int32 len;
/* initialize the struct before parsing */
memset(ps_feedback, 0, sizeof(PageserverFeedback));
ps_feedback->present = true;
/* get number of custom keys */
nkeys = pq_getmsgbyte(reply_message);
@@ -1474,66 +1468,52 @@ ParsePageserverFeedbackMessage(WalProposer *wp, StringInfo reply_message, Pagese
for (i = 0; i < nkeys; i++)
{
const char *key = pq_getmsgstring(reply_message);
unsigned int value_len = pq_getmsgint(reply_message, sizeof(int32));
if (strcmp(key, "current_timeline_size") == 0)
{
pq_getmsgint(reply_message, sizeof(int32));
/* read value length */
rf->currentClusterSize = pq_getmsgint64(reply_message);
wp_log(DEBUG2, "ParsePageserverFeedbackMessage: current_timeline_size %lu",
rf->currentClusterSize);
Assert(value_len == sizeof(int64));
ps_feedback->currentClusterSize = pq_getmsgint64(reply_message);
psfeedback_log(UINT64_FORMAT, key, ps_feedback->currentClusterSize);
}
else if ((strcmp(key, "ps_writelsn") == 0) || (strcmp(key, "last_received_lsn") == 0))
{
pq_getmsgint(reply_message, sizeof(int32));
/* read value length */
rf->last_received_lsn = pq_getmsgint64(reply_message);
wp_log(DEBUG2, "ParsePageserverFeedbackMessage: last_received_lsn %X/%X",
LSN_FORMAT_ARGS(rf->last_received_lsn));
Assert(value_len == sizeof(int64));
ps_feedback->last_received_lsn = pq_getmsgint64(reply_message);
psfeedback_log("%X/%X", key, LSN_FORMAT_ARGS(ps_feedback->last_received_lsn));
}
else if ((strcmp(key, "ps_flushlsn") == 0) || (strcmp(key, "disk_consistent_lsn") == 0))
{
pq_getmsgint(reply_message, sizeof(int32));
/* read value length */
rf->disk_consistent_lsn = pq_getmsgint64(reply_message);
wp_log(DEBUG2, "ParsePageserverFeedbackMessage: disk_consistent_lsn %X/%X",
LSN_FORMAT_ARGS(rf->disk_consistent_lsn));
Assert(value_len == sizeof(int64));
ps_feedback->disk_consistent_lsn = pq_getmsgint64(reply_message);
psfeedback_log("%X/%X", key, LSN_FORMAT_ARGS(ps_feedback->disk_consistent_lsn));
}
else if ((strcmp(key, "ps_applylsn") == 0) || (strcmp(key, "remote_consistent_lsn") == 0))
{
pq_getmsgint(reply_message, sizeof(int32));
/* read value length */
rf->remote_consistent_lsn = pq_getmsgint64(reply_message);
wp_log(DEBUG2, "ParsePageserverFeedbackMessage: remote_consistent_lsn %X/%X",
LSN_FORMAT_ARGS(rf->remote_consistent_lsn));
Assert(value_len == sizeof(int64));
ps_feedback->remote_consistent_lsn = pq_getmsgint64(reply_message);
psfeedback_log("%X/%X", key, LSN_FORMAT_ARGS(ps_feedback->remote_consistent_lsn));
}
else if ((strcmp(key, "ps_replytime") == 0) || (strcmp(key, "replytime") == 0))
{
pq_getmsgint(reply_message, sizeof(int32));
/* read value length */
rf->replytime = pq_getmsgint64(reply_message);
{
char *replyTimeStr;
/* Copy because timestamptz_to_str returns a static buffer */
replyTimeStr = pstrdup(timestamptz_to_str(rf->replytime));
wp_log(DEBUG2, "ParsePageserverFeedbackMessage: replytime %lu reply_time: %s",
rf->replytime, replyTimeStr);
pfree(replyTimeStr);
}
Assert(value_len == sizeof(int64));
ps_feedback->replytime = pq_getmsgint64(reply_message);
psfeedback_log("%s", key, timestamptz_to_str(ps_feedback->replytime));
}
else if (strcmp(key, "shard_number") == 0)
{
Assert(value_len == sizeof(uint32));
ps_feedback->shard_number = pq_getmsgint(reply_message, sizeof(uint32));
psfeedback_log("%u", key, ps_feedback->shard_number);
}
else
{
len = pq_getmsgint(reply_message, sizeof(int32));
/* read value length */
/*
* Skip unknown keys to support backward compatible protocol
* changes
*/
wp_log(LOG, "ParsePageserverFeedbackMessage: unknown key: %s len %d", key, len);
pq_getmsgbytes(reply_message, len);
wp_log(LOG, "ParsePageserverFeedbackMessage: unknown key: %s len %d", key, value_len);
pq_getmsgbytes(reply_message, value_len);
};
}
}
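
Note the refactor: the value length is now read once at the top of the loop into value_len and validated per key with Assert, instead of each branch consuming its own length field before reading the value.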
@@ -1630,12 +1610,30 @@ GetDonor(WalProposer *wp, XLogRecPtr *donor_lsn)
return donor;
}
/*
* Process AppendResponse message from safekeeper.
*/
static void
HandleSafekeeperResponse(WalProposer *wp)
HandleSafekeeperResponse(WalProposer *wp, Safekeeper *sk)
{
XLogRecPtr candidateTruncateLsn;
XLogRecPtr newCommitLsn;
wp->api.process_safekeeper_feedback(wp);
newCommitLsn = GetAcknowledgedByQuorumWALPosition(wp);
if (newCommitLsn > wp->commitLsn)
{
wp->commitLsn = newCommitLsn;
/* Send new value to all safekeepers. */
BroadcastAppendRequest(wp);
}
/*
* Unlock syncrep waiters, update ps_feedback, CheckGracefulShutdown().
* The last one will terminate the process if the shutdown is requested
* and WAL is committed by the quorum. BroadcastAppendRequest() should be
* called to notify safekeepers about the new commitLsn.
*/
wp->api.process_safekeeper_feedback(wp, sk);
/*
* Try to advance truncateLsn -- the last record flushed to all
@@ -1811,8 +1809,10 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg)
msg->hs.ts = pq_getmsgint64_le(&s);
msg->hs.xmin.value = pq_getmsgint64_le(&s);
msg->hs.catalog_xmin.value = pq_getmsgint64_le(&s);
if (buf_size > APPENDRESPONSE_FIXEDPART_SIZE)
ParsePageserverFeedbackMessage(wp, &s, &msg->rf);
if (s.len > s.cursor)
ParsePageserverFeedbackMessage(wp, &s, &msg->ps_feedback);
else
msg->ps_feedback.present = false;
pq_getmsgend(&s);
return true;
}
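
With the new present flag, the parser detects whether an AppendResponse carries feedback by checking for unread bytes (s.len > s.cursor) instead of comparing the buffer size against the struct's fixed-part size; correspondingly, APPENDRESPONSE_FIXEDPART_SIZE is pinned to 56 bytes in the header below, presumably because the struct layout no longer matches the wire layout.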

View File

@@ -10,6 +10,7 @@
#include "libpqwalproposer.h"
#include "neon_walreader.h"
#include "pagestore_client.h"
#define SK_MAGIC 0xCafeCeefu
#define SK_PROTOCOL_VERSION 2
@@ -269,6 +270,8 @@ typedef struct HotStandbyFeedback
typedef struct PageserverFeedback
{
/* true if AppendResponse contains this feedback */
bool present;
/* current size of the timeline on pageserver */
uint64 currentClusterSize;
/* standby_status_update fields that safekeeper received from pageserver */
@@ -276,14 +279,21 @@ typedef struct PageserverFeedback
XLogRecPtr disk_consistent_lsn;
XLogRecPtr remote_consistent_lsn;
TimestampTz replytime;
uint32 shard_number;
} PageserverFeedback;
typedef struct WalproposerShmemState
{
slock_t mutex;
PageserverFeedback feedback;
term_t mineLastElectedTerm;
pg_atomic_uint64 backpressureThrottlingTime;
/* last feedback from each shard */
PageserverFeedback shard_ps_feedback[MAX_SHARDS];
int num_shards;
/* aggregated feedback with min LSNs across shards */
PageserverFeedback min_ps_feedback;
} WalproposerShmemState;
/*
@@ -307,12 +317,12 @@ typedef struct AppendResponse
/* Feedback received from pageserver includes standby_status_update fields */
/* and custom neon feedback. */
/* This part of the message is extensible. */
PageserverFeedback rf;
PageserverFeedback ps_feedback;
} AppendResponse;
/* PageserverFeedback is extensible part of the message that is parsed separately */
/* Other fields are fixed part */
#define APPENDRESPONSE_FIXEDPART_SIZE offsetof(AppendResponse, rf)
#define APPENDRESPONSE_FIXEDPART_SIZE 56
struct WalProposer;
typedef struct WalProposer WalProposer;
@@ -560,11 +570,11 @@ typedef struct walproposer_api
void (*finish_sync_safekeepers) (WalProposer *wp, XLogRecPtr lsn);
/*
* Called after every new message from the safekeeper. Used to propagate
* Called after every AppendResponse from the safekeeper. Used to propagate
* backpressure feedback and to confirm WAL persistence (has been committed
* on the quorum of safekeepers).
*/
void (*process_safekeeper_feedback) (WalProposer *wp);
void (*process_safekeeper_feedback) (WalProposer *wp, Safekeeper *sk);
/*
* Write a log message to the internal log processor. This is used only

View File

@@ -63,7 +63,6 @@ char *wal_acceptors_list = "";
int wal_acceptor_reconnect_timeout = 1000;
int wal_acceptor_connection_timeout = 10000;
static AppendResponse quorumFeedback;
static WalproposerShmemState *walprop_shared;
static WalProposerConfig walprop_config;
static XLogRecPtr sentPtr = InvalidXLogRecPtr;
@@ -71,6 +70,10 @@ static const walproposer_api walprop_pg;
static volatile sig_atomic_t got_SIGUSR2 = false;
static bool reported_sigusr2 = false;
static XLogRecPtr standby_flush_lsn = InvalidXLogRecPtr;
static XLogRecPtr standby_apply_lsn = InvalidXLogRecPtr;
static HotStandbyFeedback agg_hs_feedback;
static void nwp_shmem_startup_hook(void);
static void nwp_register_gucs(void);
static void nwp_prepare_shmem(void);
@@ -402,21 +405,58 @@ walprop_pg_get_shmem_state(WalProposer *wp)
return walprop_shared;
}
static void
replication_feedback_set(PageserverFeedback *rf)
/*
* Record new ps_feedback in the per-shard array and update min_feedback.
*/
static PageserverFeedback
record_pageserver_feedback(PageserverFeedback *ps_feedback)
{
PageserverFeedback min_feedback;
Assert(ps_feedback->present);
Assert(ps_feedback->shard_number < MAX_SHARDS);
SpinLockAcquire(&walprop_shared->mutex);
memcpy(&walprop_shared->feedback, rf, sizeof(PageserverFeedback));
/* Update the number of shards */
if (ps_feedback->shard_number + 1 > walprop_shared->num_shards)
walprop_shared->num_shards = ps_feedback->shard_number + 1;
/* Update the feedback */
memcpy(&walprop_shared->shard_ps_feedback[ps_feedback->shard_number], ps_feedback, sizeof(PageserverFeedback));
/* Calculate min LSNs */
memcpy(&min_feedback, ps_feedback, sizeof(PageserverFeedback));
for (int i = 0; i < walprop_shared->num_shards; i++)
{
PageserverFeedback *feedback = &walprop_shared->shard_ps_feedback[i];
if (feedback->present)
{
if (min_feedback.last_received_lsn == InvalidXLogRecPtr || feedback->last_received_lsn < min_feedback.last_received_lsn)
min_feedback.last_received_lsn = feedback->last_received_lsn;
if (min_feedback.disk_consistent_lsn == InvalidXLogRecPtr || feedback->disk_consistent_lsn < min_feedback.disk_consistent_lsn)
min_feedback.disk_consistent_lsn = feedback->disk_consistent_lsn;
if (min_feedback.remote_consistent_lsn == InvalidXLogRecPtr || feedback->remote_consistent_lsn < min_feedback.remote_consistent_lsn)
min_feedback.remote_consistent_lsn = feedback->remote_consistent_lsn;
}
}
/* Copy min_feedback back to shmem */
memcpy(&walprop_shared->min_ps_feedback, &min_feedback, sizeof(PageserverFeedback));
SpinLockRelease(&walprop_shared->mutex);
return min_feedback;
}
void
replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn)
{
SpinLockAcquire(&walprop_shared->mutex);
*writeLsn = walprop_shared->feedback.last_received_lsn;
*flushLsn = walprop_shared->feedback.disk_consistent_lsn;
*applyLsn = walprop_shared->feedback.remote_consistent_lsn;
*writeLsn = walprop_shared->min_ps_feedback.last_received_lsn;
*flushLsn = walprop_shared->min_ps_feedback.disk_consistent_lsn;
*applyLsn = walprop_shared->min_ps_feedback.remote_consistent_lsn;
SpinLockRelease(&walprop_shared->mutex);
}
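
These minimums feed the compute's backpressure decision. A hedged sketch of that check (illustrative; the actual throttling lives in the neon extension and is configured by the max_replication_*_lag GUCs that the test below reads):

// Throttle writes when the compute's flush LSN runs too far ahead of the
// slowest shard on any of the three configured limits (all values in bytes).
struct LagLimits {
    max_write_lag: u64, // cf. max_replication_write_lag
    max_flush_lag: u64, // cf. max_replication_flush_lag
    max_apply_lag: u64, // cf. max_replication_apply_lag
}

fn should_throttle(
    flush_lsn: u64, // pg_current_wal_flush_lsn() on the compute
    min_write: u64, // min last_received_lsn across shards
    min_flush: u64, // min disk_consistent_lsn across shards
    min_apply: u64, // min remote_consistent_lsn across shards
    limits: &LagLimits,
) -> bool {
    flush_lsn.saturating_sub(min_write) > limits.max_write_lag
        || flush_lsn.saturating_sub(min_flush) > limits.max_flush_lag
        || flush_lsn.saturating_sub(min_apply) > limits.max_apply_lag
}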
@@ -1869,39 +1909,6 @@ CheckGracefulShutdown(WalProposer *wp)
}
}
/*
* Choose most advanced PageserverFeedback and set it to *rf.
*/
static void
GetLatestNeonFeedback(PageserverFeedback *rf, WalProposer *wp)
{
int latest_safekeeper = 0;
XLogRecPtr last_received_lsn = InvalidXLogRecPtr;
for (int i = 0; i < wp->n_safekeepers; i++)
{
if (wp->safekeeper[i].appendResponse.rf.last_received_lsn > last_received_lsn)
{
latest_safekeeper = i;
last_received_lsn = wp->safekeeper[i].appendResponse.rf.last_received_lsn;
}
}
rf->currentClusterSize = wp->safekeeper[latest_safekeeper].appendResponse.rf.currentClusterSize;
rf->last_received_lsn = wp->safekeeper[latest_safekeeper].appendResponse.rf.last_received_lsn;
rf->disk_consistent_lsn = wp->safekeeper[latest_safekeeper].appendResponse.rf.disk_consistent_lsn;
rf->remote_consistent_lsn = wp->safekeeper[latest_safekeeper].appendResponse.rf.remote_consistent_lsn;
rf->replytime = wp->safekeeper[latest_safekeeper].appendResponse.rf.replytime;
wpg_log(DEBUG2, "GetLatestNeonFeedback: currentClusterSize %lu,"
" last_received_lsn %X/%X, disk_consistent_lsn %X/%X, remote_consistent_lsn %X/%X, replytime %lu",
rf->currentClusterSize,
LSN_FORMAT_ARGS(rf->last_received_lsn),
LSN_FORMAT_ARGS(rf->disk_consistent_lsn),
LSN_FORMAT_ARGS(rf->remote_consistent_lsn),
rf->replytime);
}
/*
* Combine hot standby feedbacks from all safekeepers.
*/
@@ -1949,26 +1956,38 @@ CombineHotStanbyFeedbacks(HotStandbyFeedback *hs, WalProposer *wp)
* None of that is functional in sync-safekeepers.
*/
static void
walprop_pg_process_safekeeper_feedback(WalProposer *wp)
walprop_pg_process_safekeeper_feedback(WalProposer *wp, Safekeeper *sk)
{
HotStandbyFeedback hsFeedback;
XLogRecPtr oldDiskConsistentLsn;
HotStandbyFeedback hsFeedback;
bool needToAdvanceSlot = false;
if (wp->config->syncSafekeepers)
return;
oldDiskConsistentLsn = quorumFeedback.rf.disk_consistent_lsn;
/* Get PageserverFeedback fields from the most advanced safekeeper */
GetLatestNeonFeedback(&quorumFeedback.rf, wp);
replication_feedback_set(&quorumFeedback.rf);
SetZenithCurrentClusterSize(quorumFeedback.rf.currentClusterSize);
if (wp->commitLsn > quorumFeedback.flushLsn || oldDiskConsistentLsn != quorumFeedback.rf.disk_consistent_lsn)
/* handle fresh ps_feedback */
if (sk->appendResponse.ps_feedback.present)
{
if (wp->commitLsn > quorumFeedback.flushLsn)
quorumFeedback.flushLsn = wp->commitLsn;
PageserverFeedback min_feedback = record_pageserver_feedback(&sk->appendResponse.ps_feedback);
/* Only the main shard sends a non-zero currentClusterSize */
if (sk->appendResponse.ps_feedback.currentClusterSize > 0)
SetZenithCurrentClusterSize(sk->appendResponse.ps_feedback.currentClusterSize);
if (min_feedback.disk_consistent_lsn != standby_apply_lsn)
{
standby_apply_lsn = min_feedback.disk_consistent_lsn;
needToAdvanceSlot = true;
}
}
if (wp->commitLsn > standby_flush_lsn)
{
standby_flush_lsn = wp->commitLsn;
needToAdvanceSlot = true;
}
if (needToAdvanceSlot)
{
/*
* Advance the replication slot to commitLsn. WAL before it is
* hardened and will be fetched from one of safekeepers by
@@ -1977,23 +1996,23 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp)
* Also wakes up syncrep waiters.
*/
ProcessStandbyReply(
/* write_lsn - This is what durably stored in WAL service. */
quorumFeedback.flushLsn,
/* flush_lsn - This is what durably stored in WAL service. */
quorumFeedback.flushLsn,
/* write_lsn - This is what is durably stored in the safekeepers quorum. */
standby_flush_lsn,
/* flush_lsn - This is what is durably stored in the safekeepers quorum. */
standby_flush_lsn,
/*
* apply_lsn - This is what has been processed and durably saved at
* the pageserver.
*/
quorumFeedback.rf.disk_consistent_lsn,
standby_apply_lsn,
walprop_pg_get_current_timestamp(wp), false);
}
CombineHotStanbyFeedbacks(&hsFeedback, wp);
if (hsFeedback.ts != 0 && memcmp(&hsFeedback, &quorumFeedback.hs, sizeof hsFeedback) != 0)
if (hsFeedback.ts != 0 && memcmp(&hsFeedback, &agg_hs_feedback, sizeof hsFeedback) != 0)
{
quorumFeedback.hs = hsFeedback;
agg_hs_feedback = hsFeedback;
ProcessStandbyHSFeedback(hsFeedback.ts,
XidFromFullTransactionId(hsFeedback.xmin),
EpochFromFullTransactionId(hsFeedback.xmin),

View File

@@ -224,6 +224,16 @@ impl SimulationApi {
})
.collect::<Vec<_>>();
let empty_feedback = PageserverFeedback {
present: false,
currentClusterSize: 0,
last_received_lsn: 0,
disk_consistent_lsn: 0,
remote_consistent_lsn: 0,
replytime: 0,
shard_number: 0,
};
Self {
os: args.os,
safekeepers: RefCell::new(sk_conns),
@@ -232,15 +242,11 @@ impl SimulationApi {
last_logged_commit_lsn: 0,
shmem: UnsafeCell::new(walproposer::bindings::WalproposerShmemState {
mutex: 0,
feedback: PageserverFeedback {
currentClusterSize: 0,
last_received_lsn: 0,
disk_consistent_lsn: 0,
remote_consistent_lsn: 0,
replytime: 0,
},
mineLastElectedTerm: 0,
backpressureThrottlingTime: pg_atomic_uint64 { value: 0 },
shard_ps_feedback: [empty_feedback; 128],
num_shards: 0,
min_ps_feedback: empty_feedback,
}),
config: args.config,
event_set: RefCell::new(None),
@@ -598,7 +604,11 @@ impl ApiImpl for SimulationApi {
}
}
fn process_safekeeper_feedback(&mut self, wp: &mut walproposer::bindings::WalProposer) {
fn process_safekeeper_feedback(
&mut self,
wp: &mut walproposer::bindings::WalProposer,
_sk: &mut walproposer::bindings::Safekeeper,
) {
debug!("process_safekeeper_feedback, commit_lsn={}", wp.commitLsn);
if wp.commitLsn > self.last_logged_commit_lsn {
self.os.log_event(format!("commit_lsn;{}", wp.commitLsn));

View File

@@ -1,5 +1,5 @@
import threading
from typing import Optional
from typing import Any, Optional
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
@@ -32,6 +32,7 @@ class Workload:
tenant_id: TenantId,
timeline_id: TimelineId,
branch_name: Optional[str] = None,
endpoint_opts: Optional[dict[str, Any]] = None,
):
self.env = env
self.tenant_id = tenant_id
@@ -45,6 +46,7 @@ class Workload:
self.churn_cursor = 0
self._endpoint: Optional[Endpoint] = None
self._endpoint_opts = endpoint_opts or {}
def reconfigure(self):
"""
@@ -66,6 +68,7 @@ class Workload:
tenant_id=self.tenant_id,
pageserver_id=pageserver_id,
endpoint_id=endpoint_id,
**self._endpoint_opts,
)
self._endpoint.start(pageserver_id=pageserver_id)
else:

View File

@@ -1,4 +1,5 @@
import os
import time
from typing import Dict, List, Optional, Union
import pytest
@@ -837,3 +838,130 @@ def test_sharding_split_failures(
assert_split_done()
env.storage_controller.consistency_check()
def test_sharding_backpressure(neon_env_builder: NeonEnvBuilder):
"""
Check a scenario where one of the shards is much slower than the others.
Without backpressure, this would let the slow shard fall behind and
eventually cause WAL timeouts.
"""
shard_count = 4
neon_env_builder.num_pageservers = shard_count
# 256KiB stripes: small enough to get a meaningful data distribution without
# writing large quantities of data in this test. The stripe size is given
# as a number of 8KiB pages.
stripe_size = 32
env = neon_env_builder.init_start(
initial_tenant_shard_count=shard_count, initial_tenant_shard_stripe_size=stripe_size
)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
pageservers = dict((int(p.id), p) for p in env.pageservers)
shards = env.storage_controller.locate(tenant_id)
# Slow down one of the shards to roughly 1MB/s
pageservers[4].http_client().configure_failpoints(("wal-ingest-record-sleep", "5%sleep(1)"))
def shards_info():
infos = []
for shard in shards:
node_id = int(shard["node_id"])
pageserver = pageservers[node_id]
shard_info = pageserver.http_client().timeline_detail(shard["shard_id"], timeline_id)
infos.append(shard_info)
last_record_lsn = shard_info["last_record_lsn"]
current_physical_size = shard_info["current_physical_size"]
log.info(
f"Shard on pageserver {node_id}: lsn={last_record_lsn}, size={current_physical_size}"
)
return infos
shards_info()
workload = Workload(
env,
tenant_id,
timeline_id,
branch_name="main",
endpoint_opts={
"config_lines": [
# Tip: set to 100MB to make the test fail
"max_replication_write_lag=1MB",
],
},
)
workload.init()
endpoint = workload.endpoint()
# on 2024-03-05, the default config on prod was [15MB, 10GB, null]
res = endpoint.safe_psql_many(
[
"SHOW max_replication_write_lag",
"SHOW max_replication_flush_lag",
"SHOW max_replication_apply_lag",
]
)
log.info(f"backpressure config: {res}")
last_flush_lsn = None
last_timestamp = None
def update_write_lsn():
nonlocal last_flush_lsn
nonlocal last_timestamp
res = endpoint.safe_psql(
"""
SELECT
pg_wal_lsn_diff(pg_current_wal_flush_lsn(), received_lsn) as received_lsn_lag,
received_lsn,
pg_current_wal_flush_lsn() as flush_lsn,
neon.backpressure_throttling_time() as throttling_time
FROM neon.backpressure_lsns();
""",
dbname="postgres",
)[0]
log.info(
f"received_lsn_lag = {res[0]}, received_lsn = {res[1]}, flush_lsn = {res[2]}, throttling_time = {res[3]}"
)
lsn = Lsn(res[2])
now = time.time()
if last_timestamp is not None:
delta = now - last_timestamp
delta_bytes = lsn - last_flush_lsn
avg_speed = delta_bytes / delta / 1024 / 1024
log.info(
f"flush_lsn {lsn}, written {delta_bytes/1024}kb for {delta:.3f}s, avg_speed {avg_speed:.3f} MiB/s"
)
last_flush_lsn = lsn
last_timestamp = now
update_write_lsn()
workload.write_rows(4096, upload=False)
workload.write_rows(4096, upload=False)
workload.write_rows(4096, upload=False)
workload.write_rows(4096, upload=False)
workload.validate()
update_write_lsn()
shards_info()
for _write_iter in range(30):
# approximately 1MB of data
workload.write_rows(8000, upload=False)
update_write_lsn()
infos = shards_info()
min_lsn = min(Lsn(info["last_record_lsn"]) for info in infos)
max_lsn = max(Lsn(info["last_record_lsn"]) for info in infos)
diff = max_lsn - min_lsn
assert diff < 2 * 1024 * 1024, f"LSN diff={diff}, expected diff < 2MB due to backpressure"
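
With max_replication_write_lag set to 1MB, the compute is throttled whenever its flush LSN runs more than about 1MB ahead of the slowest shard's last_received_lsn, so even after roughly 30 iterations of ~1MB writes the spread between the fastest and slowest shard should stay within the 2MB bound asserted here; per the in-test tip, raising the limit to 100MB makes the assertion fail.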