Compare commits

...

2 Commits

Author SHA1 Message Date
Arseny Sher
48e4a93cc5 safekeeper: add term_bump endpoint.
When walproposer observes a now higher term it restarts instead of
crashing the whole compute with PANIC; this avoids a compute crash after
a term_bump call. After a successful election we still check the
last_log_term of the highest given vote to ensure the basebackup is good,
and PANIC otherwise.

It will be used for migration per
035-safekeeper-dynamic-membership-change.md
and
https://github.com/neondatabase/docs/pull/21

ref https://github.com/neondatabase/neon/issues/8700
2024-08-19 14:47:39 +03:00
Arseny Sher
2c4720d0da Reorder sk routes a bit. 2024-08-19 14:47:32 +03:00
8 changed files with 170 additions and 23 deletions

View File

@@ -60,3 +60,16 @@ pub struct TimelineCopyRequest {
pub target_timeline_id: TimelineId,
pub until_lsn: Lsn,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TimelineTermBumpRequest {
/// bump to
pub term: Option<u64>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TimelineTermBumpResponse {
// before the request
pub previous_term: u64,
pub current_term: u64,
}
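
For context, a minimal sketch of exercising the new endpoint over HTTP; host, port, and the tenant/timeline IDs below are placeholders, not values from this change. The request body maps to TimelineTermBumpRequest and the response body to TimelineTermBumpResponse:

import requests

# Placeholder address and IDs for illustration only; real values come from the deployment.
SAFEKEEPER_HTTP = "http://localhost:7676"
tenant_id = "0123456789abcdef0123456789abcdef"
timeline_id = "fedcba9876543210fedcba9876543210"

# Request body per TimelineTermBumpRequest: {"term": <u64>}, or {} to just increment.
res = requests.post(
    f"{SAFEKEEPER_HTTP}/v1/tenant/{tenant_id}/timeline/{timeline_id}/term_bump",
    json={"term": 42},
)
res.raise_for_status()

# Response body per TimelineTermBumpResponse.
body = res.json()
print(body["previous_term"], body["current_term"])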

View File

@@ -1038,9 +1038,12 @@ DetermineEpochStartLsn(WalProposer *wp)
if (SkipXLogPageHeader(wp, wp->propEpochStartLsn) != wp->api.get_redo_start_lsn(wp))
{
/*
- * However, allow to proceed if previously elected leader was me;
- * plain restart of walproposer not intervened by concurrent
- * compute (who could generate WAL) is ok.
+ * However, allow to proceed if last_log_term on the node which gave
+ * the highest vote (i.e. point where we are going to start writing)
+ * actually had been won by me; plain restart of walproposer not
+ * intervened by concurrent compute which wrote WAL is ok.
+ *
+ * This avoids compute crash after manual term_bump.
*/
if (!((dth->n_entries >= 1) && (dth->entries[dth->n_entries - 1].term ==
pg_atomic_read_u64(&walprop_shared->mineLastElectedTerm))))
@@ -1442,12 +1445,17 @@ RecvAppendResponses(Safekeeper *sk)
if (sk->appendResponse.term > wp->propTerm)
{
/*
- * Another compute with higher term is running. Panic to restart
- * PG as we likely need to retake basebackup. However, don't dump
- * core as this is kinda expected scenario.
*
+ * Term has changed to higher one, probably another compute is
+ * running. If this is the case we could PANIC as well because
+ * likely it inserted some data and our basebackup is unsuitable
+ * anymore. However, we also bump term manually (term_bump endpoint)
+ * on safekeepers for migration purposes, in this case we do want
+ * compute to stay alive. So restart walproposer with FATAL instead
+ * of panicking; if basebackup is spoiled next election will notice
+ * this.
*/
- disable_core_dump();
- wp_log(PANIC, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT ", meaning another compute is running at the same time, and it conflicts with us",
+ wp_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT ", meaning another compute is running at the same time, and it conflicts with us",
sk->host, sk->port,
sk->appendResponse.term, wp->propTerm);
}

View File

@@ -1,6 +1,9 @@
use utils::auth::{AuthError, Claims, Scope};
use utils::id::TenantId;
/// If tenant_id is provided, allow if token (claims) is for this tenant or
/// whole safekeeper scope (SafekeeperData). Else, allow only if token is
/// SafekeeperData.
pub fn check_permission(claims: &Claims, tenant_id: Option<TenantId>) -> Result<(), AuthError> {
match (&claims.scope, tenant_id) {
(Scope::Tenant, None) => Err(AuthError(

View File

@@ -18,8 +18,8 @@ use utils::http::endpoint::{prometheus_metrics_handler, request_span, ChannelWri
use utils::http::request::parse_query_param;
use postgres_ffi::WAL_SEGMENT_SIZE;
- use safekeeper_api::models::TimelineCreateRequest;
use safekeeper_api::models::{SkTimelineInfo, TimelineCopyRequest};
+ use safekeeper_api::models::{TimelineCreateRequest, TimelineTermBumpRequest};
use utils::{
auth::SwappableJwtAuth,
http::{
@@ -302,12 +302,11 @@ async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body
/// Force persist control file.
async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
- check_permission(&request, None)?;
let ttid = TenantTimelineId::new(
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
);
+ check_permission(&request, Some(ttid.tenant_id))?;
let tli = GlobalTimelines::get(ttid)?;
tli.write_shared_state()
@@ -320,6 +319,28 @@ async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<
json_response(StatusCode::OK, ())
}
/// Make term at least as high as one in request. If one in request is None,
/// increment current one.
async fn timeline_term_bump_handler(
mut request: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let ttid = TenantTimelineId::new(
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
);
check_permission(&request, Some(ttid.tenant_id))?;
let request_data: TimelineTermBumpRequest = json_request(&mut request).await?;
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
let response = tli
.term_bump(request_data.term)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, response)
}
/// Deactivates the timeline and removes its data directory.
async fn timeline_delete_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
let ttid = TenantTimelineId::new(
@@ -558,6 +579,9 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
failpoints_handler(r, cancel).await
})
})
.delete("/v1/tenant/:tenant_id", |r| {
request_span(r, tenant_delete_handler)
})
// Will be used in the future instead of implicit timeline creation
.post("/v1/tenant/timeline", |r| {
request_span(r, timeline_create_handler)
@@ -568,20 +592,10 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
.delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
request_span(r, timeline_delete_handler)
})
.delete("/v1/tenant/:tenant_id", |r| {
request_span(r, tenant_delete_handler)
})
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/snapshot/:destination_id",
|r| request_span(r, timeline_snapshot_handler),
)
.post("/v1/pull_timeline", |r| {
request_span(r, timeline_pull_handler)
})
.post(
"/v1/tenant/:tenant_id/timeline/:source_timeline_id/copy",
|r| request_span(r, timeline_copy_handler),
)
.patch(
"/v1/tenant/:tenant_id/timeline/:timeline_id/control_file",
|r| request_span(r, patch_control_file_handler),
@@ -590,6 +604,17 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
"/v1/tenant/:tenant_id/timeline/:timeline_id/checkpoint",
|r| request_span(r, timeline_checkpoint_handler),
)
+ .post(
+ "/v1/tenant/:tenant_id/timeline/:timeline_id/term_bump",
+ |r| request_span(r, timeline_term_bump_handler),
+ )
+ .post("/v1/pull_timeline", |r| {
+ request_span(r, timeline_pull_handler)
+ })
+ .post(
+ "/v1/tenant/:tenant_id/timeline/:source_timeline_id/copy",
+ |r| request_span(r, timeline_copy_handler),
+ )
// for tests
.post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
request_span(r, record_safekeeper_info)

View File

@@ -1,9 +1,10 @@
//! Defines per timeline data stored persistently (SafeKeeperPersistentState)
//! and its wrapper with in memory layer (SafekeeperState).
- use std::ops::Deref;
+ use std::{cmp::max, ops::Deref};
use anyhow::Result;
+ use safekeeper_api::models::TimelineTermBumpResponse;
use serde::{Deserialize, Serialize};
use utils::{
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
@@ -12,7 +13,7 @@ use utils::{
use crate::{
control_file,
- safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, TermHistory},
+ safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, Term, TermHistory},
wal_backup_partial::{self},
};
@@ -209,6 +210,27 @@ where
let s = self.start_change();
self.finish_change(&s).await
}
/// Make term at least as high as `to`. If `to` is None, increment current one. This
/// is not in safekeeper.rs because we want to be able to do it even if
/// timeline is offloaded.
pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
let before = self.acceptor_state.term;
let mut state = self.start_change();
let new = match to {
Some(to) => max(state.acceptor_state.term, to),
None => state.acceptor_state.term + 1,
};
if new > state.acceptor_state.term {
state.acceptor_state.term = new;
self.finish_change(&state).await?;
}
let after = self.acceptor_state.term;
Ok(TimelineTermBumpResponse {
previous_term: before,
current_term: after,
})
}
}
impl<CTRL> Deref for TimelineState<CTRL>
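
To make the bump semantics above concrete, here is a small Python model of the rule (an illustration, not the implementation): the term only moves upward, an explicit target is clamped with max, and a missing target means increment by one.

from typing import Optional, Tuple

def term_bump(current_term: int, to: Optional[int]) -> Tuple[int, int]:
    """Model of the bump rule: the term never moves backwards."""
    new_term = current_term + 1 if to is None else max(current_term, to)
    # The Rust code above persists the control file only when new_term > current_term.
    return current_term, new_term

assert term_bump(3, 10) == (3, 10)   # explicit bump upwards
assert term_bump(5, None) == (5, 6)  # no target given: increment by one
assert term_bump(7, 2) == (7, 7)     # downward request leaves the term unchanged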

View File

@@ -3,6 +3,7 @@
use anyhow::{anyhow, bail, Result};
use camino::Utf8PathBuf;
use safekeeper_api::models::TimelineTermBumpResponse;
use serde::{Deserialize, Serialize};
use tokio::fs::{self};
use tokio_util::sync::CancellationToken;
@@ -213,6 +214,10 @@ impl StateSK {
.get_last_log_term(self.flush_lsn())
}
pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
self.state_mut().term_bump(to).await
}
/// Close open WAL files to release FDs.
fn close_wal_store(&mut self) {
if let StateSK::Loaded(sk) = self {
@@ -847,6 +852,11 @@ impl Timeline {
Ok(res)
}
pub async fn term_bump(self: &Arc<Self>, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
let mut state = self.write_shared_state().await;
state.sk.term_bump(to).await
}
/// Get the timeline guard for reading/writing WAL files.
/// If WAL files are not present on disk (evicted), they will be automatically
/// downloaded from remote storage. This is done in the manager task, which is

View File

@@ -50,6 +50,19 @@ class SafekeeperMetrics(Metrics):
).value
@dataclass
class TermBumpResponse:
previous_term: int
current_term: int
@classmethod
def from_json(cls, d: Dict[str, Any]) -> "TermBumpResponse":
return TermBumpResponse(
previous_term=d["previous_term"],
current_term=d["current_term"],
)
class SafekeeperHttpClient(requests.Session, MetricsGetter):
HTTPError = requests.HTTPError
@@ -115,6 +128,22 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
assert isinstance(res_json, dict)
return res_json
def term_bump(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
term: Optional[int],
) -> TermBumpResponse:
body = {}
if term is not None:
body["term"] = term
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/term_bump",
json=body,
)
res.raise_for_status()
return TermBumpResponse.from_json(res.json())
def pull_timeline(self, body: Dict[str, Any]) -> Dict[str, Any]:
res = self.post(f"http://localhost:{self.port}/v1/pull_timeline", json=body)
res.raise_for_status()

View File

@@ -2155,6 +2155,43 @@ def test_patch_control_file(neon_env_builder: NeonEnvBuilder):
assert res["timelines"][0]["control_file"]["timeline_start_lsn"] == "0/1"
def test_term_bump(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 1
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
endpoint = env.endpoints.create_start("main")
# initialize safekeeper
endpoint.safe_psql("create table t(key int, value text)")
http_cli = env.safekeepers[0].http_client()
# check that bump up to specific term works
curr_term = http_cli.timeline_status(tenant_id, timeline_id).term
bump_to = curr_term + 3
res = http_cli.term_bump(tenant_id, timeline_id, bump_to)
log.info(f"bump to {bump_to} res: {res}")
assert res.current_term >= bump_to
# check that bump to none increments current term
res = http_cli.term_bump(tenant_id, timeline_id, None)
log.info(f"bump to None res: {res}")
assert res.current_term > bump_to
assert res.current_term > res.previous_term
# check that bumping doesn't work downward
res = http_cli.term_bump(tenant_id, timeline_id, 2)
log.info(f"bump to 2 res: {res}")
assert res.current_term > bump_to
assert res.current_term == res.previous_term
# check that this doesn't kill the endpoint because the last WAL flush was its own and
# thus its basebackup is still good
endpoint.safe_psql("insert into t values (1, 'payload')")
# Test disables periodic pushes from safekeeper to the broker and checks that
# pageserver can still discover safekeepers with discovery requests.
def test_broker_discovery(neon_env_builder: NeonEnvBuilder):