mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-21 23:20:40 +00:00
Compare commits
9 Commits
wp-gens-wi
...
arpad/sk_t
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fec5530c9e | ||
|
|
8d5d6b24df | ||
|
|
0a5b9506ae | ||
|
|
48d712af1b | ||
|
|
1a78e9e9cf | ||
|
|
484208b5a8 | ||
|
|
26d636ec14 | ||
|
|
579069d74b | ||
|
|
a02dcaea6b |
@@ -227,6 +227,8 @@ pub struct TimelineDeleteResult {
|
||||
pub dir_existed: bool,
|
||||
}
|
||||
|
||||
pub type TenantDeleteResult = std::collections::HashMap<String, TimelineDeleteResult>;
|
||||
|
||||
fn lsn_invalid() -> Lsn {
|
||||
Lsn::INVALID
|
||||
}
|
||||
|
||||
@@ -3774,7 +3774,7 @@ pub fn make_router(
|
||||
)
|
||||
.put(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/mark_invisible",
|
||||
|r| api_handler( r, timeline_mark_invisible_handler),
|
||||
|r| testing_api_handler("mark timeline invisible", r, timeline_mark_invisible_handler),
|
||||
)
|
||||
.put(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/checkpoint",
|
||||
|
||||
@@ -11526,255 +11526,4 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
#[tokio::test]
|
||||
async fn test_synthetic_size_calculation_with_invisible_branches() -> anyhow::Result<()> {
|
||||
use pageserver_api::models::TimelineVisibilityState;
|
||||
|
||||
use crate::tenant::size::gather_inputs;
|
||||
|
||||
let tenant_conf = pageserver_api::models::TenantConfig {
|
||||
// Ensure that we don't compute gc_cutoffs (which needs reading the layer files)
|
||||
pitr_interval: Some(Duration::ZERO),
|
||||
..Default::default()
|
||||
};
|
||||
let harness = TenantHarness::create_custom(
|
||||
"test_synthetic_size_calculation_with_invisible_branches",
|
||||
tenant_conf,
|
||||
TenantId::generate(),
|
||||
ShardIdentity::unsharded(),
|
||||
Generation::new(0xdeadbeef),
|
||||
)
|
||||
.await?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let main_tline = tenant
|
||||
.create_test_timeline_with_layers(
|
||||
TIMELINE_ID,
|
||||
Lsn(0x10),
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
vec![],
|
||||
vec![],
|
||||
vec![],
|
||||
Lsn(0x100),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let snapshot1 = TimelineId::from_array(hex!("11223344556677881122334455667790"));
|
||||
tenant
|
||||
.branch_timeline_test_with_layers(
|
||||
&main_tline,
|
||||
snapshot1,
|
||||
Some(Lsn(0x20)),
|
||||
&ctx,
|
||||
vec![],
|
||||
vec![],
|
||||
Lsn(0x50),
|
||||
)
|
||||
.await?;
|
||||
let snapshot2 = TimelineId::from_array(hex!("11223344556677881122334455667791"));
|
||||
tenant
|
||||
.branch_timeline_test_with_layers(
|
||||
&main_tline,
|
||||
snapshot2,
|
||||
Some(Lsn(0x30)),
|
||||
&ctx,
|
||||
vec![],
|
||||
vec![],
|
||||
Lsn(0x50),
|
||||
)
|
||||
.await?;
|
||||
let snapshot3 = TimelineId::from_array(hex!("11223344556677881122334455667792"));
|
||||
tenant
|
||||
.branch_timeline_test_with_layers(
|
||||
&main_tline,
|
||||
snapshot3,
|
||||
Some(Lsn(0x40)),
|
||||
&ctx,
|
||||
vec![],
|
||||
vec![],
|
||||
Lsn(0x50),
|
||||
)
|
||||
.await?;
|
||||
let limit = Arc::new(Semaphore::new(1));
|
||||
let max_retention_period = None;
|
||||
let mut logical_size_cache = HashMap::new();
|
||||
let cause = LogicalSizeCalculationCause::EvictionTaskImitation;
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let inputs = gather_inputs(
|
||||
&tenant,
|
||||
&limit,
|
||||
max_retention_period,
|
||||
&mut logical_size_cache,
|
||||
cause,
|
||||
&cancel,
|
||||
&ctx,
|
||||
)
|
||||
.instrument(info_span!(
|
||||
"gather_inputs",
|
||||
tenant_id = "unknown",
|
||||
shard_id = "unknown",
|
||||
))
|
||||
.await?;
|
||||
use crate::tenant::size::{LsnKind, ModelInputs, SegmentMeta};
|
||||
use LsnKind::*;
|
||||
use tenant_size_model::Segment;
|
||||
let ModelInputs { mut segments, .. } = inputs;
|
||||
segments.retain(|s| s.timeline_id == TIMELINE_ID);
|
||||
for segment in segments.iter_mut() {
|
||||
segment.segment.parent = None; // We don't care about the parent for the test
|
||||
segment.segment.size = None; // We don't care about the size for the test
|
||||
}
|
||||
assert_eq!(
|
||||
segments,
|
||||
[
|
||||
SegmentMeta {
|
||||
segment: Segment {
|
||||
parent: None,
|
||||
lsn: 0x10,
|
||||
size: None,
|
||||
needed: false,
|
||||
},
|
||||
timeline_id: TIMELINE_ID,
|
||||
kind: BranchStart,
|
||||
},
|
||||
SegmentMeta {
|
||||
segment: Segment {
|
||||
parent: None,
|
||||
lsn: 0x20,
|
||||
size: None,
|
||||
needed: false,
|
||||
},
|
||||
timeline_id: TIMELINE_ID,
|
||||
kind: BranchPoint,
|
||||
},
|
||||
SegmentMeta {
|
||||
segment: Segment {
|
||||
parent: None,
|
||||
lsn: 0x30,
|
||||
size: None,
|
||||
needed: false,
|
||||
},
|
||||
timeline_id: TIMELINE_ID,
|
||||
kind: BranchPoint,
|
||||
},
|
||||
SegmentMeta {
|
||||
segment: Segment {
|
||||
parent: None,
|
||||
lsn: 0x40,
|
||||
size: None,
|
||||
needed: false,
|
||||
},
|
||||
timeline_id: TIMELINE_ID,
|
||||
kind: BranchPoint,
|
||||
},
|
||||
SegmentMeta {
|
||||
segment: Segment {
|
||||
parent: None,
|
||||
lsn: 0x100,
|
||||
size: None,
|
||||
needed: false,
|
||||
},
|
||||
timeline_id: TIMELINE_ID,
|
||||
kind: GcCutOff,
|
||||
}, // we need to retain everything above the last branch point
|
||||
SegmentMeta {
|
||||
segment: Segment {
|
||||
parent: None,
|
||||
lsn: 0x100,
|
||||
size: None,
|
||||
needed: true,
|
||||
},
|
||||
timeline_id: TIMELINE_ID,
|
||||
kind: BranchEnd,
|
||||
},
|
||||
]
|
||||
);
|
||||
|
||||
main_tline
|
||||
.remote_client
|
||||
.schedule_index_upload_for_timeline_invisible_state(
|
||||
TimelineVisibilityState::Invisible,
|
||||
)?;
|
||||
main_tline.remote_client.wait_completion().await?;
|
||||
let inputs = gather_inputs(
|
||||
&tenant,
|
||||
&limit,
|
||||
max_retention_period,
|
||||
&mut logical_size_cache,
|
||||
cause,
|
||||
&cancel,
|
||||
&ctx,
|
||||
)
|
||||
.instrument(info_span!(
|
||||
"gather_inputs",
|
||||
tenant_id = "unknown",
|
||||
shard_id = "unknown",
|
||||
))
|
||||
.await?;
|
||||
let ModelInputs { mut segments, .. } = inputs;
|
||||
segments.retain(|s| s.timeline_id == TIMELINE_ID);
|
||||
for segment in segments.iter_mut() {
|
||||
segment.segment.parent = None; // We don't care about the parent for the test
|
||||
segment.segment.size = None; // We don't care about the size for the test
|
||||
}
|
||||
assert_eq!(
|
||||
segments,
|
||||
[
|
||||
SegmentMeta {
|
||||
segment: Segment {
|
||||
parent: None,
|
||||
lsn: 0x10,
|
||||
size: None,
|
||||
needed: false,
|
||||
},
|
||||
timeline_id: TIMELINE_ID,
|
||||
kind: BranchStart,
|
||||
},
|
||||
SegmentMeta {
|
||||
segment: Segment {
|
||||
parent: None,
|
||||
lsn: 0x20,
|
||||
size: None,
|
||||
needed: false,
|
||||
},
|
||||
timeline_id: TIMELINE_ID,
|
||||
kind: BranchPoint,
|
||||
},
|
||||
SegmentMeta {
|
||||
segment: Segment {
|
||||
parent: None,
|
||||
lsn: 0x30,
|
||||
size: None,
|
||||
needed: false,
|
||||
},
|
||||
timeline_id: TIMELINE_ID,
|
||||
kind: BranchPoint,
|
||||
},
|
||||
SegmentMeta {
|
||||
segment: Segment {
|
||||
parent: None,
|
||||
lsn: 0x40,
|
||||
size: None,
|
||||
needed: false,
|
||||
},
|
||||
timeline_id: TIMELINE_ID,
|
||||
kind: BranchPoint,
|
||||
},
|
||||
SegmentMeta {
|
||||
segment: Segment {
|
||||
parent: None,
|
||||
lsn: 0x40, // Branch end LSN == last branch point LSN
|
||||
size: None,
|
||||
needed: true,
|
||||
},
|
||||
timeline_id: TIMELINE_ID,
|
||||
kind: BranchEnd,
|
||||
},
|
||||
]
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,7 +33,7 @@ pub struct ModelInputs {
|
||||
}
|
||||
|
||||
/// A [`Segment`], with some extra information for display purposes
|
||||
#[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
|
||||
#[derive(Debug, serde::Serialize, serde::Deserialize)]
|
||||
pub struct SegmentMeta {
|
||||
pub segment: Segment,
|
||||
pub timeline_id: TimelineId,
|
||||
@@ -248,8 +248,6 @@ pub(super) async fn gather_inputs(
|
||||
None
|
||||
};
|
||||
|
||||
let branch_is_invisible = timeline.is_invisible() == Some(true);
|
||||
|
||||
let lease_points = gc_info
|
||||
.leases
|
||||
.keys()
|
||||
@@ -273,10 +271,7 @@ pub(super) async fn gather_inputs(
|
||||
.map(|(lsn, _child_id, _is_offloaded)| (lsn, LsnKind::BranchPoint))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
if !branch_is_invisible {
|
||||
// Do not count lease points for invisible branches.
|
||||
lsns.extend(lease_points.iter().map(|&lsn| (lsn, LsnKind::LeasePoint)));
|
||||
}
|
||||
lsns.extend(lease_points.iter().map(|&lsn| (lsn, LsnKind::LeasePoint)));
|
||||
|
||||
drop(gc_info);
|
||||
|
||||
@@ -292,9 +287,7 @@ pub(super) async fn gather_inputs(
|
||||
|
||||
// Add a point for the PITR cutoff
|
||||
let branch_start_needed = next_pitr_cutoff <= branch_start_lsn;
|
||||
if !branch_start_needed && !branch_is_invisible {
|
||||
// Only add the GcCutOff point when the timeline is visible; otherwise, do not compute the size for the LSN
|
||||
// range from the last branch point to the latest data.
|
||||
if !branch_start_needed {
|
||||
lsns.push((next_pitr_cutoff, LsnKind::GcCutOff));
|
||||
}
|
||||
|
||||
@@ -380,19 +373,11 @@ pub(super) async fn gather_inputs(
|
||||
}
|
||||
}
|
||||
|
||||
let branch_end_lsn = if branch_is_invisible {
|
||||
// If the branch is invisible, the branch end is the last requested LSN (likely a branch cutoff point).
|
||||
segments.last().unwrap().segment.lsn
|
||||
} else {
|
||||
// Otherwise, the branch end is the last record LSN.
|
||||
last_record_lsn.0
|
||||
};
|
||||
|
||||
// Current end of the timeline
|
||||
segments.push(SegmentMeta {
|
||||
segment: Segment {
|
||||
parent: Some(parent),
|
||||
lsn: branch_end_lsn,
|
||||
lsn: last_record_lsn.0,
|
||||
size: None, // Filled in later, if necessary
|
||||
needed: true,
|
||||
},
|
||||
@@ -624,7 +609,6 @@ async fn calculate_logical_size(
|
||||
Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[test]
|
||||
fn verify_size_for_multiple_branches() {
|
||||
// this is generated from integration test test_tenant_size_with_multiple_branches, but this way
|
||||
@@ -782,7 +766,6 @@ fn verify_size_for_multiple_branches() {
|
||||
assert_eq!(inputs.calculate(), 37_851_408);
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[test]
|
||||
fn verify_size_for_one_branch() {
|
||||
let doc = r#"
|
||||
|
||||
@@ -2215,10 +2215,6 @@ impl Timeline {
|
||||
self.remote_client.is_archived()
|
||||
}
|
||||
|
||||
pub(crate) fn is_invisible(&self) -> Option<bool> {
|
||||
self.remote_client.is_invisible()
|
||||
}
|
||||
|
||||
pub(crate) fn is_stopping(&self) -> bool {
|
||||
self.current_state() == TimelineState::Stopping
|
||||
}
|
||||
|
||||
@@ -99,9 +99,6 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
|
||||
wp->config = config;
|
||||
wp->api = api;
|
||||
wp->state = WPS_COLLECTING_TERMS;
|
||||
wp->mconf.generation = INVALID_GENERATION;
|
||||
wp->mconf.members.len = 0;
|
||||
wp->mconf.new_members.len = 0;
|
||||
|
||||
wp_log(LOG, "neon.safekeepers=%s", wp->config->safekeepers_list);
|
||||
|
||||
@@ -173,8 +170,6 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
|
||||
|
||||
if (wp->config->proto_version != 2 && wp->config->proto_version != 3)
|
||||
wp_log(FATAL, "unsupported safekeeper protocol version %d", wp->config->proto_version);
|
||||
if (wp->safekeepers_generation > 0 && wp->config->proto_version < 3)
|
||||
wp_log(FATAL, "enabling generations requires protocol version 3");
|
||||
wp_log(LOG, "using safekeeper protocol version %d", wp->config->proto_version);
|
||||
|
||||
/* Fill the greeting package */
|
||||
@@ -219,7 +214,7 @@ WalProposerFree(WalProposer *wp)
|
||||
static bool
|
||||
WalProposerGenerationsEnabled(WalProposer *wp)
|
||||
{
|
||||
return wp->safekeepers_generation != INVALID_GENERATION;
|
||||
return wp->safekeepers_generation != 0;
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -728,176 +723,13 @@ SendProposerGreeting(Safekeeper *sk)
|
||||
BlockingWrite(sk, sk->outbuf.data, sk->outbuf.len, SS_HANDSHAKE_RECV);
|
||||
}
|
||||
|
||||
/*
|
||||
* Assuming `sk` sent its node id, find such member(s) in wp->mconf and set ptr in
|
||||
* members_safekeepers & new_members_safekeepers to sk.
|
||||
*/
|
||||
static void
|
||||
UpdateMemberSafekeeperPtr(WalProposer *wp, Safekeeper *sk)
|
||||
{
|
||||
/* members_safekeepers etc are fixed size, sanity check mconf size */
|
||||
if (wp->mconf.members.len > MAX_SAFEKEEPERS)
|
||||
wp_log(FATAL, "too many members %d in mconf", wp->mconf.members.len);
|
||||
if (wp->mconf.new_members.len > MAX_SAFEKEEPERS)
|
||||
wp_log(FATAL, "too many new_members %d in mconf", wp->mconf.new_members.len);
|
||||
|
||||
/* node id is not known until greeting is received */
|
||||
if (sk->state < SS_WAIT_VOTING)
|
||||
return;
|
||||
|
||||
/* 0 is assumed to be invalid node id, should never happen */
|
||||
if (sk->greetResponse.nodeId == 0)
|
||||
{
|
||||
wp_log(WARNING, "safekeeper %s:%s sent zero node id", sk->host, sk->port);
|
||||
return;
|
||||
}
|
||||
|
||||
for (uint32 i = 0; i < wp->mconf.members.len; i++)
|
||||
{
|
||||
SafekeeperId *sk_id = &wp->mconf.members.m[i];
|
||||
|
||||
if (wp->mconf.members.m[i].node_id == sk->greetResponse.nodeId)
|
||||
{
|
||||
/*
|
||||
* If mconf or list of safekeepers to connect to changed (the
|
||||
* latter always currently goes through restart though),
|
||||
* ResetMemberSafekeeperPtrs is expected to be called before
|
||||
* UpdateMemberSafekeeperPtr. So, other value suggests that we are
|
||||
* connected to the same sk under different host name, complain
|
||||
* about that.
|
||||
*/
|
||||
if (wp->members_safekeepers[i] != NULL && wp->members_safekeepers[i] != sk)
|
||||
{
|
||||
wp_log(WARNING, "safekeeper {id = %lu, ep = %s:%u } in members[%u] is already mapped to connection slot %lu",
|
||||
sk_id->node_id, sk_id->host, sk_id->port, i, wp->members_safekeepers[i] - wp->safekeeper);
|
||||
}
|
||||
wp_log(LOG, "safekeeper {id = %lu, ep = %s:%u } in members[%u] mapped to connection slot %lu",
|
||||
sk_id->node_id, sk_id->host, sk_id->port, i, sk - wp->safekeeper);
|
||||
wp->members_safekeepers[i] = sk;
|
||||
}
|
||||
}
|
||||
/* repeat for new_members */
|
||||
for (uint32 i = 0; i < wp->mconf.new_members.len; i++)
|
||||
{
|
||||
SafekeeperId *sk_id = &wp->mconf.new_members.m[i];
|
||||
|
||||
if (wp->mconf.new_members.m[i].node_id == sk->greetResponse.nodeId)
|
||||
{
|
||||
if (wp->new_members_safekeepers[i] != NULL && wp->new_members_safekeepers[i] != sk)
|
||||
{
|
||||
wp_log(WARNING, "safekeeper {id = %lu, ep = %s:%u } in new_members[%u] is already mapped to connection slot %lu",
|
||||
sk_id->node_id, sk_id->host, sk_id->port, i, wp->new_members_safekeepers[i] - wp->safekeeper);
|
||||
}
|
||||
wp_log(LOG, "safekeeper {id = %lu, ep = %s:%u } in new_members[%u] mapped to connection slot %lu",
|
||||
sk_id->node_id, sk_id->host, sk_id->port, i, sk - wp->safekeeper);
|
||||
wp->new_members_safekeepers[i] = sk;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Reset wp->members_safekeepers & new_members_safekeepers and refill them.
|
||||
* Called after wp changes mconf.
|
||||
*/
|
||||
static void
|
||||
ResetMemberSafekeeperPtrs(WalProposer *wp)
|
||||
{
|
||||
memset(&wp->members_safekeepers, 0, sizeof(Safekeeper *) * MAX_SAFEKEEPERS);
|
||||
memset(&wp->new_members_safekeepers, 0, sizeof(Safekeeper *) * MAX_SAFEKEEPERS);
|
||||
for (int i = 0; i < wp->n_safekeepers; i++)
|
||||
{
|
||||
if (wp->safekeeper[i].state >= SS_WAIT_VOTING)
|
||||
UpdateMemberSafekeeperPtr(wp, &wp->safekeeper[i]);
|
||||
}
|
||||
}
|
||||
|
||||
static uint32
|
||||
MsetQuorum(MemberSet *mset)
|
||||
{
|
||||
Assert(mset->len > 0);
|
||||
return mset->len / 2 + 1;
|
||||
}
|
||||
|
||||
/* Does n forms quorum in mset? */
|
||||
static bool
|
||||
MsetHasQuorum(MemberSet *mset, uint32 n)
|
||||
{
|
||||
return n >= MsetQuorum(mset);
|
||||
}
|
||||
|
||||
/*
|
||||
* TermsCollected helper for a single member set `mset`.
|
||||
*
|
||||
* `msk` is the member -> safekeeper mapping for mset, i.e. members_safekeepers
|
||||
* or new_members_safekeepers.
|
||||
*/
|
||||
static bool
|
||||
TermsCollectedMset(WalProposer *wp, MemberSet *mset, Safekeeper **msk, StringInfo s)
|
||||
{
|
||||
uint32 n_greeted = 0;
|
||||
|
||||
for (uint32 i = 0; i < wp->mconf.members.len; i++)
|
||||
{
|
||||
Safekeeper *sk = msk[i];
|
||||
|
||||
if (sk != NULL && sk->state == SS_WAIT_VOTING)
|
||||
{
|
||||
if (n_greeted > 0)
|
||||
appendStringInfoString(s, ", ");
|
||||
appendStringInfo(s, "{id = %lu, ep = %s:%s}", sk->greetResponse.nodeId, sk->host, sk->port);
|
||||
n_greeted++;
|
||||
}
|
||||
}
|
||||
appendStringInfo(s, ", %u/%u total", n_greeted, mset->len);
|
||||
return MsetHasQuorum(mset, n_greeted);
|
||||
}
|
||||
|
||||
/*
|
||||
* Have we received greeting from enough (quorum) safekeepers to start voting?
|
||||
*/
|
||||
static bool
|
||||
TermsCollected(WalProposer *wp)
|
||||
{
|
||||
StringInfoData s; /* str for logging */
|
||||
bool collected = false;
|
||||
|
||||
/* legacy: generations disabled */
|
||||
if (!WalProposerGenerationsEnabled(wp) && wp->mconf.generation == INVALID_GENERATION)
|
||||
{
|
||||
collected = wp->n_connected >= wp->quorum;
|
||||
if (collected)
|
||||
{
|
||||
wp->propTerm++;
|
||||
wp_log(LOG, "walproposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT ", starting voting", wp->quorum, wp->propTerm);
|
||||
}
|
||||
return collected;
|
||||
}
|
||||
|
||||
/*
|
||||
* With generations enabled, we start campaign only when 1) some mconf is
|
||||
* actually received 2) we have greetings from majority of members as well
|
||||
* as from majority of new_members if it exists.
|
||||
*/
|
||||
if (wp->mconf.generation == INVALID_GENERATION)
|
||||
return false;
|
||||
|
||||
initStringInfo(&s);
|
||||
appendStringInfoString(&s, "mset greeters: ");
|
||||
if (!TermsCollectedMset(wp, &wp->mconf.members, wp->members_safekeepers, &s))
|
||||
goto res;
|
||||
if (wp->mconf.new_members.len > 0)
|
||||
{
|
||||
appendStringInfoString(&s, ", new_mset greeters: ");
|
||||
if (!TermsCollectedMset(wp, &wp->mconf.new_members, wp->new_members_safekeepers, &s))
|
||||
goto res;
|
||||
}
|
||||
wp->propTerm++;
|
||||
wp_log(LOG, "walproposer connected to quorum of safekeepers: %s, propTerm=" INT64_FORMAT ", starting voting", s.data, wp->propTerm);
|
||||
collected = true;
|
||||
|
||||
res:
|
||||
pfree(s.data);
|
||||
return collected;
|
||||
return wp->n_connected >= wp->quorum;
|
||||
}
|
||||
|
||||
static void
|
||||
@@ -921,23 +753,13 @@ RecvAcceptorGreeting(Safekeeper *sk)
|
||||
pfree(mconf_toml);
|
||||
|
||||
/*
|
||||
* Adopt mconf of safekeepers if it is higher.
|
||||
* Adopt mconf of safekeepers if it is higher. TODO: mconf change should
|
||||
* restart wp if it started voting.
|
||||
*/
|
||||
if (sk->greetResponse.mconf.generation > wp->mconf.generation)
|
||||
{
|
||||
/* sanity check before adopting, should never happen */
|
||||
if (sk->greetResponse.mconf.members.len == 0)
|
||||
{
|
||||
wp_log(FATAL, "mconf %u has zero members", sk->greetResponse.mconf.generation);
|
||||
}
|
||||
/* TODO: put mconf to shmem to immediately pick it up on start */
|
||||
if (wp->state >= WPS_CAMPAIGN)
|
||||
{
|
||||
wp_log(FATAL, "restarting to adopt mconf generation %d", sk->greetResponse.mconf.generation);
|
||||
}
|
||||
MembershipConfigurationFree(&wp->mconf);
|
||||
MembershipConfigurationCopy(&sk->greetResponse.mconf, &wp->mconf);
|
||||
ResetMemberSafekeeperPtrs(wp);
|
||||
/* full conf was just logged above */
|
||||
wp_log(LOG, "changed mconf to generation %u", wp->mconf.generation);
|
||||
}
|
||||
@@ -945,9 +767,6 @@ RecvAcceptorGreeting(Safekeeper *sk)
|
||||
/* Protocol is all good, move to voting. */
|
||||
sk->state = SS_WAIT_VOTING;
|
||||
|
||||
/* In greeting safekeeper sent its id; update mappings accordingly. */
|
||||
UpdateMemberSafekeeperPtr(wp, sk);
|
||||
|
||||
/*
|
||||
* Note: it would be better to track the counter on per safekeeper basis,
|
||||
* but at worst walproposer would restart with 'term rejected', so leave
|
||||
@@ -959,9 +778,12 @@ RecvAcceptorGreeting(Safekeeper *sk)
|
||||
/* We're still collecting terms from the majority. */
|
||||
wp->propTerm = Max(sk->greetResponse.term, wp->propTerm);
|
||||
|
||||
/* Quorum is acquired, prepare the vote request. */
|
||||
/* Quorum is acquried, prepare the vote request. */
|
||||
if (TermsCollected(wp))
|
||||
{
|
||||
wp->propTerm++;
|
||||
wp_log(LOG, "proposer connected to quorum (%d) safekeepers, propTerm=" INT64_FORMAT, wp->quorum, wp->propTerm);
|
||||
|
||||
wp->state = WPS_CAMPAIGN;
|
||||
wp->voteRequest.pam.tag = 'v';
|
||||
wp->voteRequest.generation = wp->mconf.generation;
|
||||
@@ -1077,44 +899,6 @@ RecvVoteResponse(Safekeeper *sk)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* VotesCollected helper for a single member set `mset`.
|
||||
*
|
||||
* `msk` is the member -> safekeeper mapping for mset, i.e. members_safekeepers
|
||||
* or new_members_safekeepers.
|
||||
*/
|
||||
static bool
|
||||
VotesCollectedMset(WalProposer *wp, MemberSet *mset, Safekeeper **msk, StringInfo s)
|
||||
{
|
||||
uint32 n_votes = 0;
|
||||
|
||||
for (uint32 i = 0; i < wp->mconf.members.len; i++)
|
||||
{
|
||||
Safekeeper *sk = msk[i];
|
||||
|
||||
if (sk != NULL && sk->state == SS_WAIT_ELECTED)
|
||||
{
|
||||
if (GetLastLogTerm(sk) > wp->donorLastLogTerm ||
|
||||
(GetLastLogTerm(sk) == wp->donorLastLogTerm &&
|
||||
sk->voteResponse.flushLsn > wp->propTermStartLsn))
|
||||
{
|
||||
wp->donorLastLogTerm = GetLastLogTerm(sk);
|
||||
wp->propTermStartLsn = sk->voteResponse.flushLsn;
|
||||
wp->donor = sk;
|
||||
}
|
||||
wp->truncateLsn = Max(wp->safekeeper[i].voteResponse.truncateLsn, wp->truncateLsn);
|
||||
|
||||
if (n_votes > 0)
|
||||
appendStringInfoString(s, ", ");
|
||||
appendStringInfo(s, "{id = %lu, ep = %s:%s}", sk->greetResponse.nodeId, sk->host, sk->port);
|
||||
n_votes++;
|
||||
}
|
||||
}
|
||||
appendStringInfo(s, ", %u/%u total", n_votes, mset->len);
|
||||
return MsetHasQuorum(mset, n_votes);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Checks if enough votes has been collected to get elected and if that's the
|
||||
* case finds the highest vote, setting donor, donorLastLogTerm,
|
||||
@@ -1123,8 +907,7 @@ VotesCollectedMset(WalProposer *wp, MemberSet *mset, Safekeeper **msk, StringInf
|
||||
static bool
|
||||
VotesCollected(WalProposer *wp)
|
||||
{
|
||||
StringInfoData s; /* str for logging */
|
||||
bool collected = false;
|
||||
int n_ready = 0;
|
||||
|
||||
/* assumed to be called only when not elected yet */
|
||||
Assert(wp->state == WPS_CAMPAIGN);
|
||||
@@ -1133,61 +916,25 @@ VotesCollected(WalProposer *wp)
|
||||
wp->donorLastLogTerm = 0;
|
||||
wp->truncateLsn = InvalidXLogRecPtr;
|
||||
|
||||
/* legacy: generations disabled */
|
||||
if (!WalProposerGenerationsEnabled(wp) && wp->mconf.generation == INVALID_GENERATION)
|
||||
for (int i = 0; i < wp->n_safekeepers; i++)
|
||||
{
|
||||
int n_ready = 0;
|
||||
|
||||
for (int i = 0; i < wp->n_safekeepers; i++)
|
||||
if (wp->safekeeper[i].state == SS_WAIT_ELECTED)
|
||||
{
|
||||
if (wp->safekeeper[i].state == SS_WAIT_ELECTED)
|
||||
n_ready++;
|
||||
|
||||
if (GetLastLogTerm(&wp->safekeeper[i]) > wp->donorLastLogTerm ||
|
||||
(GetLastLogTerm(&wp->safekeeper[i]) == wp->donorLastLogTerm &&
|
||||
wp->safekeeper[i].voteResponse.flushLsn > wp->propTermStartLsn))
|
||||
{
|
||||
n_ready++;
|
||||
|
||||
if (GetLastLogTerm(&wp->safekeeper[i]) > wp->donorLastLogTerm ||
|
||||
(GetLastLogTerm(&wp->safekeeper[i]) == wp->donorLastLogTerm &&
|
||||
wp->safekeeper[i].voteResponse.flushLsn > wp->propTermStartLsn))
|
||||
{
|
||||
wp->donorLastLogTerm = GetLastLogTerm(&wp->safekeeper[i]);
|
||||
wp->propTermStartLsn = wp->safekeeper[i].voteResponse.flushLsn;
|
||||
wp->donor = &wp->safekeeper[i];
|
||||
}
|
||||
wp->truncateLsn = Max(wp->safekeeper[i].voteResponse.truncateLsn, wp->truncateLsn);
|
||||
wp->donorLastLogTerm = GetLastLogTerm(&wp->safekeeper[i]);
|
||||
wp->propTermStartLsn = wp->safekeeper[i].voteResponse.flushLsn;
|
||||
wp->donor = i;
|
||||
}
|
||||
wp->truncateLsn = Max(wp->safekeeper[i].voteResponse.truncateLsn, wp->truncateLsn);
|
||||
}
|
||||
collected = n_ready >= wp->quorum;
|
||||
if (collected)
|
||||
{
|
||||
wp_log(LOG, "walproposer elected with %d/%d votes", n_ready, wp->n_safekeepers);
|
||||
}
|
||||
return collected;
|
||||
}
|
||||
|
||||
/*
|
||||
* if generations are enabled we're expected to get to voting only when
|
||||
* mconf is established.
|
||||
*/
|
||||
Assert(wp->mconf.generation != INVALID_GENERATION);
|
||||
|
||||
/*
|
||||
* We must get votes from both msets if both are present.
|
||||
*/
|
||||
initStringInfo(&s);
|
||||
appendStringInfoString(&s, "mset votes: ");
|
||||
if (!VotesCollectedMset(wp, &wp->mconf.members, wp->members_safekeepers, &s))
|
||||
goto res;
|
||||
if (wp->mconf.new_members.len > 0)
|
||||
{
|
||||
appendStringInfoString(&s, ", new_mset votes: ");
|
||||
if (!VotesCollectedMset(wp, &wp->mconf.new_members, wp->new_members_safekeepers, &s))
|
||||
goto res;
|
||||
}
|
||||
wp_log(LOG, "walproposer elected, %s", s.data);
|
||||
collected = true;
|
||||
|
||||
res:
|
||||
pfree(s.data);
|
||||
return collected;
|
||||
return n_ready >= wp->quorum;
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -1208,7 +955,7 @@ HandleElectedProposer(WalProposer *wp)
|
||||
* that only for logical replication (and switching logical walsenders to
|
||||
* neon_walreader is a todo.)
|
||||
*/
|
||||
if (!wp->api.recovery_download(wp, wp->donor))
|
||||
if (!wp->api.recovery_download(wp, &wp->safekeeper[wp->donor]))
|
||||
{
|
||||
wp_log(FATAL, "failed to download WAL for logical replicaiton");
|
||||
}
|
||||
@@ -1331,7 +1078,7 @@ ProcessPropStartPos(WalProposer *wp)
|
||||
/*
|
||||
* Proposer's term history is the donor's + its own entry.
|
||||
*/
|
||||
dth = &wp->donor->voteResponse.termHistory;
|
||||
dth = &wp->safekeeper[wp->donor].voteResponse.termHistory;
|
||||
wp->propTermHistory.n_entries = dth->n_entries + 1;
|
||||
wp->propTermHistory.entries = palloc(sizeof(TermSwitchEntry) * wp->propTermHistory.n_entries);
|
||||
if (dth->n_entries > 0)
|
||||
@@ -1339,10 +1086,11 @@ ProcessPropStartPos(WalProposer *wp)
|
||||
wp->propTermHistory.entries[wp->propTermHistory.n_entries - 1].term = wp->propTerm;
|
||||
wp->propTermHistory.entries[wp->propTermHistory.n_entries - 1].lsn = wp->propTermStartLsn;
|
||||
|
||||
wp_log(LOG, "walproposer elected in term " UINT64_FORMAT ", epochStartLsn %X/%X, donor %s:%s, truncate_lsn %X/%X",
|
||||
wp_log(LOG, "got votes from majority (%d) of nodes, term " UINT64_FORMAT ", epochStartLsn %X/%X, donor %s:%s, truncate_lsn %X/%X",
|
||||
wp->quorum,
|
||||
wp->propTerm,
|
||||
LSN_FORMAT_ARGS(wp->propTermStartLsn),
|
||||
wp->donor->host, wp->donor->port,
|
||||
wp->safekeeper[wp->donor].host, wp->safekeeper[wp->donor].port,
|
||||
LSN_FORMAT_ARGS(wp->truncateLsn));
|
||||
|
||||
/*
|
||||
@@ -1760,14 +1508,6 @@ RecvAppendResponses(Safekeeper *sk)
|
||||
|
||||
readAnything = true;
|
||||
|
||||
/* should never happen: sk is expected to send ERROR instead */
|
||||
if (sk->appendResponse.generation != wp->mconf.generation)
|
||||
{
|
||||
wp_log(FATAL, "safekeeper {id = %lu, ep = %s:%s} sent response with generation %u, expected %u",
|
||||
sk->greetResponse.nodeId, sk->host, sk->port,
|
||||
sk->appendResponse.generation, wp->mconf.generation);
|
||||
}
|
||||
|
||||
if (sk->appendResponse.term > wp->propTerm)
|
||||
{
|
||||
/*
|
||||
@@ -1884,100 +1624,30 @@ CalculateMinFlushLsn(WalProposer *wp)
|
||||
}
|
||||
|
||||
/*
|
||||
* GetAcknowledgedByQuorumWALPosition for a single member set `mset`.
|
||||
*
|
||||
* `msk` is the member -> safekeeper mapping for mset, i.e. members_safekeepers
|
||||
* or new_members_safekeepers.
|
||||
*/
|
||||
static XLogRecPtr
|
||||
GetCommittedMset(WalProposer *wp, MemberSet *mset, Safekeeper **msk)
|
||||
{
|
||||
XLogRecPtr responses[MAX_SAFEKEEPERS];
|
||||
|
||||
/*
|
||||
* Ascending sort acknowledged LSNs.
|
||||
*/
|
||||
Assert(mset->len <= MAX_SAFEKEEPERS);
|
||||
for (uint32 i = 0; i < mset->len; i++)
|
||||
{
|
||||
Safekeeper *sk = msk[i];
|
||||
|
||||
/*
|
||||
* Like in Raft, we aren't allowed to commit entries from previous
|
||||
* terms, so ignore reported LSN until it gets to propTermStartLsn.
|
||||
*
|
||||
* Note: we ignore sk state, which is ok: before first ack flushLsn is
|
||||
* 0, and later we just preserve value across reconnections. It would
|
||||
* be ok to check for SS_ACTIVE as well.
|
||||
*/
|
||||
if (sk != NULL && sk->appendResponse.flushLsn >= wp->propTermStartLsn)
|
||||
{
|
||||
responses[i] = sk->appendResponse.flushLsn;
|
||||
}
|
||||
else
|
||||
{
|
||||
responses[i] = 0;
|
||||
}
|
||||
}
|
||||
qsort(responses, mset->len, sizeof(XLogRecPtr), CompareLsn);
|
||||
|
||||
/*
|
||||
* And get value committed by the quorum. A way to view this: to get the
|
||||
* highest value committed on the quorum, in the ordered array we skip n -
|
||||
* n_quorum elements to get to the first (lowest) value present on all sks
|
||||
* of the highest quorum.
|
||||
*/
|
||||
return responses[mset->len - MsetQuorum(mset)];
|
||||
}
|
||||
|
||||
/*
|
||||
* Calculate WAL position acknowledged by quorum, i.e. which may be regarded
|
||||
* committed.
|
||||
*
|
||||
* Zero may be returned when there is no quorum of nodes recovered to term start
|
||||
* lsn which sent feedback yet.
|
||||
* Calculate WAL position acknowledged by quorum
|
||||
*/
|
||||
static XLogRecPtr
|
||||
GetAcknowledgedByQuorumWALPosition(WalProposer *wp)
|
||||
{
|
||||
XLogRecPtr committed;
|
||||
XLogRecPtr responses[MAX_SAFEKEEPERS];
|
||||
|
||||
/* legacy: generations disabled */
|
||||
if (!WalProposerGenerationsEnabled(wp) && wp->mconf.generation == INVALID_GENERATION)
|
||||
/*
|
||||
* Sort acknowledged LSNs
|
||||
*/
|
||||
for (int i = 0; i < wp->n_safekeepers; i++)
|
||||
{
|
||||
XLogRecPtr responses[MAX_SAFEKEEPERS];
|
||||
|
||||
/*
|
||||
* Sort acknowledged LSNs
|
||||
* Like in Raft, we aren't allowed to commit entries from previous
|
||||
* terms, so ignore reported LSN until it gets to epochStartLsn.
|
||||
*/
|
||||
for (int i = 0; i < wp->n_safekeepers; i++)
|
||||
{
|
||||
/*
|
||||
* Like in Raft, we aren't allowed to commit entries from previous
|
||||
* terms, so ignore reported LSN until it gets to propTermStartLsn.
|
||||
*
|
||||
* Note: we ignore sk state, which is ok: before first ack
|
||||
* flushLsn is 0, and later we just preserve value across
|
||||
* reconnections. It would be ok to check for SS_ACTIVE as well.
|
||||
*/
|
||||
responses[i] = wp->safekeeper[i].appendResponse.flushLsn >= wp->propTermStartLsn ? wp->safekeeper[i].appendResponse.flushLsn : 0;
|
||||
}
|
||||
qsort(responses, wp->n_safekeepers, sizeof(XLogRecPtr), CompareLsn);
|
||||
|
||||
/*
|
||||
* Get the smallest LSN committed by quorum
|
||||
*/
|
||||
return responses[wp->n_safekeepers - wp->quorum];
|
||||
responses[i] = wp->safekeeper[i].appendResponse.flushLsn >= wp->propTermStartLsn ? wp->safekeeper[i].appendResponse.flushLsn : 0;
|
||||
}
|
||||
qsort(responses, wp->n_safekeepers, sizeof(XLogRecPtr), CompareLsn);
|
||||
|
||||
committed = GetCommittedMset(wp, &wp->mconf.members, wp->members_safekeepers);
|
||||
if (wp->mconf.new_members.len > 0)
|
||||
{
|
||||
XLogRecPtr new_mset_committed = GetCommittedMset(wp, &wp->mconf.new_members, wp->new_members_safekeepers);
|
||||
|
||||
committed = Min(committed, new_mset_committed);
|
||||
}
|
||||
return committed;
|
||||
/*
|
||||
* Get the smallest LSN committed by quorum
|
||||
*/
|
||||
return responses[wp->n_safekeepers - wp->quorum];
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -2003,9 +1673,9 @@ UpdateDonorShmem(WalProposer *wp)
|
||||
* about its position immediately after election before any feedbacks are
|
||||
* sent.
|
||||
*/
|
||||
if (wp->donor->state >= SS_WAIT_ELECTED)
|
||||
if (wp->safekeeper[wp->donor].state >= SS_WAIT_ELECTED)
|
||||
{
|
||||
donor = wp->donor;
|
||||
donor = &wp->safekeeper[wp->donor];
|
||||
donor_lsn = wp->propTermStartLsn;
|
||||
}
|
||||
|
||||
|
||||
@@ -145,7 +145,6 @@ typedef uint64 NNodeId;
|
||||
* This and following structs pair ones in membership.rs.
|
||||
*/
|
||||
typedef uint32 Generation;
|
||||
#define INVALID_GENERATION 0
|
||||
|
||||
typedef struct SafekeeperId
|
||||
{
|
||||
@@ -772,17 +771,7 @@ typedef struct WalProposer
|
||||
/* Current walproposer membership configuration */
|
||||
MembershipConfiguration mconf;
|
||||
|
||||
/*
|
||||
* Parallels mconf.members with pointers to the member's slot in
|
||||
* safekeepers array of connections, or NULL if such member is not
|
||||
* connected. Helps to avoid looking slot per id through all
|
||||
* .safekeepers[] when doing quorum checks.
|
||||
*/
|
||||
Safekeeper *members_safekeepers[MAX_SAFEKEEPERS];
|
||||
/* As above, but for new_members. */
|
||||
Safekeeper *new_members_safekeepers[MAX_SAFEKEEPERS];
|
||||
|
||||
/* (n_safekeepers / 2) + 1. Used for static pre-generations quorum checks. */
|
||||
/* (n_safekeepers / 2) + 1 */
|
||||
int quorum;
|
||||
|
||||
/*
|
||||
@@ -840,7 +829,7 @@ typedef struct WalProposer
|
||||
term_t donorLastLogTerm;
|
||||
|
||||
/* Most advanced acceptor */
|
||||
Safekeeper *donor;
|
||||
int donor;
|
||||
|
||||
/* timeline globally starts at this LSN */
|
||||
XLogRecPtr timelineStartLsn;
|
||||
|
||||
@@ -115,13 +115,17 @@ impl Client {
|
||||
"{}/v1/tenant/{}/timeline/{}",
|
||||
self.mgmt_api_endpoint, tenant_id, timeline_id
|
||||
);
|
||||
let resp = self.request(Method::DELETE, &uri, ()).await?;
|
||||
let resp = self
|
||||
.request_maybe_body(Method::DELETE, &uri, None::<()>)
|
||||
.await?;
|
||||
resp.json().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result<models::TimelineDeleteResult> {
|
||||
pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result<models::TenantDeleteResult> {
|
||||
let uri = format!("{}/v1/tenant/{}", self.mgmt_api_endpoint, tenant_id);
|
||||
let resp = self.request(Method::DELETE, &uri, ()).await?;
|
||||
let resp = self
|
||||
.request_maybe_body(Method::DELETE, &uri, None::<()>)
|
||||
.await?;
|
||||
resp.json().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
@@ -197,6 +201,16 @@ impl Client {
|
||||
method: Method,
|
||||
uri: U,
|
||||
body: B,
|
||||
) -> Result<reqwest::Response> {
|
||||
self.request_maybe_body(method, uri, Some(body)).await
|
||||
}
|
||||
|
||||
/// Send the request and check that the status code is good, with an optional body.
|
||||
async fn request_maybe_body<B: serde::Serialize, U: reqwest::IntoUrl>(
|
||||
&self,
|
||||
method: Method,
|
||||
uri: U,
|
||||
body: Option<B>,
|
||||
) -> Result<reqwest::Response> {
|
||||
let res = self.request_noerror(method, uri, body).await?;
|
||||
let response = res.error_from_body().await?;
|
||||
@@ -208,12 +222,15 @@ impl Client {
|
||||
&self,
|
||||
method: Method,
|
||||
uri: U,
|
||||
body: B,
|
||||
body: Option<B>,
|
||||
) -> Result<reqwest::Response> {
|
||||
let mut req = self.client.request(method, uri);
|
||||
if let Some(value) = &self.authorization_header {
|
||||
req = req.header(reqwest::header::AUTHORIZATION, value.get_contents())
|
||||
}
|
||||
req.json(&body).send().await.map_err(Error::ReceiveBody)
|
||||
if let Some(body) = body {
|
||||
req = req.json(&body);
|
||||
}
|
||||
req.send().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,9 +16,9 @@ use http_utils::{RequestExt, RouterBuilder};
|
||||
use hyper::{Body, Request, Response, StatusCode};
|
||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||
use safekeeper_api::models::{
|
||||
AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TermSwitchApiEntry,
|
||||
TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult, TimelineStatus,
|
||||
TimelineTermBumpRequest,
|
||||
AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TenantDeleteResult,
|
||||
TermSwitchApiEntry, TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult,
|
||||
TimelineStatus, TimelineTermBumpRequest,
|
||||
};
|
||||
use safekeeper_api::{ServerInfo, membership, models};
|
||||
use storage_broker::proto::{SafekeeperTimelineInfo, TenantTimelineId as ProtoTenantTimelineId};
|
||||
@@ -83,13 +83,11 @@ async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
.delete_all_for_tenant(&tenant_id, action)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
delete_info
|
||||
.iter()
|
||||
.map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
|
||||
.collect::<HashMap<String, TimelineDeleteResult>>(),
|
||||
)
|
||||
let response_body: TenantDeleteResult = delete_info
|
||||
.iter()
|
||||
.map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
|
||||
.collect::<HashMap<String, TimelineDeleteResult>>();
|
||||
json_response(StatusCode::OK, response_body)
|
||||
}
|
||||
|
||||
async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
|
||||
@@ -1524,25 +1524,14 @@ impl Persistence {
|
||||
/// Load pending operations from db.
|
||||
pub(crate) async fn list_pending_ops(
|
||||
&self,
|
||||
filter_for_sk: Option<NodeId>,
|
||||
) -> DatabaseResult<Vec<TimelinePendingOpPersistence>> {
|
||||
use crate::schema::safekeeper_timeline_pending_ops::dsl;
|
||||
|
||||
const FILTER_VAL_1: i64 = 1;
|
||||
const FILTER_VAL_2: i64 = 2;
|
||||
let filter_opt = filter_for_sk.map(|id| id.0 as i64);
|
||||
let timeline_from_db = self
|
||||
.with_measured_conn(DatabaseOperation::ListTimelineReconcile, move |conn| {
|
||||
Box::pin(async move {
|
||||
let from_db: Vec<TimelinePendingOpPersistence> =
|
||||
dsl::safekeeper_timeline_pending_ops
|
||||
.filter(
|
||||
dsl::sk_id
|
||||
.eq(filter_opt.unwrap_or(FILTER_VAL_1))
|
||||
.and(dsl::sk_id.eq(filter_opt.unwrap_or(FILTER_VAL_2))),
|
||||
)
|
||||
.load(conn)
|
||||
.await?;
|
||||
dsl::safekeeper_timeline_pending_ops.load(conn).await?;
|
||||
Ok(from_db)
|
||||
})
|
||||
})
|
||||
|
||||
@@ -101,7 +101,7 @@ impl SafekeeperClient {
|
||||
pub(crate) async fn delete_tenant(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<models::TimelineDeleteResult> {
|
||||
) -> Result<models::TenantDeleteResult> {
|
||||
measured_request!(
|
||||
"delete_tenant",
|
||||
crate::metrics::Method::Delete,
|
||||
|
||||
@@ -35,6 +35,10 @@ impl SafekeeperReconcilers {
|
||||
service: &Arc<Service>,
|
||||
reqs: Vec<ScheduleRequest>,
|
||||
) {
|
||||
tracing::info!(
|
||||
"Scheduling {} pending safekeeper ops loaded from db",
|
||||
reqs.len()
|
||||
);
|
||||
for req in reqs {
|
||||
self.schedule_request(service, req);
|
||||
}
|
||||
@@ -74,7 +78,7 @@ pub(crate) async fn load_schedule_requests(
|
||||
service: &Arc<Service>,
|
||||
safekeepers: &HashMap<NodeId, Safekeeper>,
|
||||
) -> anyhow::Result<Vec<ScheduleRequest>> {
|
||||
let pending_ops = service.persistence.list_pending_ops(None).await?;
|
||||
let pending_ops = service.persistence.list_pending_ops().await?;
|
||||
let mut res = Vec::with_capacity(pending_ops.len());
|
||||
for op_persist in pending_ops {
|
||||
let node_id = NodeId(op_persist.sk_id as u64);
|
||||
@@ -232,12 +236,14 @@ impl SafekeeperReconciler {
|
||||
let kind = req.kind;
|
||||
let tenant_id = req.tenant_id;
|
||||
let timeline_id = req.timeline_id;
|
||||
let node_id = req.safekeeper.skp.id;
|
||||
self.reconcile_one(req, req_cancel)
|
||||
.instrument(tracing::info_span!(
|
||||
"reconcile_one",
|
||||
?kind,
|
||||
%tenant_id,
|
||||
?timeline_id
|
||||
?timeline_id,
|
||||
%node_id,
|
||||
))
|
||||
.await;
|
||||
}
|
||||
|
||||
@@ -79,12 +79,7 @@ from fixtures.remote_storage import (
|
||||
default_remote_storage,
|
||||
remote_storage_to_toml_dict,
|
||||
)
|
||||
from fixtures.safekeeper.http import (
|
||||
MembershipConfiguration,
|
||||
SafekeeperHttpClient,
|
||||
SafekeeperId,
|
||||
TimelineCreateRequest,
|
||||
)
|
||||
from fixtures.safekeeper.http import SafekeeperHttpClient
|
||||
from fixtures.safekeeper.utils import wait_walreceivers_absent
|
||||
from fixtures.utils import (
|
||||
ATTACHMENT_NAME_REGEX,
|
||||
@@ -4767,50 +4762,6 @@ class Safekeeper(LogUtils):
|
||||
|
||||
wait_until(paused)
|
||||
|
||||
@staticmethod
|
||||
def sks_to_safekeeper_ids(sks: list[Safekeeper]) -> list[SafekeeperId]:
|
||||
return [SafekeeperId(sk.id, "localhost", sk.port.pg_tenant_only) for sk in sks]
|
||||
|
||||
@staticmethod
|
||||
def mconf_sks(env: NeonEnv, mconf: MembershipConfiguration) -> list[Safekeeper]:
|
||||
"""
|
||||
List of Safekeepers which are members in `mconf`.
|
||||
"""
|
||||
members_ids = [m.id for m in mconf.members]
|
||||
new_members_ids = [m.id for m in mconf.new_members] if mconf.new_members is not None else []
|
||||
return [sk for sk in env.safekeepers if sk.id in members_ids or sk.id in new_members_ids]
|
||||
|
||||
@staticmethod
|
||||
def create_timeline(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
ps: NeonPageserver,
|
||||
mconf: MembershipConfiguration,
|
||||
members_sks: list[Safekeeper],
|
||||
):
|
||||
"""
|
||||
Manually create timeline on safekeepers with given (presumably initial)
|
||||
mconf: figure out LSN from pageserver, bake request and execute it on
|
||||
given safekeepers.
|
||||
|
||||
Normally done by storcon, but some tests want to do it manually so far.
|
||||
"""
|
||||
ps_http_cli = ps.http_client()
|
||||
# figure out initial LSN.
|
||||
ps_timeline_detail = ps_http_cli.timeline_detail(tenant_id, timeline_id)
|
||||
init_lsn = ps_timeline_detail["last_record_lsn"]
|
||||
log.info(f"initial LSN: {init_lsn}")
|
||||
# sk timeline creation request expects minor version
|
||||
pg_version = ps_timeline_detail["pg_version"] * 10000
|
||||
# create initial mconf
|
||||
create_r = TimelineCreateRequest(
|
||||
tenant_id, timeline_id, mconf, pg_version, Lsn(init_lsn), commit_lsn=None
|
||||
)
|
||||
log.info(f"sending timeline create: {create_r.to_json()}")
|
||||
|
||||
for sk in members_sks:
|
||||
sk.http_client().timeline_create(create_r)
|
||||
|
||||
|
||||
class NeonBroker(LogUtils):
|
||||
"""An object managing storage_broker instance"""
|
||||
|
||||
@@ -25,7 +25,7 @@ class Walreceiver:
|
||||
|
||||
@dataclass
|
||||
class SafekeeperTimelineStatus:
|
||||
mconf: MembershipConfiguration | None
|
||||
mconf: Configuration | None
|
||||
term: int
|
||||
last_log_term: int
|
||||
pg_version: int # Not exactly a PgVersion, safekeeper returns version as int, for example 150002 for 15.2
|
||||
@@ -78,17 +78,17 @@ class SafekeeperId:
|
||||
|
||||
|
||||
@dataclass
|
||||
class MembershipConfiguration:
|
||||
class Configuration:
|
||||
generation: int
|
||||
members: list[SafekeeperId]
|
||||
new_members: list[SafekeeperId] | None
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, d: dict[str, Any]) -> MembershipConfiguration:
|
||||
def from_json(cls, d: dict[str, Any]) -> Configuration:
|
||||
generation = d["generation"]
|
||||
members = d["members"]
|
||||
new_members = d.get("new_members")
|
||||
return MembershipConfiguration(generation, members, new_members)
|
||||
return Configuration(generation, members, new_members)
|
||||
|
||||
def to_json(self) -> str:
|
||||
return json.dumps(self, cls=EnhancedJSONEncoder)
|
||||
@@ -98,7 +98,7 @@ class MembershipConfiguration:
|
||||
class TimelineCreateRequest:
|
||||
tenant_id: TenantId
|
||||
timeline_id: TimelineId
|
||||
mconf: MembershipConfiguration
|
||||
mconf: Configuration
|
||||
# not exactly PgVersion, for example 150002 for 15.2
|
||||
pg_version: int
|
||||
start_lsn: Lsn
|
||||
@@ -110,13 +110,13 @@ class TimelineCreateRequest:
|
||||
|
||||
@dataclass
|
||||
class TimelineMembershipSwitchResponse:
|
||||
previous_conf: MembershipConfiguration
|
||||
current_conf: MembershipConfiguration
|
||||
previous_conf: Configuration
|
||||
current_conf: Configuration
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, d: dict[str, Any]) -> TimelineMembershipSwitchResponse:
|
||||
previous_conf = MembershipConfiguration.from_json(d["previous_conf"])
|
||||
current_conf = MembershipConfiguration.from_json(d["current_conf"])
|
||||
previous_conf = Configuration.from_json(d["previous_conf"])
|
||||
current_conf = Configuration.from_json(d["current_conf"])
|
||||
return TimelineMembershipSwitchResponse(previous_conf, current_conf)
|
||||
|
||||
|
||||
@@ -194,7 +194,7 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
|
||||
resj = res.json()
|
||||
walreceivers = [Walreceiver(wr["conn_id"], wr["status"]) for wr in resj["walreceivers"]]
|
||||
# Normally this is never None; None is allowed only to make forward compat tests happy.
|
||||
mconf = MembershipConfiguration.from_json(resj["mconf"]) if "mconf" in resj else None
|
||||
mconf = Configuration.from_json(resj["mconf"]) if "mconf" in resj else None
|
||||
return SafekeeperTimelineStatus(
|
||||
mconf=mconf,
|
||||
term=resj["acceptor_state"]["term"],
|
||||
@@ -223,9 +223,7 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
|
||||
return self.timeline_status(tenant_id, timeline_id).commit_lsn
|
||||
|
||||
# Get timeline membership configuration.
|
||||
def get_membership(
|
||||
self, tenant_id: TenantId, timeline_id: TimelineId
|
||||
) -> MembershipConfiguration:
|
||||
def get_membership(self, tenant_id: TenantId, timeline_id: TimelineId) -> Configuration:
|
||||
# make mypy happy
|
||||
return self.timeline_status(tenant_id, timeline_id).mconf # type: ignore
|
||||
|
||||
@@ -277,7 +275,7 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
|
||||
return res_json
|
||||
|
||||
def timeline_exclude(
|
||||
self, tenant_id: TenantId, timeline_id: TimelineId, to: MembershipConfiguration
|
||||
self, tenant_id: TenantId, timeline_id: TimelineId, to: Configuration
|
||||
) -> dict[str, Any]:
|
||||
res = self.put(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/exclude",
|
||||
@@ -289,7 +287,7 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
|
||||
return res_json
|
||||
|
||||
def membership_switch(
|
||||
self, tenant_id: TenantId, timeline_id: TimelineId, to: MembershipConfiguration
|
||||
self, tenant_id: TenantId, timeline_id: TimelineId, to: Configuration
|
||||
) -> TimelineMembershipSwitchResponse:
|
||||
res = self.put(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/membership",
|
||||
|
||||
@@ -4073,6 +4073,134 @@ def test_storage_controller_location_conf_equivalence(neon_env_builder: NeonEnvB
|
||||
assert reconciles_after_restart == 0
|
||||
|
||||
|
||||
@pytest.mark.parametrize("restart_storcon", [True, False])
|
||||
@pytest.mark.parametrize("delete_only_timeline", [True, False])
|
||||
def test_storcon_create_delete_sk_down(
|
||||
neon_env_builder: NeonEnvBuilder, restart_storcon: bool, delete_only_timeline: bool
|
||||
):
|
||||
"""
|
||||
Test that the storcon can create and delete tenants and timelines with a safekeeper being down.
|
||||
"""
|
||||
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
neon_env_builder.storage_controller_config = {
|
||||
"timelines_onto_safekeepers": True,
|
||||
}
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
env.safekeepers[0].stop()
|
||||
|
||||
# Wait for heartbeater to pick up that the safekeeper is gone
|
||||
# This isn't really necessary
|
||||
def logged_offline():
|
||||
env.storage_controller.assert_log_contains(
|
||||
"Heartbeat round complete for 3 safekeepers, 1 offline"
|
||||
)
|
||||
|
||||
wait_until(logged_offline)
|
||||
|
||||
tenant_id = TenantId.generate()
|
||||
timeline_id = TimelineId.generate()
|
||||
env.create_tenant(tenant_id, timeline_id)
|
||||
branch_timeline_id = env.create_branch("child_of_main", tenant_id)
|
||||
|
||||
log.info(
|
||||
f"Creating tenant {tenant_id} with main timeline {timeline_id} and branch {branch_timeline_id}"
|
||||
)
|
||||
|
||||
env.storage_controller.allowed_errors.extend(
|
||||
[
|
||||
".*Call to safekeeper.* management API still failed after.*",
|
||||
f".*reconcile_one.*tenant_id={tenant_id}.*Call to safekeeper.* management API failed, will retry.*",
|
||||
f".*reconcile_one.*tenant_id={tenant_id}.*Call to safekeeper.* management API still failed after.*",
|
||||
]
|
||||
)
|
||||
|
||||
env.safekeepers[1].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}")
|
||||
env.safekeepers[2].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}")
|
||||
|
||||
if restart_storcon:
|
||||
# Restart the storcon to check that we persist operations
|
||||
env.storage_controller.stop()
|
||||
env.storage_controller.start()
|
||||
|
||||
config_lines = [
|
||||
"neon.safekeeper_proto_version = 3",
|
||||
]
|
||||
with env.endpoints.create("main", tenant_id=tenant_id, config_lines=config_lines) as ep:
|
||||
# endpoint should start.
|
||||
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
|
||||
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
|
||||
|
||||
with env.endpoints.create("child_of_main", tenant_id=tenant_id, config_lines=config_lines) as ep:
|
||||
# endpoint should start.
|
||||
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
|
||||
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
|
||||
|
||||
env.storage_controller.assert_log_contains("writing pending op for sk id 0")
|
||||
env.safekeepers[0].start()
|
||||
|
||||
# ensure that we applied the operation also for the safekeeper we just brought down
|
||||
def logged_contains_on_sk():
|
||||
env.safekeepers[0].assert_log_contains(
|
||||
f"pulling timeline {tenant_id}/{timeline_id} from safekeeper"
|
||||
)
|
||||
env.safekeepers[0].assert_log_contains(
|
||||
f"pulling timeline {tenant_id}/{branch_timeline_id} from safekeeper"
|
||||
)
|
||||
|
||||
wait_until(logged_contains_on_sk)
|
||||
|
||||
env.safekeepers[1].stop()
|
||||
|
||||
if delete_only_timeline:
|
||||
env.storage_controller.pageserver_api().timeline_delete(tenant_id, branch_timeline_id)
|
||||
else:
|
||||
env.storage_controller.pageserver_api().tenant_delete(tenant_id)
|
||||
|
||||
def logged_delete_finished():
|
||||
env.safekeepers[0].assert_log_contains(f"method=DELETE.*{tenant_id}.*Request handled")
|
||||
env.safekeepers[2].assert_log_contains(f"method=DELETE.*{tenant_id}.*Request handled")
|
||||
|
||||
wait_until(logged_delete_finished)
|
||||
|
||||
env.safekeepers[0].assert_log_contains(
|
||||
f"deleting timeline {tenant_id}/{branch_timeline_id} from disk"
|
||||
)
|
||||
env.safekeepers[2].assert_log_contains(
|
||||
f"deleting timeline {tenant_id}/{branch_timeline_id} from disk"
|
||||
)
|
||||
|
||||
root_was_deleted_on_0 = env.safekeepers[0].log_contains(
|
||||
f"deleting timeline {tenant_id}/{timeline_id} from disk"
|
||||
)
|
||||
root_was_deleted_on_2 = env.safekeepers[2].log_contains(
|
||||
f"deleting timeline {tenant_id}/{timeline_id} from disk"
|
||||
)
|
||||
assert (root_was_deleted_on_0 is None) == (root_was_deleted_on_2 is None)
|
||||
|
||||
# We only delete the root timeline iff the tenant delete was requested
|
||||
if delete_only_timeline:
|
||||
assert not root_was_deleted_on_0
|
||||
else:
|
||||
assert root_was_deleted_on_0
|
||||
|
||||
if restart_storcon:
|
||||
# Restart the storcon to check that we persist operations
|
||||
env.storage_controller.stop()
|
||||
env.storage_controller.start()
|
||||
|
||||
env.safekeepers[1].start()
|
||||
|
||||
# ensure that there are log msgs for the third safekeeper too
|
||||
def logged_deleted_on_sk():
|
||||
env.safekeepers[1].assert_log_contains(
|
||||
f"deleting timeline {tenant_id}/{branch_timeline_id} from disk"
|
||||
)
|
||||
|
||||
wait_until(logged_deleted_on_sk)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("wrong_az", [True, False])
|
||||
def test_storage_controller_graceful_migration(neon_env_builder: NeonEnvBuilder, wrong_az: bool):
|
||||
"""
|
||||
|
||||
@@ -45,7 +45,7 @@ from fixtures.remote_storage import (
|
||||
s3_storage,
|
||||
)
|
||||
from fixtures.safekeeper.http import (
|
||||
MembershipConfiguration,
|
||||
Configuration,
|
||||
SafekeeperHttpClient,
|
||||
SafekeeperId,
|
||||
TimelineCreateRequest,
|
||||
@@ -589,7 +589,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
|
||||
for sk in env.safekeepers:
|
||||
sk.start()
|
||||
cli = sk.http_client()
|
||||
mconf = MembershipConfiguration(generation=0, members=[], new_members=None)
|
||||
mconf = Configuration(generation=0, members=[], new_members=None)
|
||||
# set start_lsn to the beginning of the first segment to allow reading
|
||||
# WAL from there (could use initdb LSN as well).
|
||||
r = TimelineCreateRequest(
|
||||
@@ -1948,7 +1948,7 @@ def test_membership_api(neon_env_builder: NeonEnvBuilder):
|
||||
sk_id_2 = SafekeeperId(11, "localhost", 5434) # just a mock
|
||||
|
||||
# Request to switch before timeline creation should fail.
|
||||
init_conf = MembershipConfiguration(generation=1, members=[sk_id_1], new_members=None)
|
||||
init_conf = Configuration(generation=1, members=[sk_id_1], new_members=None)
|
||||
with pytest.raises(requests.exceptions.HTTPError):
|
||||
http_cli.membership_switch(tenant_id, timeline_id, init_conf)
|
||||
|
||||
@@ -1960,7 +1960,7 @@ def test_membership_api(neon_env_builder: NeonEnvBuilder):
|
||||
http_cli.timeline_create(create_r)
|
||||
|
||||
# Switch into some conf.
|
||||
joint_conf = MembershipConfiguration(generation=4, members=[sk_id_1], new_members=[sk_id_2])
|
||||
joint_conf = Configuration(generation=4, members=[sk_id_1], new_members=[sk_id_2])
|
||||
resp = http_cli.membership_switch(tenant_id, timeline_id, joint_conf)
|
||||
log.info(f"joint switch resp: {resp}")
|
||||
assert resp.previous_conf.generation == 1
|
||||
@@ -1973,26 +1973,24 @@ def test_membership_api(neon_env_builder: NeonEnvBuilder):
|
||||
assert after_restart.generation == 4
|
||||
|
||||
# Switch into non joint conf of which sk is not a member, must fail.
|
||||
non_joint_not_member = MembershipConfiguration(
|
||||
generation=5, members=[sk_id_2], new_members=None
|
||||
)
|
||||
non_joint_not_member = Configuration(generation=5, members=[sk_id_2], new_members=None)
|
||||
with pytest.raises(requests.exceptions.HTTPError):
|
||||
resp = http_cli.membership_switch(tenant_id, timeline_id, non_joint_not_member)
|
||||
|
||||
# Switch into good non joint conf.
|
||||
non_joint = MembershipConfiguration(generation=6, members=[sk_id_1], new_members=None)
|
||||
non_joint = Configuration(generation=6, members=[sk_id_1], new_members=None)
|
||||
resp = http_cli.membership_switch(tenant_id, timeline_id, non_joint)
|
||||
log.info(f"non joint switch resp: {resp}")
|
||||
assert resp.previous_conf.generation == 4
|
||||
assert resp.current_conf.generation == 6
|
||||
|
||||
# Switch request to lower conf should be rejected.
|
||||
lower_conf = MembershipConfiguration(generation=3, members=[sk_id_1], new_members=None)
|
||||
lower_conf = Configuration(generation=3, members=[sk_id_1], new_members=None)
|
||||
with pytest.raises(requests.exceptions.HTTPError):
|
||||
http_cli.membership_switch(tenant_id, timeline_id, lower_conf)
|
||||
|
||||
# Now, exclude sk from the membership, timeline should be deleted.
|
||||
excluded_conf = MembershipConfiguration(generation=7, members=[sk_id_2], new_members=None)
|
||||
excluded_conf = Configuration(generation=7, members=[sk_id_2], new_members=None)
|
||||
http_cli.timeline_exclude(tenant_id, timeline_id, excluded_conf)
|
||||
with pytest.raises(requests.exceptions.HTTPError):
|
||||
http_cli.timeline_status(tenant_id, timeline_id)
|
||||
@@ -2012,6 +2010,11 @@ def test_explicit_timeline_creation(neon_env_builder: NeonEnvBuilder):
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
ps = env.pageservers[0]
|
||||
ps_http_cli = ps.http_client()
|
||||
|
||||
http_clis = [sk.http_client() for sk in env.safekeepers]
|
||||
|
||||
config_lines = [
|
||||
"neon.safekeeper_proto_version = 3",
|
||||
]
|
||||
@@ -2020,11 +2023,22 @@ def test_explicit_timeline_creation(neon_env_builder: NeonEnvBuilder):
|
||||
# expected to fail because timeline is not created on safekeepers
|
||||
with pytest.raises(Exception, match=r".*timed out.*"):
|
||||
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3], timeout="2s")
|
||||
# figure out initial LSN.
|
||||
ps_timeline_detail = ps_http_cli.timeline_detail(tenant_id, timeline_id)
|
||||
init_lsn = ps_timeline_detail["last_record_lsn"]
|
||||
log.info(f"initial LSN: {init_lsn}")
|
||||
# sk timeline creation request expects minor version
|
||||
pg_version = ps_timeline_detail["pg_version"] * 10000
|
||||
# create initial mconf
|
||||
mconf = MembershipConfiguration(
|
||||
generation=1, members=Safekeeper.sks_to_safekeeper_ids(env.safekeepers), new_members=None
|
||||
sk_ids = [SafekeeperId(sk.id, "localhost", sk.port.pg_tenant_only) for sk in env.safekeepers]
|
||||
mconf = Configuration(generation=1, members=sk_ids, new_members=None)
|
||||
create_r = TimelineCreateRequest(
|
||||
tenant_id, timeline_id, mconf, pg_version, Lsn(init_lsn), commit_lsn=None
|
||||
)
|
||||
Safekeeper.create_timeline(tenant_id, timeline_id, env.pageservers[0], mconf, env.safekeepers)
|
||||
log.info(f"sending timeline create: {create_r.to_json()}")
|
||||
|
||||
for sk_http_cli in http_clis:
|
||||
sk_http_cli.timeline_create(create_r)
|
||||
# Once timeline created endpoint should start.
|
||||
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
|
||||
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
|
||||
|
||||
@@ -18,7 +18,6 @@ from fixtures.neon_fixtures import (
|
||||
Safekeeper,
|
||||
)
|
||||
from fixtures.remote_storage import RemoteStorageKind
|
||||
from fixtures.safekeeper.http import MembershipConfiguration
|
||||
from fixtures.utils import skip_in_debug_build
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -453,24 +452,20 @@ def test_concurrent_computes(neon_env_builder: NeonEnvBuilder):
|
||||
asyncio.run(run_concurrent_computes(env))
|
||||
|
||||
|
||||
async def assert_query_hangs(endpoint: Endpoint, query: str):
|
||||
"""
|
||||
Start on endpoint query which is expected to hang and check that it does.
|
||||
"""
|
||||
conn = await endpoint.connect_async()
|
||||
bg_query = asyncio.create_task(conn.execute(query))
|
||||
await asyncio.sleep(2)
|
||||
assert not bg_query.done()
|
||||
return bg_query
|
||||
|
||||
|
||||
# Stop safekeeper and check that query cannot be executed while safekeeper is down.
|
||||
# Query will insert a single row into a table.
|
||||
async def check_unavailability(sk: Safekeeper, ep: Endpoint, key: int, start_delay_sec: int = 2):
|
||||
async def check_unavailability(
|
||||
sk: Safekeeper, conn: asyncpg.Connection, key: int, start_delay_sec: int = 2
|
||||
):
|
||||
# shutdown one of two acceptors, that is, majority
|
||||
sk.stop()
|
||||
|
||||
bg_query = await assert_query_hangs(ep, f"INSERT INTO t values ({key}, 'payload')")
|
||||
bg_query = asyncio.create_task(conn.execute(f"INSERT INTO t values ({key}, 'payload')"))
|
||||
|
||||
await asyncio.sleep(start_delay_sec)
|
||||
# ensure that the query has not been executed yet
|
||||
assert not bg_query.done()
|
||||
|
||||
# start safekeeper and await the query
|
||||
sk.start()
|
||||
await bg_query
|
||||
@@ -485,10 +480,10 @@ async def run_unavailability(env: NeonEnv, endpoint: Endpoint):
|
||||
await conn.execute("INSERT INTO t values (1, 'payload')")
|
||||
|
||||
# stop safekeeper and check that query cannot be executed while safekeeper is down
|
||||
await check_unavailability(env.safekeepers[0], endpoint, 2)
|
||||
await check_unavailability(env.safekeepers[0], conn, 2)
|
||||
|
||||
# for the world's balance, do the same with second safekeeper
|
||||
await check_unavailability(env.safekeepers[1], endpoint, 3)
|
||||
await check_unavailability(env.safekeepers[1], conn, 3)
|
||||
|
||||
# check that we can execute queries after restart
|
||||
await conn.execute("INSERT INTO t values (4, 'payload')")
|
||||
@@ -519,7 +514,15 @@ async def run_recovery_uncommitted(env: NeonEnv):
|
||||
# insert with only one safekeeper up to create tail of flushed but not committed WAL
|
||||
sk1.stop()
|
||||
sk2.stop()
|
||||
await assert_query_hangs(ep, "insert into t select generate_series(1, 2000), 'payload'")
|
||||
conn = await ep.connect_async()
|
||||
# query should hang, so execute in separate task
|
||||
bg_query = asyncio.create_task(
|
||||
conn.execute("insert into t select generate_series(1, 2000), 'payload'")
|
||||
)
|
||||
sleep_sec = 2
|
||||
await asyncio.sleep(sleep_sec)
|
||||
# it must still be not finished
|
||||
assert not bg_query.done()
|
||||
# note: destroy will kill compute_ctl, preventing it from waiting for hanging sync-safekeepers.
|
||||
ep.stop_and_destroy()
|
||||
|
||||
@@ -556,7 +559,15 @@ async def run_wal_truncation(env: NeonEnv, safekeeper_proto_version: int):
|
||||
# insert with only one sk3 up to create tail of flushed but not committed WAL on it
|
||||
sk1.stop()
|
||||
sk2.stop()
|
||||
await assert_query_hangs(ep, "insert into t select generate_series(1, 180000), 'Papaya'")
|
||||
conn = await ep.connect_async()
|
||||
# query should hang, so execute in separate task
|
||||
bg_query = asyncio.create_task(
|
||||
conn.execute("insert into t select generate_series(1, 180000), 'Papaya'")
|
||||
)
|
||||
sleep_sec = 2
|
||||
await asyncio.sleep(sleep_sec)
|
||||
# it must still be not finished
|
||||
assert not bg_query.done()
|
||||
# note: destroy will kill compute_ctl, preventing it from waiting for hanging sync-safekeepers.
|
||||
ep.stop_and_destroy()
|
||||
|
||||
@@ -596,127 +607,6 @@ def test_wal_truncation(neon_env_builder: NeonEnvBuilder, safekeeper_proto_versi
|
||||
asyncio.run(run_wal_truncation(env, safekeeper_proto_version))
|
||||
|
||||
|
||||
# todo: add should_start when all up; check and exit early if not
|
||||
async def quorum_sanity_single(
    env: NeonEnv,
    compute_sks_ids: list[int],
    members_sks_ids: list[int],
    new_members_sks_ids: list[int] | None,
    sks_to_stop_ids: list[int],
    should_work_when_stopped: bool,
):
    """
    Run a single quorum scenario on a freshly created branch.

    *_ids params contain safekeeper node ids; it is assumed they are issued
    from 1 and sequentially assigned to env.safekeepers.

    Steps:
      1. Build a generation-1 MembershipConfiguration from `members_sks_ids`
         (and `new_members_sks_ids`, when given and non-empty).
      2. Create a branch and create its timeline on the safekeepers returned
         by Safekeeper.mconf_sks for that configuration.
      3. Start an endpoint configured with `compute_sks_ids` and create a table.
      4. Stop the safekeepers listed in `sks_to_stop_ids`, then either check that
         an insert succeeds (`should_work_when_stopped` is True) or that it hangs.
      5. Start the stopped safekeepers again and, if an insert was left hanging,
         await its completion.
    """

    def sks_by_ids(node_ids):
        # node ids are 1-based, env.safekeepers is 0-based
        return [env.safekeepers[node_id - 1] for node_id in node_ids]

    def dash_joined(values):
        return "-".join([str(value) for value in values])

    declared_members = sks_by_ids(members_sks_ids)
    declared_new_members = sks_by_ids(new_members_sks_ids) if new_members_sks_ids else None
    stopped_sks = sks_by_ids(sks_to_stop_ids)

    if declared_new_members:
        new_member_ids = Safekeeper.sks_to_safekeeper_ids(declared_new_members)
    else:
        new_member_ids = None
    mconf = MembershipConfiguration(
        generation=1,
        members=Safekeeper.sks_to_safekeeper_ids(declared_members),
        new_members=new_member_ids,
    )
    # safekeepers that have to host the timeline for this configuration
    timeline_sks = Safekeeper.mconf_sks(env, mconf)

    tenant_id = env.initial_tenant
    compute_sks_ids_str = dash_joined(compute_sks_ids)
    members_sks_ids_str = dash_joined(sk.id for sk in mconf.members)
    if mconf.new_members is not None:
        new_members_sks_ids_str = dash_joined(sk.id for sk in mconf.new_members)
    else:
        new_members_sks_ids_str = dash_joined([])
    sks_to_stop_ids_str = dash_joined(sk.id for sk in stopped_sks)
    log.info(
        f"running quorum_sanity_single with compute_sks={compute_sks_ids_str}, members_sks={members_sks_ids_str}, new_members_sks={new_members_sks_ids_str}, sks_to_stop={sks_to_stop_ids_str}, should_work_when_stopped={should_work_when_stopped}"
    )
    branch_name = f"test_quorum_single_c{compute_sks_ids_str}_m{members_sks_ids_str}_{new_members_sks_ids_str}_s{sks_to_stop_ids_str}"
    timeline_id = env.create_branch(branch_name)

    # the timeline is created explicitly on the configuration's safekeepers
    Safekeeper.create_timeline(tenant_id, timeline_id, env.pageservers[0], mconf, timeline_sks)

    ep = env.endpoints.create(
        branch_name,
        config_lines=[
            "neon.safekeeper_proto_version = 3",
        ],
    )
    ep.start(safekeeper_generation=1, safekeepers=compute_sks_ids)
    ep.safe_psql("create table t(key int, value text)")

    # take the requested safekeepers down, then probe whether writes go through
    for sk in stopped_sks:
        sk.stop()

    insert_sql = "insert into t select generate_series(1, 100), 'Papaya'"
    bg_query = None
    if should_work_when_stopped:
        log.info("checking that writes still work")
        ep.safe_psql(insert_sql)
    else:
        log.info("checking that writes hang")
        bg_query = await assert_query_hangs(ep, insert_sql)

    # bring them back; a query left hanging above should now be able to finish
    for sk in stopped_sks:
        sk.start()
    if bg_query:
        log.info("awaiting query")
        await bg_query
|
||||
|
||||
|
||||
# It's a bit tempting to iterate over all possible combinations, but let's stick
|
||||
# with this for now.
|
||||
async def run_quorum_sanity(env: NeonEnv):
    """
    Run a fixed list of quorum scenarios, one after another, through
    quorum_sanity_single.

    Each row is (compute_sks_ids, members_sks_ids, new_members_sks_ids,
    sks_to_stop_ids, should_work_when_stopped).
    """
    scenarios = [
        # 3 members, all up, should work
        ([1, 2, 3], [1, 2, 3], None, [], True),
        # 3 members, 2/3 up, should work
        ([1, 2, 3], [1, 2, 3], None, [3], True),
        # 3 members, 1/3 up, should not work
        ([1, 2, 3], [1, 2, 3], None, [2, 3], False),
        # 3 members, all up, should work; wp redundantly talks to 4th.
        ([1, 2, 3, 4], [1, 2, 3], None, [], True),
        # 3 members, all up, should work with wp talking to 2 of these 3 plus one redundant
        ([2, 3, 4], [1, 2, 3], None, [], True),
        # 3 members, 2/3 up, could work but wp talks to a different 3, so it shouldn't
        ([2, 3, 4], [1, 2, 3], None, [3], False),
        # joint conf of 1-2-3 and 4, all up, should work
        ([1, 2, 3, 4], [1, 2, 3], [4], [], True),
        # joint conf of 1-2-3 and 4, 4 down, shouldn't work
        ([1, 2, 3, 4], [1, 2, 3], [4], [4], False),
        # joint conf of 1-2-3 and 2-3-4, all up, should work
        ([1, 2, 3, 4], [1, 2, 3], [2, 3, 4], [], True),
        # joint conf of 1-2-3 and 2-3-4, 1 and 4 down, should work
        ([1, 2, 3, 4], [1, 2, 3], [2, 3, 4], [1, 4], True),
        # joint conf of 1-2-3 and 2-3-4, 2 down, should work
        ([1, 2, 3, 4], [1, 2, 3], [2, 3, 4], [2], True),
        # joint conf of 1-2-3 and 2-3-4, 3 down, should work
        ([1, 2, 3, 4], [1, 2, 3], [2, 3, 4], [3], True),
        # joint conf of 1-2-3 and 2-3-4, 1 and 2 down, shouldn't work
        ([1, 2, 3, 4], [1, 2, 3], [2, 3, 4], [1, 2], False),
        # joint conf of 1-2-3 and 2-3-4, 2 and 4 down, shouldn't work
        ([1, 2, 3, 4], [1, 2, 3], [2, 3, 4], [2, 4], False),
        # joint conf of 1-2-3 and 2-3-4 with wp talking to 2-3-4 only.
        ([2, 3, 4], [1, 2, 3], [2, 3, 4], [], True),
        # with 1 down should still be ok
        ([2, 3, 4], [1, 2, 3], [2, 3, 4], [1], True),
        # but with 2 down not ok
        ([2, 3, 4], [1, 2, 3], [2, 3, 4], [2], False),
    ]
    # scenarios run strictly in sequence, each awaited before the next starts
    for compute_ids, member_ids, new_member_ids, stop_ids, expect_writes in scenarios:
        await quorum_sanity_single(
            env, compute_ids, member_ids, new_member_ids, stop_ids, expect_writes
        )
|
||||
|
||||
|
||||
# Test various combinations of membership configurations / neon.safekeepers
|
||||
# (list of safekeepers endpoint connects to) values / up & down safekeepers and
|
||||
# check that endpoint can start and write data when we have quorum and can't when
|
||||
# we don't.
|
||||
def test_quorum_sanity(neon_env_builder: NeonEnvBuilder):
    """
    Start an environment with four safekeepers and drive the async quorum
    scenarios in run_quorum_sanity to completion.
    """
    neon_env_builder.num_safekeepers = 4
    asyncio.run(run_quorum_sanity(neon_env_builder.init_start()))
|
||||
|
||||
|
||||
async def run_segment_init_failure(env: NeonEnv):
|
||||
env.create_branch("test_segment_init_failure")
|
||||
ep = env.endpoints.create_start("test_segment_init_failure")
|
||||
|
||||
Reference in New Issue
Block a user