Compare commits

..

9 Commits

Author SHA1 Message Date
Arpad Müller
fec5530c9e fixes 2025-04-02 20:02:20 +02:00
Arpad Müller
8d5d6b24df wip 2025-04-02 19:40:07 +02:00
Arpad Müller
0a5b9506ae cargo fmt 2025-04-02 16:03:01 +02:00
Arpad Müller
48d712af1b ruff format 2025-04-02 14:59:05 +02:00
Arpad Müller
1a78e9e9cf Add test_storcon_create_delete_sk_down 2025-04-01 20:19:18 +02:00
Arpad Müller
484208b5a8 logging 2025-04-01 20:17:10 +02:00
Arpad Müller
26d636ec14 Fix expected tenant deletion response 2025-04-01 20:16:53 +02:00
Arpad Müller
579069d74b Fix list_pending_ops which was broken 2025-04-01 20:16:53 +02:00
Arpad Müller
a02dcaea6b Don't send a body for deletion requests to safekeepers, () is not enough 2025-04-01 19:31:00 +02:00
17 changed files with 289 additions and 909 deletions

View File

@@ -227,6 +227,8 @@ pub struct TimelineDeleteResult {
pub dir_existed: bool,
}
pub type TenantDeleteResult = std::collections::HashMap<String, TimelineDeleteResult>;
fn lsn_invalid() -> Lsn {
Lsn::INVALID
}

View File

@@ -3774,7 +3774,7 @@ pub fn make_router(
)
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/mark_invisible",
|r| api_handler( r, timeline_mark_invisible_handler),
|r| testing_api_handler("mark timeline invisible", r, timeline_mark_invisible_handler),
)
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/checkpoint",

View File

@@ -11526,255 +11526,4 @@ mod tests {
Ok(())
}
#[cfg(feature = "testing")]
#[tokio::test]
async fn test_synthetic_size_calculation_with_invisible_branches() -> anyhow::Result<()> {
use pageserver_api::models::TimelineVisibilityState;
use crate::tenant::size::gather_inputs;
let tenant_conf = pageserver_api::models::TenantConfig {
// Ensure that we don't compute gc_cutoffs (which needs reading the layer files)
pitr_interval: Some(Duration::ZERO),
..Default::default()
};
let harness = TenantHarness::create_custom(
"test_synthetic_size_calculation_with_invisible_branches",
tenant_conf,
TenantId::generate(),
ShardIdentity::unsharded(),
Generation::new(0xdeadbeef),
)
.await?;
let (tenant, ctx) = harness.load().await;
let main_tline = tenant
.create_test_timeline_with_layers(
TIMELINE_ID,
Lsn(0x10),
DEFAULT_PG_VERSION,
&ctx,
vec![],
vec![],
vec![],
Lsn(0x100),
)
.await?;
let snapshot1 = TimelineId::from_array(hex!("11223344556677881122334455667790"));
tenant
.branch_timeline_test_with_layers(
&main_tline,
snapshot1,
Some(Lsn(0x20)),
&ctx,
vec![],
vec![],
Lsn(0x50),
)
.await?;
let snapshot2 = TimelineId::from_array(hex!("11223344556677881122334455667791"));
tenant
.branch_timeline_test_with_layers(
&main_tline,
snapshot2,
Some(Lsn(0x30)),
&ctx,
vec![],
vec![],
Lsn(0x50),
)
.await?;
let snapshot3 = TimelineId::from_array(hex!("11223344556677881122334455667792"));
tenant
.branch_timeline_test_with_layers(
&main_tline,
snapshot3,
Some(Lsn(0x40)),
&ctx,
vec![],
vec![],
Lsn(0x50),
)
.await?;
let limit = Arc::new(Semaphore::new(1));
let max_retention_period = None;
let mut logical_size_cache = HashMap::new();
let cause = LogicalSizeCalculationCause::EvictionTaskImitation;
let cancel = CancellationToken::new();
let inputs = gather_inputs(
&tenant,
&limit,
max_retention_period,
&mut logical_size_cache,
cause,
&cancel,
&ctx,
)
.instrument(info_span!(
"gather_inputs",
tenant_id = "unknown",
shard_id = "unknown",
))
.await?;
use crate::tenant::size::{LsnKind, ModelInputs, SegmentMeta};
use LsnKind::*;
use tenant_size_model::Segment;
let ModelInputs { mut segments, .. } = inputs;
segments.retain(|s| s.timeline_id == TIMELINE_ID);
for segment in segments.iter_mut() {
segment.segment.parent = None; // We don't care about the parent for the test
segment.segment.size = None; // We don't care about the size for the test
}
assert_eq!(
segments,
[
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x10,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: BranchStart,
},
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x20,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: BranchPoint,
},
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x30,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: BranchPoint,
},
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x40,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: BranchPoint,
},
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x100,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: GcCutOff,
}, // we need to retain everything above the last branch point
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x100,
size: None,
needed: true,
},
timeline_id: TIMELINE_ID,
kind: BranchEnd,
},
]
);
main_tline
.remote_client
.schedule_index_upload_for_timeline_invisible_state(
TimelineVisibilityState::Invisible,
)?;
main_tline.remote_client.wait_completion().await?;
let inputs = gather_inputs(
&tenant,
&limit,
max_retention_period,
&mut logical_size_cache,
cause,
&cancel,
&ctx,
)
.instrument(info_span!(
"gather_inputs",
tenant_id = "unknown",
shard_id = "unknown",
))
.await?;
let ModelInputs { mut segments, .. } = inputs;
segments.retain(|s| s.timeline_id == TIMELINE_ID);
for segment in segments.iter_mut() {
segment.segment.parent = None; // We don't care about the parent for the test
segment.segment.size = None; // We don't care about the size for the test
}
assert_eq!(
segments,
[
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x10,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: BranchStart,
},
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x20,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: BranchPoint,
},
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x30,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: BranchPoint,
},
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x40,
size: None,
needed: false,
},
timeline_id: TIMELINE_ID,
kind: BranchPoint,
},
SegmentMeta {
segment: Segment {
parent: None,
lsn: 0x40, // Branch end LSN == last branch point LSN
size: None,
needed: true,
},
timeline_id: TIMELINE_ID,
kind: BranchEnd,
},
]
);
Ok(())
}
}

View File

@@ -33,7 +33,7 @@ pub struct ModelInputs {
}
/// A [`Segment`], with some extra information for display purposes
#[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct SegmentMeta {
pub segment: Segment,
pub timeline_id: TimelineId,
@@ -248,8 +248,6 @@ pub(super) async fn gather_inputs(
None
};
let branch_is_invisible = timeline.is_invisible() == Some(true);
let lease_points = gc_info
.leases
.keys()
@@ -273,10 +271,7 @@ pub(super) async fn gather_inputs(
.map(|(lsn, _child_id, _is_offloaded)| (lsn, LsnKind::BranchPoint))
.collect::<Vec<_>>();
if !branch_is_invisible {
// Do not count lease points for invisible branches.
lsns.extend(lease_points.iter().map(|&lsn| (lsn, LsnKind::LeasePoint)));
}
lsns.extend(lease_points.iter().map(|&lsn| (lsn, LsnKind::LeasePoint)));
drop(gc_info);
@@ -292,9 +287,7 @@ pub(super) async fn gather_inputs(
// Add a point for the PITR cutoff
let branch_start_needed = next_pitr_cutoff <= branch_start_lsn;
if !branch_start_needed && !branch_is_invisible {
// Only add the GcCutOff point when the timeline is visible; otherwise, do not compute the size for the LSN
// range from the last branch point to the latest data.
if !branch_start_needed {
lsns.push((next_pitr_cutoff, LsnKind::GcCutOff));
}
@@ -380,19 +373,11 @@ pub(super) async fn gather_inputs(
}
}
let branch_end_lsn = if branch_is_invisible {
// If the branch is invisible, the branch end is the last requested LSN (likely a branch cutoff point).
segments.last().unwrap().segment.lsn
} else {
// Otherwise, the branch end is the last record LSN.
last_record_lsn.0
};
// Current end of the timeline
segments.push(SegmentMeta {
segment: Segment {
parent: Some(parent),
lsn: branch_end_lsn,
lsn: last_record_lsn.0,
size: None, // Filled in later, if necessary
needed: true,
},
@@ -624,7 +609,6 @@ async fn calculate_logical_size(
Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res))
}
#[cfg(test)]
#[test]
fn verify_size_for_multiple_branches() {
// this is generated from integration test test_tenant_size_with_multiple_branches, but this way
@@ -782,7 +766,6 @@ fn verify_size_for_multiple_branches() {
assert_eq!(inputs.calculate(), 37_851_408);
}
#[cfg(test)]
#[test]
fn verify_size_for_one_branch() {
let doc = r#"

View File

@@ -2215,10 +2215,6 @@ impl Timeline {
self.remote_client.is_archived()
}
pub(crate) fn is_invisible(&self) -> Option<bool> {
self.remote_client.is_invisible()
}
pub(crate) fn is_stopping(&self) -> bool {
self.current_state() == TimelineState::Stopping
}

View File

@@ -99,9 +99,6 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
wp->config = config;
wp->api = api;
wp->state = WPS_COLLECTING_TERMS;
wp->mconf.generation = INVALID_GENERATION;
wp->mconf.members.len = 0;
wp->mconf.new_members.len = 0;
wp_log(LOG, "neon.safekeepers=%s", wp->config->safekeepers_list);
@@ -173,8 +170,6 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
if (wp->config->proto_version != 2 && wp->config->proto_version != 3)
wp_log(FATAL, "unsupported safekeeper protocol version %d", wp->config->proto_version);
if (wp->safekeepers_generation > 0 && wp->config->proto_version < 3)
wp_log(FATAL, "enabling generations requires protocol version 3");
wp_log(LOG, "using safekeeper protocol version %d", wp->config->proto_version);
/* Fill the greeting package */
@@ -219,7 +214,7 @@ WalProposerFree(WalProposer *wp)
static bool
WalProposerGenerationsEnabled(WalProposer *wp)
{
return wp->safekeepers_generation != INVALID_GENERATION;
return wp->safekeepers_generation != 0;
}
/*
@@ -728,176 +723,13 @@ SendProposerGreeting(Safekeeper *sk)
BlockingWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_HANDSHAKE_RECV);
}
/*
* Assuming `sk` sent its node id, find such member(s) in wp->mconf and set ptr in
* members_safekeepers & new_members_safekeepers to sk.
*/
static void
UpdateMemberSafekeeperPtr(WalProposer *wp, Safekeeper *sk)
{
/* members_safekeepers etc are fixed size, sanity check mconf size */
if (wp->mconf.members.len > MAX_SAFEKEEPERS)
wp_log(FATAL, "too many members %d in mconf", wp->mconf.members.len);
if (wp->mconf.new_members.len > MAX_SAFEKEEPERS)
wp_log(FATAL, "too many new_members %d in mconf", wp->mconf.new_members.len);
/* node id is not known until greeting is received */
if (sk->state < SS_WAIT_VOTING)
return;
/* 0 is assumed to be invalid node id, should never happen */
if (sk->greetResponse.nodeId == 0)
{
wp_log(WARNING, "safekeeper %s:%s sent zero node id", sk->host, sk->port);
return;
}
for (uint32 i = 0; i < wp->mconf.members.len; i++)
{
SafekeeperId *sk_id = &wp->mconf.members.m[i];
if (wp->mconf.members.m[i].node_id == sk->greetResponse.nodeId)
{
/*
* If mconf or list of safekeepers to connect to changed (the
* latter always currently goes through restart though),
* ResetMemberSafekeeperPtrs is expected to be called before
* UpdateMemberSafekeeperPtr. So, other value suggests that we are
* connected to the same sk under different host name, complain
* about that.
*/
if (wp->members_safekeepers[i] != NULL && wp->members_safekeepers[i] != sk)
{
wp_log(WARNING, "safekeeper {id = %lu, ep = %s:%u } in members[%u] is already mapped to connection slot %lu",
sk_id->node_id, sk_id->host, sk_id->port, i, wp->members_safekeepers[i] - wp->safekeeper);
}
wp_log(LOG, "safekeeper {id = %lu, ep = %s:%u } in members[%u] mapped to connection slot %lu",
sk_id->node_id, sk_id->host, sk_id->port, i, sk - wp->safekeeper);
wp->members_safekeepers[i] = sk;
}
}
/* repeat for new_members */
for (uint32 i = 0; i < wp->mconf.new_members.len; i++)
{
SafekeeperId *sk_id = &wp->mconf.new_members.m[i];
if (wp->mconf.new_members.m[i].node_id == sk->greetResponse.nodeId)
{
if (wp->new_members_safekeepers[i] != NULL && wp->new_members_safekeepers[i] != sk)
{
wp_log(WARNING, "safekeeper {id = %lu, ep = %s:%u } in new_members[%u] is already mapped to connection slot %lu",
sk_id->node_id, sk_id->host, sk_id->port, i, wp->new_members_safekeepers[i] - wp->safekeeper);
}
wp_log(LOG, "safekeeper {id = %lu, ep = %s:%u } in new_members[%u] mapped to connection slot %lu",
sk_id->node_id, sk_id->host, sk_id->port, i, sk - wp->safekeeper);
wp->new_members_safekeepers[i] = sk;
}
}
}
/*
* Reset wp->members_safekeepers & new_members_safekeepers and refill them.
* Called after wp changes mconf.
*/
static void
ResetMemberSafekeeperPtrs(WalProposer *wp)
{
memset(&wp->members_safekeepers, 0, sizeof(Safekeeper *) * MAX_SAFEKEEPERS);
memset(&wp->new_members_safekeepers, 0, sizeof(Safekeeper *) * MAX_SAFEKEEPERS);
for (int i = 0; i < wp->n_safekeepers; i++)
{
if (wp->safekeeper[i].state >= SS_WAIT_VOTING)
UpdateMemberSafekeeperPtr(wp, &wp->safekeeper[i]);
}
}
static uint32
MsetQuorum(MemberSet *mset)
{
Assert(mset->len > 0);
return mset->len / 2 + 1;
}
/* Does n forms quorum in mset? */
static bool
MsetHasQuorum(MemberSet *mset, uint32 n)
{
return n >= MsetQuorum(mset);
}
/*
* TermsCollected helper for a single member set `mset`.
*
* `msk` is the member -> safekeeper mapping for mset, i.e. members_safekeepers
* or new_members_safekeepers.
*/
static bool
TermsCollectedMset(WalProposer *wp, MemberSet *mset, Safekeeper **msk, StringInfo s)
{
uint32 n_greeted = 0;
for (uint32 i = 0; i < wp->mconf.members.len; i++)
{
Safekeeper *sk = msk[i];
if (sk != NULL && sk->state == SS_WAIT_VOTING)
{
if (n_greeted > 0)
appendStringInfoString(s, ", ");
appendStringInfo(s, "{id = %lu, ep = %s:%s}", sk->greetResponse.nodeId, sk->host, sk->port);
n_greeted++;
}
}
appendStringInfo(s, ", %u/%u total", n_greeted, mset->len);
return MsetHasQuorum(mset, n_greeted);
}
/*
* Have we received greeting from enough (quorum) safekeepers to start voting?
*/
static bool
TermsCollected(WalProposer *wp)
{
StringInfoData s; /* str for logging */
bool collected = false;
/* legacy: generations disabled */
if (!WalProposerGenerationsEnabled(wp) && wp->mconf.generation == INVALID_GENERATION)
{
collected = wp->n_connected >= wp->quorum;
if (collected)
{
wp->propTerm++;
wp_log(LOG, "walproposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT ", starting voting", wp->quorum, wp->propTerm);
}
return collected;
}
/*
* With generations enabled, we start campaign only when 1) some mconf is
* actually received 2) we have greetings from majority of members as well
* as from majority of new_members if it exists.
*/
if (wp->mconf.generation == INVALID_GENERATION)
return false;
initStringInfo(&s);
appendStringInfoString(&s, "mset greeters: ");
if (!TermsCollectedMset(wp, &wp->mconf.members, wp->members_safekeepers, &s))
goto res;
if (wp->mconf.new_members.len > 0)
{
appendStringInfoString(&s, ", new_mset greeters: ");
if (!TermsCollectedMset(wp, &wp->mconf.new_members, wp->new_members_safekeepers, &s))
goto res;
}
wp->propTerm++;
wp_log(LOG, "walproposer connected to quorum of safekeepers: %s, propTerm=" INT64_FORMAT ", starting voting", s.data, wp->propTerm);
collected = true;
res:
pfree(s.data);
return collected;
return wp->n_connected >= wp->quorum;
}
static void
@@ -921,23 +753,13 @@ RecvAcceptorGreeting(Safekeeper *sk)
pfree(mconf_toml);
/*
* Adopt mconf of safekeepers if it is higher.
* Adopt mconf of safekeepers if it is higher. TODO: mconf change should
* restart wp if it started voting.
*/
if (sk->greetResponse.mconf.generation > wp->mconf.generation)
{
/* sanity check before adopting, should never happen */
if (sk->greetResponse.mconf.members.len == 0)
{
wp_log(FATAL, "mconf %u has zero members", sk->greetResponse.mconf.generation);
}
/* TODO: put mconf to shmem to immediately pick it up on start */
if (wp->state >= WPS_CAMPAIGN)
{
wp_log(FATAL, "restarting to adopt mconf generation %d", sk->greetResponse.mconf.generation);
}
MembershipConfigurationFree(&wp->mconf);
MembershipConfigurationCopy(&sk->greetResponse.mconf, &wp->mconf);
ResetMemberSafekeeperPtrs(wp);
/* full conf was just logged above */
wp_log(LOG, "changed mconf to generation %u", wp->mconf.generation);
}
@@ -945,9 +767,6 @@ RecvAcceptorGreeting(Safekeeper *sk)
/* Protocol is all good, move to voting. */
sk->state = SS_WAIT_VOTING;
/* In greeting safekeeper sent its id; update mappings accordingly. */
UpdateMemberSafekeeperPtr(wp, sk);
/*
* Note: it would be better to track the counter on per safekeeper basis,
* but at worst walproposer would restart with 'term rejected', so leave
@@ -959,9 +778,12 @@ RecvAcceptorGreeting(Safekeeper *sk)
/* We're still collecting terms from the majority. */
wp->propTerm = Max(sk->greetResponse.term, wp->propTerm);
/* Quorum is acquired, prepare the vote request. */
/* Quorum is acquried, prepare the vote request. */
if (TermsCollected(wp))
{
wp->propTerm++;
wp_log(LOG, "proposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT, wp->quorum, wp->propTerm);
wp->state = WPS_CAMPAIGN;
wp->voteRequest.pam.tag = 'v';
wp->voteRequest.generation = wp->mconf.generation;
@@ -1077,44 +899,6 @@ RecvVoteResponse(Safekeeper *sk)
}
}
/*
* VotesCollected helper for a single member set `mset`.
*
* `msk` is the member -> safekeeper mapping for mset, i.e. members_safekeepers
* or new_members_safekeepers.
*/
static bool
VotesCollectedMset(WalProposer *wp, MemberSet *mset, Safekeeper **msk, StringInfo s)
{
uint32 n_votes = 0;
for (uint32 i = 0; i < wp->mconf.members.len; i++)
{
Safekeeper *sk = msk[i];
if (sk != NULL && sk->state == SS_WAIT_ELECTED)
{
if (GetLastLogTerm(sk) > wp->donorLastLogTerm ||
(GetLastLogTerm(sk) == wp->donorLastLogTerm &&
sk->voteResponse.flushLsn > wp->propTermStartLsn))
{
wp->donorLastLogTerm = GetLastLogTerm(sk);
wp->propTermStartLsn = sk->voteResponse.flushLsn;
wp->donor = sk;
}
wp->truncateLsn = Max(wp->safekeeper[i].voteResponse.truncateLsn, wp->truncateLsn);
if (n_votes > 0)
appendStringInfoString(s, ", ");
appendStringInfo(s, "{id = %lu, ep = %s:%s}", sk->greetResponse.nodeId, sk->host, sk->port);
n_votes++;
}
}
appendStringInfo(s, ", %u/%u total", n_votes, mset->len);
return MsetHasQuorum(mset, n_votes);
}
/*
* Checks if enough votes has been collected to get elected and if that's the
* case finds the highest vote, setting donor, donorLastLogTerm,
@@ -1123,8 +907,7 @@ VotesCollectedMset(WalProposer *wp, MemberSet *mset, Safekeeper **msk, StringInf
static bool
VotesCollected(WalProposer *wp)
{
StringInfoData s; /* str for logging */
bool collected = false;
int n_ready = 0;
/* assumed to be called only when not elected yet */
Assert(wp->state == WPS_CAMPAIGN);
@@ -1133,61 +916,25 @@ VotesCollected(WalProposer *wp)
wp->donorLastLogTerm = 0;
wp->truncateLsn = InvalidXLogRecPtr;
/* legacy: generations disabled */
if (!WalProposerGenerationsEnabled(wp) && wp->mconf.generation == INVALID_GENERATION)
for (int i = 0; i < wp->n_safekeepers; i++)
{
int n_ready = 0;
for (int i = 0; i < wp->n_safekeepers; i++)
if (wp->safekeeper[i].state == SS_WAIT_ELECTED)
{
if (wp->safekeeper[i].state == SS_WAIT_ELECTED)
n_ready++;
if (GetLastLogTerm(&wp->safekeeper[i]) > wp->donorLastLogTerm ||
(GetLastLogTerm(&wp->safekeeper[i]) == wp->donorLastLogTerm &&
wp->safekeeper[i].voteResponse.flushLsn > wp->propTermStartLsn))
{
n_ready++;
if (GetLastLogTerm(&wp->safekeeper[i]) > wp->donorLastLogTerm ||
(GetLastLogTerm(&wp->safekeeper[i]) == wp->donorLastLogTerm &&
wp->safekeeper[i].voteResponse.flushLsn > wp->propTermStartLsn))
{
wp->donorLastLogTerm = GetLastLogTerm(&wp->safekeeper[i]);
wp->propTermStartLsn = wp->safekeeper[i].voteResponse.flushLsn;
wp->donor = &wp->safekeeper[i];
}
wp->truncateLsn = Max(wp->safekeeper[i].voteResponse.truncateLsn, wp->truncateLsn);
wp->donorLastLogTerm = GetLastLogTerm(&wp->safekeeper[i]);
wp->propTermStartLsn = wp->safekeeper[i].voteResponse.flushLsn;
wp->donor = i;
}
wp->truncateLsn = Max(wp->safekeeper[i].voteResponse.truncateLsn, wp->truncateLsn);
}
collected = n_ready >= wp->quorum;
if (collected)
{
wp_log(LOG, "walproposer elected with %d/%d votes", n_ready, wp->n_safekeepers);
}
return collected;
}
/*
* if generations are enabled we're expected to get to voting only when
* mconf is established.
*/
Assert(wp->mconf.generation != INVALID_GENERATION);
/*
* We must get votes from both msets if both are present.
*/
initStringInfo(&s);
appendStringInfoString(&s, "mset votes: ");
if (!VotesCollectedMset(wp, &wp->mconf.members, wp->members_safekeepers, &s))
goto res;
if (wp->mconf.new_members.len > 0)
{
appendStringInfoString(&s, ", new_mset votes: ");
if (!VotesCollectedMset(wp, &wp->mconf.new_members, wp->new_members_safekeepers, &s))
goto res;
}
wp_log(LOG, "walproposer elected, %s", s.data);
collected = true;
res:
pfree(s.data);
return collected;
return n_ready >= wp->quorum;
}
/*
@@ -1208,7 +955,7 @@ HandleElectedProposer(WalProposer *wp)
* that only for logical replication (and switching logical walsenders to
* neon_walreader is a todo.)
*/
if (!wp->api.recovery_download(wp, wp->donor))
if (!wp->api.recovery_download(wp, &wp->safekeeper[wp->donor]))
{
wp_log(FATAL, "failed to download WAL for logical replicaiton");
}
@@ -1331,7 +1078,7 @@ ProcessPropStartPos(WalProposer *wp)
/*
* Proposer's term history is the donor's + its own entry.
*/
dth = &wp->donor->voteResponse.termHistory;
dth = &wp->safekeeper[wp->donor].voteResponse.termHistory;
wp->propTermHistory.n_entries = dth->n_entries + 1;
wp->propTermHistory.entries = palloc(sizeof(TermSwitchEntry) * wp->propTermHistory.n_entries);
if (dth->n_entries > 0)
@@ -1339,10 +1086,11 @@ ProcessPropStartPos(WalProposer *wp)
wp->propTermHistory.entries[wp->propTermHistory.n_entries - 1].term = wp->propTerm;
wp->propTermHistory.entries[wp->propTermHistory.n_entries - 1].lsn = wp->propTermStartLsn;
wp_log(LOG, "walproposer elected in term " UINT64_FORMAT ", epochStartLsn %X/%X, donor %s:%s, truncate_lsn %X/%X",
wp_log(LOG, "got votes from majority (%d) of nodes, term " UINT64_FORMAT ", epochStartLsn %X/%X, donor %s:%s, truncate_lsn %X/%X",
wp->quorum,
wp->propTerm,
LSN_FORMAT_ARGS(wp->propTermStartLsn),
wp->donor->host, wp->donor->port,
wp->safekeeper[wp->donor].host, wp->safekeeper[wp->donor].port,
LSN_FORMAT_ARGS(wp->truncateLsn));
/*
@@ -1760,14 +1508,6 @@ RecvAppendResponses(Safekeeper *sk)
readAnything = true;
/* should never happen: sk is expected to send ERROR instead */
if (sk->appendResponse.generation != wp->mconf.generation)
{
wp_log(FATAL, "safekeeper {id = %lu, ep = %s:%s} sent response with generation %u, expected %u",
sk->greetResponse.nodeId, sk->host, sk->port,
sk->appendResponse.generation, wp->mconf.generation);
}
if (sk->appendResponse.term > wp->propTerm)
{
/*
@@ -1884,100 +1624,30 @@ CalculateMinFlushLsn(WalProposer *wp)
}
/*
* GetAcknowledgedByQuorumWALPosition for a single member set `mset`.
*
* `msk` is the member -> safekeeper mapping for mset, i.e. members_safekeepers
* or new_members_safekeepers.
*/
static XLogRecPtr
GetCommittedMset(WalProposer *wp, MemberSet *mset, Safekeeper **msk)
{
XLogRecPtr responses[MAX_SAFEKEEPERS];
/*
* Ascending sort acknowledged LSNs.
*/
Assert(mset->len <= MAX_SAFEKEEPERS);
for (uint32 i = 0; i < mset->len; i++)
{
Safekeeper *sk = msk[i];
/*
* Like in Raft, we aren't allowed to commit entries from previous
* terms, so ignore reported LSN until it gets to propTermStartLsn.
*
* Note: we ignore sk state, which is ok: before first ack flushLsn is
* 0, and later we just preserve value across reconnections. It would
* be ok to check for SS_ACTIVE as well.
*/
if (sk != NULL && sk->appendResponse.flushLsn >= wp->propTermStartLsn)
{
responses[i] = sk->appendResponse.flushLsn;
}
else
{
responses[i] = 0;
}
}
qsort(responses, mset->len, sizeof(XLogRecPtr), CompareLsn);
/*
* And get value committed by the quorum. A way to view this: to get the
* highest value committed on the quorum, in the ordered array we skip n -
* n_quorum elements to get to the first (lowest) value present on all sks
* of the highest quorum.
*/
return responses[mset->len - MsetQuorum(mset)];
}
/*
* Calculate WAL position acknowledged by quorum, i.e. which may be regarded
* committed.
*
* Zero may be returned when there is no quorum of nodes recovered to term start
* lsn which sent feedback yet.
* Calculate WAL position acknowledged by quorum
*/
static XLogRecPtr
GetAcknowledgedByQuorumWALPosition(WalProposer *wp)
{
XLogRecPtr committed;
XLogRecPtr responses[MAX_SAFEKEEPERS];
/* legacy: generations disabled */
if (!WalProposerGenerationsEnabled(wp) && wp->mconf.generation == INVALID_GENERATION)
/*
* Sort acknowledged LSNs
*/
for (int i = 0; i < wp->n_safekeepers; i++)
{
XLogRecPtr responses[MAX_SAFEKEEPERS];
/*
* Sort acknowledged LSNs
* Like in Raft, we aren't allowed to commit entries from previous
* terms, so ignore reported LSN until it gets to epochStartLsn.
*/
for (int i = 0; i < wp->n_safekeepers; i++)
{
/*
* Like in Raft, we aren't allowed to commit entries from previous
* terms, so ignore reported LSN until it gets to propTermStartLsn.
*
* Note: we ignore sk state, which is ok: before first ack
* flushLsn is 0, and later we just preserve value across
* reconnections. It would be ok to check for SS_ACTIVE as well.
*/
responses[i] = wp->safekeeper[i].appendResponse.flushLsn >= wp->propTermStartLsn ? wp->safekeeper[i].appendResponse.flushLsn : 0;
}
qsort(responses, wp->n_safekeepers, sizeof(XLogRecPtr), CompareLsn);
/*
* Get the smallest LSN committed by quorum
*/
return responses[wp->n_safekeepers - wp->quorum];
responses[i] = wp->safekeeper[i].appendResponse.flushLsn >= wp->propTermStartLsn ? wp->safekeeper[i].appendResponse.flushLsn : 0;
}
qsort(responses, wp->n_safekeepers, sizeof(XLogRecPtr), CompareLsn);
committed = GetCommittedMset(wp, &wp->mconf.members, wp->members_safekeepers);
if (wp->mconf.new_members.len > 0)
{
XLogRecPtr new_mset_committed = GetCommittedMset(wp, &wp->mconf.new_members, wp->new_members_safekeepers);
committed = Min(committed, new_mset_committed);
}
return committed;
/*
* Get the smallest LSN committed by quorum
*/
return responses[wp->n_safekeepers - wp->quorum];
}
/*
@@ -2003,9 +1673,9 @@ UpdateDonorShmem(WalProposer *wp)
* about its position immediately after election before any feedbacks are
* sent.
*/
if (wp->donor->state >= SS_WAIT_ELECTED)
if (wp->safekeeper[wp->donor].state >= SS_WAIT_ELECTED)
{
donor = wp->donor;
donor = &wp->safekeeper[wp->donor];
donor_lsn = wp->propTermStartLsn;
}

View File

@@ -145,7 +145,6 @@ typedef uint64 NNodeId;
* This and following structs pair ones in membership.rs.
*/
typedef uint32 Generation;
#define INVALID_GENERATION 0
typedef struct SafekeeperId
{
@@ -772,17 +771,7 @@ typedef struct WalProposer
/* Current walproposer membership configuration */
MembershipConfiguration mconf;
/*
* Parallels mconf.members with pointers to the member's slot in
* safekeepers array of connections, or NULL if such member is not
* connected. Helps to avoid looking slot per id through all
* .safekeepers[] when doing quorum checks.
*/
Safekeeper *members_safekeepers[MAX_SAFEKEEPERS];
/* As above, but for new_members. */
Safekeeper *new_members_safekeepers[MAX_SAFEKEEPERS];
/* (n_safekeepers / 2) + 1. Used for static pre-generations quorum checks. */
/* (n_safekeepers / 2) + 1 */
int quorum;
/*
@@ -840,7 +829,7 @@ typedef struct WalProposer
term_t donorLastLogTerm;
/* Most advanced acceptor */
Safekeeper *donor;
int donor;
/* timeline globally starts at this LSN */
XLogRecPtr timelineStartLsn;

View File

@@ -115,13 +115,17 @@ impl Client {
"{}/v1/tenant/{}/timeline/{}",
self.mgmt_api_endpoint, tenant_id, timeline_id
);
let resp = self.request(Method::DELETE, &uri, ()).await?;
let resp = self
.request_maybe_body(Method::DELETE, &uri, None::<()>)
.await?;
resp.json().await.map_err(Error::ReceiveBody)
}
pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result<models::TimelineDeleteResult> {
pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result<models::TenantDeleteResult> {
let uri = format!("{}/v1/tenant/{}", self.mgmt_api_endpoint, tenant_id);
let resp = self.request(Method::DELETE, &uri, ()).await?;
let resp = self
.request_maybe_body(Method::DELETE, &uri, None::<()>)
.await?;
resp.json().await.map_err(Error::ReceiveBody)
}
@@ -197,6 +201,16 @@ impl Client {
method: Method,
uri: U,
body: B,
) -> Result<reqwest::Response> {
self.request_maybe_body(method, uri, Some(body)).await
}
/// Send the request and check that the status code is good, with an optional body.
async fn request_maybe_body<B: serde::Serialize, U: reqwest::IntoUrl>(
&self,
method: Method,
uri: U,
body: Option<B>,
) -> Result<reqwest::Response> {
let res = self.request_noerror(method, uri, body).await?;
let response = res.error_from_body().await?;
@@ -208,12 +222,15 @@ impl Client {
&self,
method: Method,
uri: U,
body: B,
body: Option<B>,
) -> Result<reqwest::Response> {
let mut req = self.client.request(method, uri);
if let Some(value) = &self.authorization_header {
req = req.header(reqwest::header::AUTHORIZATION, value.get_contents())
}
req.json(&body).send().await.map_err(Error::ReceiveBody)
if let Some(body) = body {
req = req.json(&body);
}
req.send().await.map_err(Error::ReceiveBody)
}
}

View File

@@ -16,9 +16,9 @@ use http_utils::{RequestExt, RouterBuilder};
use hyper::{Body, Request, Response, StatusCode};
use postgres_ffi::WAL_SEGMENT_SIZE;
use safekeeper_api::models::{
AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TermSwitchApiEntry,
TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult, TimelineStatus,
TimelineTermBumpRequest,
AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TenantDeleteResult,
TermSwitchApiEntry, TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult,
TimelineStatus, TimelineTermBumpRequest,
};
use safekeeper_api::{ServerInfo, membership, models};
use storage_broker::proto::{SafekeeperTimelineInfo, TenantTimelineId as ProtoTenantTimelineId};
@@ -83,13 +83,11 @@ async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Bo
.delete_all_for_tenant(&tenant_id, action)
.await
.map_err(ApiError::InternalServerError)?;
json_response(
StatusCode::OK,
delete_info
.iter()
.map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
.collect::<HashMap<String, TimelineDeleteResult>>(),
)
let response_body: TenantDeleteResult = delete_info
.iter()
.map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
.collect::<HashMap<String, TimelineDeleteResult>>();
json_response(StatusCode::OK, response_body)
}
async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {

View File

@@ -1524,25 +1524,14 @@ impl Persistence {
/// Load pending operations from db.
pub(crate) async fn list_pending_ops(
&self,
filter_for_sk: Option<NodeId>,
) -> DatabaseResult<Vec<TimelinePendingOpPersistence>> {
use crate::schema::safekeeper_timeline_pending_ops::dsl;
const FILTER_VAL_1: i64 = 1;
const FILTER_VAL_2: i64 = 2;
let filter_opt = filter_for_sk.map(|id| id.0 as i64);
let timeline_from_db = self
.with_measured_conn(DatabaseOperation::ListTimelineReconcile, move |conn| {
Box::pin(async move {
let from_db: Vec<TimelinePendingOpPersistence> =
dsl::safekeeper_timeline_pending_ops
.filter(
dsl::sk_id
.eq(filter_opt.unwrap_or(FILTER_VAL_1))
.and(dsl::sk_id.eq(filter_opt.unwrap_or(FILTER_VAL_2))),
)
.load(conn)
.await?;
dsl::safekeeper_timeline_pending_ops.load(conn).await?;
Ok(from_db)
})
})

View File

@@ -101,7 +101,7 @@ impl SafekeeperClient {
pub(crate) async fn delete_tenant(
&self,
tenant_id: TenantId,
) -> Result<models::TimelineDeleteResult> {
) -> Result<models::TenantDeleteResult> {
measured_request!(
"delete_tenant",
crate::metrics::Method::Delete,

View File

@@ -35,6 +35,10 @@ impl SafekeeperReconcilers {
service: &Arc<Service>,
reqs: Vec<ScheduleRequest>,
) {
tracing::info!(
"Scheduling {} pending safekeeper ops loaded from db",
reqs.len()
);
for req in reqs {
self.schedule_request(service, req);
}
@@ -74,7 +78,7 @@ pub(crate) async fn load_schedule_requests(
service: &Arc<Service>,
safekeepers: &HashMap<NodeId, Safekeeper>,
) -> anyhow::Result<Vec<ScheduleRequest>> {
let pending_ops = service.persistence.list_pending_ops(None).await?;
let pending_ops = service.persistence.list_pending_ops().await?;
let mut res = Vec::with_capacity(pending_ops.len());
for op_persist in pending_ops {
let node_id = NodeId(op_persist.sk_id as u64);
@@ -232,12 +236,14 @@ impl SafekeeperReconciler {
let kind = req.kind;
let tenant_id = req.tenant_id;
let timeline_id = req.timeline_id;
let node_id = req.safekeeper.skp.id;
self.reconcile_one(req, req_cancel)
.instrument(tracing::info_span!(
"reconcile_one",
?kind,
%tenant_id,
?timeline_id
?timeline_id,
%node_id,
))
.await;
}

View File

@@ -79,12 +79,7 @@ from fixtures.remote_storage import (
default_remote_storage,
remote_storage_to_toml_dict,
)
from fixtures.safekeeper.http import (
MembershipConfiguration,
SafekeeperHttpClient,
SafekeeperId,
TimelineCreateRequest,
)
from fixtures.safekeeper.http import SafekeeperHttpClient
from fixtures.safekeeper.utils import wait_walreceivers_absent
from fixtures.utils import (
ATTACHMENT_NAME_REGEX,
@@ -4767,50 +4762,6 @@ class Safekeeper(LogUtils):
wait_until(paused)
@staticmethod
def sks_to_safekeeper_ids(sks: list[Safekeeper]) -> list[SafekeeperId]:
return [SafekeeperId(sk.id, "localhost", sk.port.pg_tenant_only) for sk in sks]
@staticmethod
def mconf_sks(env: NeonEnv, mconf: MembershipConfiguration) -> list[Safekeeper]:
"""
List of Safekeepers which are members in `mconf`.
"""
members_ids = [m.id for m in mconf.members]
new_members_ids = [m.id for m in mconf.new_members] if mconf.new_members is not None else []
return [sk for sk in env.safekeepers if sk.id in members_ids or sk.id in new_members_ids]
@staticmethod
def create_timeline(
tenant_id: TenantId,
timeline_id: TimelineId,
ps: NeonPageserver,
mconf: MembershipConfiguration,
members_sks: list[Safekeeper],
):
"""
Manually create timeline on safekeepers with given (presumably inital)
mconf: figure out LSN from pageserver, bake request and execute it on
given safekeepers.
Normally done by storcon, but some tests want to do it manually so far.
"""
ps_http_cli = ps.http_client()
# figure out initial LSN.
ps_timeline_detail = ps_http_cli.timeline_detail(tenant_id, timeline_id)
init_lsn = ps_timeline_detail["last_record_lsn"]
log.info(f"initial LSN: {init_lsn}")
# sk timeline creation request expects minor version
pg_version = ps_timeline_detail["pg_version"] * 10000
# create inital mconf
create_r = TimelineCreateRequest(
tenant_id, timeline_id, mconf, pg_version, Lsn(init_lsn), commit_lsn=None
)
log.info(f"sending timeline create: {create_r.to_json()}")
for sk in members_sks:
sk.http_client().timeline_create(create_r)
class NeonBroker(LogUtils):
"""An object managing storage_broker instance"""

View File

@@ -25,7 +25,7 @@ class Walreceiver:
@dataclass
class SafekeeperTimelineStatus:
mconf: MembershipConfiguration | None
mconf: Configuration | None
term: int
last_log_term: int
pg_version: int # Not exactly a PgVersion, safekeeper returns version as int, for example 150002 for 15.2
@@ -78,17 +78,17 @@ class SafekeeperId:
@dataclass
class MembershipConfiguration:
class Configuration:
generation: int
members: list[SafekeeperId]
new_members: list[SafekeeperId] | None
@classmethod
def from_json(cls, d: dict[str, Any]) -> MembershipConfiguration:
def from_json(cls, d: dict[str, Any]) -> Configuration:
generation = d["generation"]
members = d["members"]
new_members = d.get("new_members")
return MembershipConfiguration(generation, members, new_members)
return Configuration(generation, members, new_members)
def to_json(self) -> str:
return json.dumps(self, cls=EnhancedJSONEncoder)
@@ -98,7 +98,7 @@ class MembershipConfiguration:
class TimelineCreateRequest:
tenant_id: TenantId
timeline_id: TimelineId
mconf: MembershipConfiguration
mconf: Configuration
# not exactly PgVersion, for example 150002 for 15.2
pg_version: int
start_lsn: Lsn
@@ -110,13 +110,13 @@ class TimelineCreateRequest:
@dataclass
class TimelineMembershipSwitchResponse:
previous_conf: MembershipConfiguration
current_conf: MembershipConfiguration
previous_conf: Configuration
current_conf: Configuration
@classmethod
def from_json(cls, d: dict[str, Any]) -> TimelineMembershipSwitchResponse:
previous_conf = MembershipConfiguration.from_json(d["previous_conf"])
current_conf = MembershipConfiguration.from_json(d["current_conf"])
previous_conf = Configuration.from_json(d["previous_conf"])
current_conf = Configuration.from_json(d["current_conf"])
return TimelineMembershipSwitchResponse(previous_conf, current_conf)
@@ -194,7 +194,7 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
resj = res.json()
walreceivers = [Walreceiver(wr["conn_id"], wr["status"]) for wr in resj["walreceivers"]]
# It is always normally not None, it is allowed only to make forward compat tests happy.
mconf = MembershipConfiguration.from_json(resj["mconf"]) if "mconf" in resj else None
mconf = Configuration.from_json(resj["mconf"]) if "mconf" in resj else None
return SafekeeperTimelineStatus(
mconf=mconf,
term=resj["acceptor_state"]["term"],
@@ -223,9 +223,7 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
return self.timeline_status(tenant_id, timeline_id).commit_lsn
# Get timeline membership configuration.
def get_membership(
self, tenant_id: TenantId, timeline_id: TimelineId
) -> MembershipConfiguration:
def get_membership(self, tenant_id: TenantId, timeline_id: TimelineId) -> Configuration:
# make mypy happy
return self.timeline_status(tenant_id, timeline_id).mconf # type: ignore
@@ -277,7 +275,7 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
return res_json
def timeline_exclude(
self, tenant_id: TenantId, timeline_id: TimelineId, to: MembershipConfiguration
self, tenant_id: TenantId, timeline_id: TimelineId, to: Configuration
) -> dict[str, Any]:
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/exclude",
@@ -289,7 +287,7 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
return res_json
def membership_switch(
self, tenant_id: TenantId, timeline_id: TimelineId, to: MembershipConfiguration
self, tenant_id: TenantId, timeline_id: TimelineId, to: Configuration
) -> TimelineMembershipSwitchResponse:
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/membership",

View File

@@ -4073,6 +4073,134 @@ def test_storage_controller_location_conf_equivalence(neon_env_builder: NeonEnvB
assert reconciles_after_restart == 0
@pytest.mark.parametrize("restart_storcon", [True, False])
@pytest.mark.parametrize("delete_only_timeline", [True, False])
def test_storcon_create_delete_sk_down(
neon_env_builder: NeonEnvBuilder, restart_storcon: bool, delete_only_timeline: bool
):
"""
Test that the storcon can create and delete tenants and timelines with a safekeeper being down.
"""
neon_env_builder.num_safekeepers = 3
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": True,
}
env = neon_env_builder.init_start()
env.safekeepers[0].stop()
# Wait for heartbeater to pick up that the safekeeper is gone
# This isn't really neccessary
def logged_offline():
env.storage_controller.assert_log_contains(
"Heartbeat round complete for 3 safekeepers, 1 offline"
)
wait_until(logged_offline)
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
env.create_tenant(tenant_id, timeline_id)
branch_timeline_id = env.create_branch("child_of_main", tenant_id)
log.info(
f"Creating tenant {tenant_id} with main timeline {timeline_id} and branch {branch_timeline_id}"
)
env.storage_controller.allowed_errors.extend(
[
".*Call to safekeeper.* management API still failed after.*",
f".*reconcile_one.*tenant_id={tenant_id}.*Call to safekeeper.* management API failed, will retry.*",
f".*reconcile_one.*tenant_id={tenant_id}.*Call to safekeeper.* management API still failed after.*",
]
)
env.safekeepers[1].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}")
env.safekeepers[2].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}")
if restart_storcon:
# Restart the storcon to check that we persist operations
env.storage_controller.stop()
env.storage_controller.start()
config_lines = [
"neon.safekeeper_proto_version = 3",
]
with env.endpoints.create("main", tenant_id=tenant_id, config_lines=config_lines) as ep:
# endpoint should start.
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
with env.endpoints.create("child_of_main", tenant_id=tenant_id, config_lines=config_lines) as ep:
# endpoint should start.
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
env.storage_controller.assert_log_contains("writing pending op for sk id 0")
env.safekeepers[0].start()
# ensure that we applied the operation also for the safekeeper we just brought down
def logged_contains_on_sk():
env.safekeepers[0].assert_log_contains(
f"pulling timeline {tenant_id}/{timeline_id} from safekeeper"
)
env.safekeepers[0].assert_log_contains(
f"pulling timeline {tenant_id}/{branch_timeline_id} from safekeeper"
)
wait_until(logged_contains_on_sk)
env.safekeepers[1].stop()
if delete_only_timeline:
env.storage_controller.pageserver_api().timeline_delete(tenant_id, branch_timeline_id)
else:
env.storage_controller.pageserver_api().tenant_delete(tenant_id)
def logged_delete_finished():
env.safekeepers[0].assert_log_contains(f"method=DELETE.*{tenant_id}.*Request handled")
env.safekeepers[2].assert_log_contains(f"method=DELETE.*{tenant_id}.*Request handled")
wait_until(logged_delete_finished)
env.safekeepers[0].assert_log_contains(
f"deleting timeline {tenant_id}/{branch_timeline_id} from disk"
)
env.safekeepers[2].assert_log_contains(
f"deleting timeline {tenant_id}/{branch_timeline_id} from disk"
)
root_was_deleted_on_0 = env.safekeepers[0].log_contains(
f"deleting timeline {tenant_id}/{timeline_id} from disk"
)
root_was_deleted_on_2 = env.safekeepers[2].log_contains(
f"deleting timeline {tenant_id}/{timeline_id} from disk"
)
assert (root_was_deleted_on_0 is None) == (root_was_deleted_on_2 is None)
# We only delete the root timeline iff the tenant delete was requested
if delete_only_timeline:
assert not root_was_deleted_on_0
else:
assert root_was_deleted_on_0
if restart_storcon:
# Restart the storcon to check that we persist operations
env.storage_controller.stop()
env.storage_controller.start()
env.safekeepers[1].start()
# ensure that there is log msgs for the third safekeeper too
def logged_deleted_on_sk():
env.safekeepers[1].assert_log_contains(
f"deleting timeline {tenant_id}/{branch_timeline_id} from disk"
)
wait_until(logged_deleted_on_sk)
@pytest.mark.parametrize("wrong_az", [True, False])
def test_storage_controller_graceful_migration(neon_env_builder: NeonEnvBuilder, wrong_az: bool):
"""

View File

@@ -45,7 +45,7 @@ from fixtures.remote_storage import (
s3_storage,
)
from fixtures.safekeeper.http import (
MembershipConfiguration,
Configuration,
SafekeeperHttpClient,
SafekeeperId,
TimelineCreateRequest,
@@ -589,7 +589,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
for sk in env.safekeepers:
sk.start()
cli = sk.http_client()
mconf = MembershipConfiguration(generation=0, members=[], new_members=None)
mconf = Configuration(generation=0, members=[], new_members=None)
# set start_lsn to the beginning of the first segment to allow reading
# WAL from there (could you intidb LSN as well).
r = TimelineCreateRequest(
@@ -1948,7 +1948,7 @@ def test_membership_api(neon_env_builder: NeonEnvBuilder):
sk_id_2 = SafekeeperId(11, "localhost", 5434) # just a mock
# Request to switch before timeline creation should fail.
init_conf = MembershipConfiguration(generation=1, members=[sk_id_1], new_members=None)
init_conf = Configuration(generation=1, members=[sk_id_1], new_members=None)
with pytest.raises(requests.exceptions.HTTPError):
http_cli.membership_switch(tenant_id, timeline_id, init_conf)
@@ -1960,7 +1960,7 @@ def test_membership_api(neon_env_builder: NeonEnvBuilder):
http_cli.timeline_create(create_r)
# Switch into some conf.
joint_conf = MembershipConfiguration(generation=4, members=[sk_id_1], new_members=[sk_id_2])
joint_conf = Configuration(generation=4, members=[sk_id_1], new_members=[sk_id_2])
resp = http_cli.membership_switch(tenant_id, timeline_id, joint_conf)
log.info(f"joint switch resp: {resp}")
assert resp.previous_conf.generation == 1
@@ -1973,26 +1973,24 @@ def test_membership_api(neon_env_builder: NeonEnvBuilder):
assert after_restart.generation == 4
# Switch into non joint conf of which sk is not a member, must fail.
non_joint_not_member = MembershipConfiguration(
generation=5, members=[sk_id_2], new_members=None
)
non_joint_not_member = Configuration(generation=5, members=[sk_id_2], new_members=None)
with pytest.raises(requests.exceptions.HTTPError):
resp = http_cli.membership_switch(tenant_id, timeline_id, non_joint_not_member)
# Switch into good non joint conf.
non_joint = MembershipConfiguration(generation=6, members=[sk_id_1], new_members=None)
non_joint = Configuration(generation=6, members=[sk_id_1], new_members=None)
resp = http_cli.membership_switch(tenant_id, timeline_id, non_joint)
log.info(f"non joint switch resp: {resp}")
assert resp.previous_conf.generation == 4
assert resp.current_conf.generation == 6
# Switch request to lower conf should be rejected.
lower_conf = MembershipConfiguration(generation=3, members=[sk_id_1], new_members=None)
lower_conf = Configuration(generation=3, members=[sk_id_1], new_members=None)
with pytest.raises(requests.exceptions.HTTPError):
http_cli.membership_switch(tenant_id, timeline_id, lower_conf)
# Now, exclude sk from the membership, timeline should be deleted.
excluded_conf = MembershipConfiguration(generation=7, members=[sk_id_2], new_members=None)
excluded_conf = Configuration(generation=7, members=[sk_id_2], new_members=None)
http_cli.timeline_exclude(tenant_id, timeline_id, excluded_conf)
with pytest.raises(requests.exceptions.HTTPError):
http_cli.timeline_status(tenant_id, timeline_id)
@@ -2012,6 +2010,11 @@ def test_explicit_timeline_creation(neon_env_builder: NeonEnvBuilder):
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
ps = env.pageservers[0]
ps_http_cli = ps.http_client()
http_clis = [sk.http_client() for sk in env.safekeepers]
config_lines = [
"neon.safekeeper_proto_version = 3",
]
@@ -2020,11 +2023,22 @@ def test_explicit_timeline_creation(neon_env_builder: NeonEnvBuilder):
# expected to fail because timeline is not created on safekeepers
with pytest.raises(Exception, match=r".*timed out.*"):
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3], timeout="2s")
# figure out initial LSN.
ps_timeline_detail = ps_http_cli.timeline_detail(tenant_id, timeline_id)
init_lsn = ps_timeline_detail["last_record_lsn"]
log.info(f"initial LSN: {init_lsn}")
# sk timeline creation request expects minor version
pg_version = ps_timeline_detail["pg_version"] * 10000
# create inital mconf
mconf = MembershipConfiguration(
generation=1, members=Safekeeper.sks_to_safekeeper_ids(env.safekeepers), new_members=None
sk_ids = [SafekeeperId(sk.id, "localhost", sk.port.pg_tenant_only) for sk in env.safekeepers]
mconf = Configuration(generation=1, members=sk_ids, new_members=None)
create_r = TimelineCreateRequest(
tenant_id, timeline_id, mconf, pg_version, Lsn(init_lsn), commit_lsn=None
)
Safekeeper.create_timeline(tenant_id, timeline_id, env.pageservers[0], mconf, env.safekeepers)
log.info(f"sending timeline create: {create_r.to_json()}")
for sk_http_cli in http_clis:
sk_http_cli.timeline_create(create_r)
# Once timeline created endpoint should start.
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")

View File

@@ -18,7 +18,6 @@ from fixtures.neon_fixtures import (
Safekeeper,
)
from fixtures.remote_storage import RemoteStorageKind
from fixtures.safekeeper.http import MembershipConfiguration
from fixtures.utils import skip_in_debug_build
if TYPE_CHECKING:
@@ -453,24 +452,20 @@ def test_concurrent_computes(neon_env_builder: NeonEnvBuilder):
asyncio.run(run_concurrent_computes(env))
async def assert_query_hangs(endpoint: Endpoint, query: str):
"""
Start on endpoint query which is expected to hang and check that it does.
"""
conn = await endpoint.connect_async()
bg_query = asyncio.create_task(conn.execute(query))
await asyncio.sleep(2)
assert not bg_query.done()
return bg_query
# Stop safekeeper and check that query cannot be executed while safekeeper is down.
# Query will insert a single row into a table.
async def check_unavailability(sk: Safekeeper, ep: Endpoint, key: int, start_delay_sec: int = 2):
async def check_unavailability(
sk: Safekeeper, conn: asyncpg.Connection, key: int, start_delay_sec: int = 2
):
# shutdown one of two acceptors, that is, majority
sk.stop()
bg_query = await assert_query_hangs(ep, f"INSERT INTO t values ({key}, 'payload')")
bg_query = asyncio.create_task(conn.execute(f"INSERT INTO t values ({key}, 'payload')"))
await asyncio.sleep(start_delay_sec)
# ensure that the query has not been executed yet
assert not bg_query.done()
# start safekeeper and await the query
sk.start()
await bg_query
@@ -485,10 +480,10 @@ async def run_unavailability(env: NeonEnv, endpoint: Endpoint):
await conn.execute("INSERT INTO t values (1, 'payload')")
# stop safekeeper and check that query cannot be executed while safekeeper is down
await check_unavailability(env.safekeepers[0], endpoint, 2)
await check_unavailability(env.safekeepers[0], conn, 2)
# for the world's balance, do the same with second safekeeper
await check_unavailability(env.safekeepers[1], endpoint, 3)
await check_unavailability(env.safekeepers[1], conn, 3)
# check that we can execute queries after restart
await conn.execute("INSERT INTO t values (4, 'payload')")
@@ -519,7 +514,15 @@ async def run_recovery_uncommitted(env: NeonEnv):
# insert with only one safekeeper up to create tail of flushed but not committed WAL
sk1.stop()
sk2.stop()
await assert_query_hangs(ep, "insert into t select generate_series(1, 2000), 'payload'")
conn = await ep.connect_async()
# query should hang, so execute in separate task
bg_query = asyncio.create_task(
conn.execute("insert into t select generate_series(1, 2000), 'payload'")
)
sleep_sec = 2
await asyncio.sleep(sleep_sec)
# it must still be not finished
assert not bg_query.done()
# note: destoy will kill compute_ctl, preventing it waiting for hanging sync-safekeepers.
ep.stop_and_destroy()
@@ -556,7 +559,15 @@ async def run_wal_truncation(env: NeonEnv, safekeeper_proto_version: int):
# insert with only one sk3 up to create tail of flushed but not committed WAL on it
sk1.stop()
sk2.stop()
await assert_query_hangs(ep, "insert into t select generate_series(1, 180000), 'Papaya'")
conn = await ep.connect_async()
# query should hang, so execute in separate task
bg_query = asyncio.create_task(
conn.execute("insert into t select generate_series(1, 180000), 'Papaya'")
)
sleep_sec = 2
await asyncio.sleep(sleep_sec)
# it must still be not finished
assert not bg_query.done()
# note: destoy will kill compute_ctl, preventing it waiting for hanging sync-safekeepers.
ep.stop_and_destroy()
@@ -596,127 +607,6 @@ def test_wal_truncation(neon_env_builder: NeonEnvBuilder, safekeeper_proto_versi
asyncio.run(run_wal_truncation(env, safekeeper_proto_version))
# todo: add should_start when all up; check and exit early if not
async def quorum_sanity_single(
env: NeonEnv,
compute_sks_ids: list[int],
members_sks_ids: list[int],
new_members_sks_ids: list[int] | None,
sks_to_stop_ids: list[int],
should_work_when_stopped: bool,
):
"""
*_ids params contain safekeeper node ids; it is assumed they are issued
from 1 and sequentially assigned to env.safekeepers.
"""
members_sks = [env.safekeepers[i - 1] for i in members_sks_ids]
new_members_sks = [env.safekeepers[i - 1] for i in new_members_sks_ids] if new_members_sks_ids else None
sks_to_stop = [env.safekeepers[i - 1] for i in sks_to_stop_ids]
mconf = MembershipConfiguration(
generation=1,
members=Safekeeper.sks_to_safekeeper_ids(members_sks),
new_members=Safekeeper.sks_to_safekeeper_ids(new_members_sks) if new_members_sks else None,
)
members_sks = Safekeeper.mconf_sks(env, mconf)
tenant_id = env.initial_tenant
compute_sks_ids_str = "-".join([str(sk_id) for sk_id in compute_sks_ids])
members_sks_ids_str = "-".join([str(sk.id) for sk in mconf.members])
new_members_sks_ids_str = "-".join(
[str(sk.id) for sk in mconf.new_members] if mconf.new_members is not None else []
)
sks_to_stop_ids_str = "-".join([str(sk.id) for sk in sks_to_stop])
log.info(
f"running quorum_sanity_single with compute_sks={compute_sks_ids_str}, members_sks={members_sks_ids_str}, new_members_sks={new_members_sks_ids_str}, sks_to_stop={sks_to_stop_ids_str}, should_work_when_stopped={should_work_when_stopped}"
)
branch_name = f"test_quorum_single_c{compute_sks_ids_str}_m{members_sks_ids_str}_{new_members_sks_ids_str}_s{sks_to_stop_ids_str}"
timeline_id = env.create_branch(branch_name)
# create timeline on `members_sks`
Safekeeper.create_timeline(tenant_id, timeline_id, env.pageservers[0], mconf, members_sks)
config_lines = [
"neon.safekeeper_proto_version = 3",
]
ep = env.endpoints.create(branch_name, config_lines=config_lines)
ep.start(safekeeper_generation=1, safekeepers=compute_sks_ids)
ep.safe_psql("create table t(key int, value text)")
# stop specified sks and check whether writes work
for sk in sks_to_stop:
sk.stop()
if should_work_when_stopped:
log.info("checking that writes still work")
ep.safe_psql("insert into t select generate_series(1, 100), 'Papaya'")
bg_query = None
else:
log.info("checking that writes hang")
bg_query = await assert_query_hangs(
ep, "insert into t select generate_series(1, 100), 'Papaya'"
)
# start again; now they should work
for sk in sks_to_stop:
sk.start()
if bg_query:
log.info("awaiting query")
await bg_query
# It's a bit tempting to iterate over all possible combinations, but let's stick
# with this for now.
async def run_quorum_sanity(env: NeonEnv):
# 3 members, all up, should work
await quorum_sanity_single(env, [1, 2, 3], [1, 2, 3], None, [], True)
# # 3 members, 2/3 up, should work
await quorum_sanity_single(env, [1, 2, 3], [1, 2, 3], None, [3], True)
# # 3 members, 1/3 up, should not work
await quorum_sanity_single(env, [1, 2, 3], [1, 2, 3], None, [2, 3], False)
# # 3 members, all up, should work; wp redundantly talks to 4th.
await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], None, [], True)
# # 3 members, all up, should work with wp talking to 2 of these 3 + plus one redundant
await quorum_sanity_single(env, [2, 3, 4], [1, 2, 3], None, [], True)
# # 3 members, 2/3 up, could work but wp talks to different 3s, so it shouldn't
await quorum_sanity_single(env, [2, 3, 4], [1, 2, 3], None, [3], False)
# # joint conf of 1-2-3 and 4, all up, should work
await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], [4], [], True)
# # joint conf of 1-2-3 and 4, 4 down, shouldn't work
await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], [4], [4], False)
# # joint conf of 1-2-3 and 2-3-4, all up, should work
await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], [2, 3, 4], [], True)
# # joint conf of 1-2-3 and 2-3-4, 1 and 4 down, should work
await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], [2, 3, 4], [1, 4], True)
# # joint conf of 1-2-3 and 2-3-4, 2 down, should work
await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], [2, 3, 4], [2], True)
# # joint conf of 1-2-3 and 2-3-4, 3 down, should work
await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], [2, 3, 4], [3], True)
# # joint conf of 1-2-3 and 2-3-4, 1 and 2 down, shouldn't work
await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], [2, 3, 4], [1, 2], False)
# # joint conf of 1-2-3 and 2-3-4, 2 and 4 down, shouldn't work
await quorum_sanity_single(env, [1, 2, 3, 4], [1, 2, 3], [2, 3, 4], [2, 4], False)
# # joint conf of 1-2-3 and 2-3-4 with wp talking to 2-3-4 only.
await quorum_sanity_single(env, [2, 3, 4], [1, 2, 3], [2, 3, 4], [], True)
# # with 1 down should still be ok
await quorum_sanity_single(env, [2, 3, 4], [1, 2, 3], [2, 3, 4], [1], True)
# # but with 2 down not ok
await quorum_sanity_single(env, [2, 3, 4], [1, 2, 3], [2, 3, 4], [2], False)
# Test various combinations of membership configurations / neon.safekeepers
# (list of safekeepers endpoint connects to) values / up & down safekeepers and
# check that endpont can start and write data when we have quorum and can't when
# we don't.
def test_quorum_sanity(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 4
env = neon_env_builder.init_start()
asyncio.run(run_quorum_sanity(env))
async def run_segment_init_failure(env: NeonEnv):
env.create_branch("test_segment_init_failure")
ep = env.endpoints.create_start("test_segment_init_failure")