Mirror of https://github.com/neondatabase/neon.git, synced 2026-02-08 13:10:37 +00:00

Compare commits: conrad/ref ... sk-bump-te

2 commits:

- 48e4a93cc5
- 2c4720d0da
@@ -60,3 +60,16 @@ pub struct TimelineCopyRequest {
    pub target_timeline_id: TimelineId,
    pub until_lsn: Lsn,
}

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TimelineTermBumpRequest {
    /// bump to
    pub term: Option<u64>,
}

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TimelineTermBumpResponse {
    // before the request
    pub previous_term: u64,
    pub current_term: u64,
}
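
Taken together, the derives above give the new endpoint a plain JSON wire format: field names follow the struct fields, and `term` is optional (the Python client added later in this diff sends an empty body when no term is given). A minimal sketch, not part of the diff:

    # Sketch of the JSON wire format implied by the serde derives above; illustrative only.
    import json

    request_bump_to_42 = json.dumps({"term": 42})   # TimelineTermBumpRequest with an explicit term
    request_increment = json.dumps({})              # term omitted -> None: safekeeper just increments

    response = json.loads('{"previous_term": 5, "current_term": 42}')  # TimelineTermBumpResponse
    assert response["current_term"] >= response["previous_term"]
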
@@ -1038,9 +1038,12 @@ DetermineEpochStartLsn(WalProposer *wp)
    if (SkipXLogPageHeader(wp, wp->propEpochStartLsn) != wp->api.get_redo_start_lsn(wp))
    {
        /*
         * However, allow to proceed if previously elected leader was me;
         * plain restart of walproposer not intervened by concurrent
         * compute (who could generate WAL) is ok.
         * However, allow to proceed if last_log_term on the node which gave
         * the highest vote (i.e. point where we are going to start writing)
         * actually had been won by me; plain restart of walproposer not
         * intervened by concurrent compute which wrote WAL is ok.
         *
         * This avoids compute crash after manual term_bump.
         */
        if (!((dth->n_entries >= 1) && (dth->entries[dth->n_entries - 1].term ==
                                        pg_atomic_read_u64(&walprop_shared->mineLastElectedTerm))))
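
The surviving check at the end of the hunk is what the reworded comment describes: a plain restart is tolerated when the last entry of the donor's term history is a term this compute itself won. A hedged restatement with hypothetical names, not taken from the diff:

    # Illustrative restatement of the walproposer check above; names are hypothetical.
    def may_proceed_after_redo_lsn_mismatch(donor_term_history, mine_last_elected_term):
        # A plain walproposer restart (e.g. after a manual term_bump) is fine if the
        # last term in the donor's term history is the one this compute itself won.
        return len(donor_term_history) >= 1 and donor_term_history[-1] == mine_last_elected_term
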
@@ -1442,12 +1445,17 @@ RecvAppendResponses(Safekeeper *sk)
    if (sk->appendResponse.term > wp->propTerm)
    {
        /*
         * Another compute with higher term is running. Panic to restart
         * PG as we likely need to retake basebackup. However, don't dump
         * core as this is kinda expected scenario.
         *
         * Term has changed to higher one, probably another compute is
         * running. If this is the case we could PANIC as well because
         * likely it inserted some data and our basebackup is unsuitable
         * anymore. However, we also bump term manually (term_bump endpoint)
         * on safekeepers for migration purposes, in this case we do want
         * compute to stay alive. So restart walproposer with FATAL instead
         * of panicking; if basebackup is spoiled next election will notice
         * this.
         */
        disable_core_dump();
        wp_log(PANIC, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT ", meaning another compute is running at the same time, and it conflicts with us",
        wp_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT ", meaning another compute is running at the same time, and it conflicts with us",
               sk->host, sk->port,
               sk->appendResponse.term, wp->propTerm);
    }
@@ -1,6 +1,9 @@
use utils::auth::{AuthError, Claims, Scope};
use utils::id::TenantId;

/// If tenant_id is provided, allow if token (claims) is for this tenant or
/// whole safekeeper scope (SafekeeperData). Else, allow only if token is
/// SafekeeperData.
pub fn check_permission(claims: &Claims, tenant_id: Option<TenantId>) -> Result<(), AuthError> {
    match (&claims.scope, tenant_id) {
        (Scope::Tenant, None) => Err(AuthError(
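
The doc comment above summarizes the whole rule; the `match` itself is truncated in this hunk. A small sketch of the intended truth table, assuming only the two scopes the comment mentions; this is an illustration, not the actual implementation:

    # Illustrative sketch of the check_permission rule described above.
    def is_allowed(token_scope: str, token_tenant, request_tenant) -> bool:
        if token_scope == "SafekeeperData":
            return True  # full safekeeper scope is always allowed
        if token_scope == "Tenant":
            # a tenant-scoped token needs a tenant_id on the request, and it must match
            return request_tenant is not None and token_tenant == request_tenant
        return False
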
@@ -18,8 +18,8 @@ use utils::http::endpoint::{prometheus_metrics_handler, request_span, ChannelWri
use utils::http::request::parse_query_param;

use postgres_ffi::WAL_SEGMENT_SIZE;
use safekeeper_api::models::TimelineCreateRequest;
use safekeeper_api::models::{SkTimelineInfo, TimelineCopyRequest};
use safekeeper_api::models::{TimelineCreateRequest, TimelineTermBumpRequest};
use utils::{
    auth::SwappableJwtAuth,
    http::{
@@ -302,12 +302,11 @@ async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body

/// Force persist control file.
async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permission(&request, None)?;

    let ttid = TenantTimelineId::new(
        parse_request_param(&request, "tenant_id")?,
        parse_request_param(&request, "timeline_id")?,
    );
    check_permission(&request, Some(ttid.tenant_id))?;

    let tli = GlobalTimelines::get(ttid)?;
    tli.write_shared_state()
@@ -320,6 +319,28 @@ async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<
    json_response(StatusCode::OK, ())
}

/// Make term at least as high as one in request. If one in request is None,
/// increment current one.
async fn timeline_term_bump_handler(
    mut request: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    let ttid = TenantTimelineId::new(
        parse_request_param(&request, "tenant_id")?,
        parse_request_param(&request, "timeline_id")?,
    );
    check_permission(&request, Some(ttid.tenant_id))?;

    let request_data: TimelineTermBumpRequest = json_request(&mut request).await?;

    let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
    let response = tli
        .term_bump(request_data.term)
        .await
        .map_err(ApiError::InternalServerError)?;

    json_response(StatusCode::OK, response)
}

/// Deactivates the timeline and removes its data directory.
async fn timeline_delete_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
    let ttid = TenantTimelineId::new(
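
Together with the `/term_bump` route registered further down in `make_router`, this handler is callable over plain HTTP. A usage sketch against a local safekeeper; the port and IDs are placeholders, not taken from the diff:

    # Illustrative call to the new endpoint; port and IDs are placeholders.
    import requests

    safekeeper = "http://localhost:7676"
    tenant_id = "<tenant id hex>"
    timeline_id = "<timeline id hex>"

    # Bump the term to at least 42; send {} to just increment the current term.
    resp = requests.post(
        f"{safekeeper}/v1/tenant/{tenant_id}/timeline/{timeline_id}/term_bump",
        json={"term": 42},
    )
    resp.raise_for_status()
    print(resp.json())  # e.g. {"previous_term": 5, "current_term": 42}
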
@@ -558,6 +579,9 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
                failpoints_handler(r, cancel).await
            })
        })
        .delete("/v1/tenant/:tenant_id", |r| {
            request_span(r, tenant_delete_handler)
        })
        // Will be used in the future instead of implicit timeline creation
        .post("/v1/tenant/timeline", |r| {
            request_span(r, timeline_create_handler)
@@ -568,20 +592,10 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
        .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
            request_span(r, timeline_delete_handler)
        })
        .delete("/v1/tenant/:tenant_id", |r| {
            request_span(r, tenant_delete_handler)
        })
        .get(
            "/v1/tenant/:tenant_id/timeline/:timeline_id/snapshot/:destination_id",
            |r| request_span(r, timeline_snapshot_handler),
        )
        .post("/v1/pull_timeline", |r| {
            request_span(r, timeline_pull_handler)
        })
        .post(
            "/v1/tenant/:tenant_id/timeline/:source_timeline_id/copy",
            |r| request_span(r, timeline_copy_handler),
        )
        .patch(
            "/v1/tenant/:tenant_id/timeline/:timeline_id/control_file",
            |r| request_span(r, patch_control_file_handler),
@@ -590,6 +604,17 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
            "/v1/tenant/:tenant_id/timeline/:timeline_id/checkpoint",
            |r| request_span(r, timeline_checkpoint_handler),
        )
        .post(
            "/v1/tenant/:tenant_id/timeline/:timeline_id/term_bump",
            |r| request_span(r, timeline_term_bump_handler),
        )
        .post("/v1/pull_timeline", |r| {
            request_span(r, timeline_pull_handler)
        })
        .post(
            "/v1/tenant/:tenant_id/timeline/:source_timeline_id/copy",
            |r| request_span(r, timeline_copy_handler),
        )
        // for tests
        .post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
            request_span(r, record_safekeeper_info)
@@ -1,9 +1,10 @@
//! Defines per timeline data stored persistently (SafeKeeperPersistentState)
//! and its wrapper with in memory layer (SafekeeperState).

use std::ops::Deref;
use std::{cmp::max, ops::Deref};

use anyhow::Result;
use safekeeper_api::models::TimelineTermBumpResponse;
use serde::{Deserialize, Serialize};
use utils::{
    id::{NodeId, TenantId, TenantTimelineId, TimelineId},
@@ -12,7 +13,7 @@ use utils::{

use crate::{
    control_file,
    safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, TermHistory},
    safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, Term, TermHistory},
    wal_backup_partial::{self},
};
@@ -209,6 +210,27 @@ where
        let s = self.start_change();
        self.finish_change(&s).await
    }

    /// Make term at least as `to`. If `to` is None, increment current one. This
    /// is not in safekeeper.rs because we want to be able to do it even if
    /// timeline is offloaded.
    pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
        let before = self.acceptor_state.term;
        let mut state = self.start_change();
        let new = match to {
            Some(to) => max(state.acceptor_state.term, to),
            None => state.acceptor_state.term + 1,
        };
        if new > state.acceptor_state.term {
            state.acceptor_state.term = new;
            self.finish_change(&state).await?;
        }
        let after = self.acceptor_state.term;
        Ok(TimelineTermBumpResponse {
            previous_term: before,
            current_term: after,
        })
    }
}

impl<CTRL> Deref for TimelineState<CTRL>
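
The bump rule above is compact but easy to misread, so here is a hedged restatement as a pure function with the three interesting cases; it is an illustration only, not the actual implementation (which also skips the control-file write when nothing changes):

    # Illustrative restatement of TimelineState::term_bump's rule.
    def bumped_term(current: int, to) -> int:
        """Term after a bump: at least `to` if given, else current + 1; never lower."""
        return max(current, to) if to is not None else current + 1

    assert bumped_term(5, 42) == 42   # bump up to the requested term
    assert bumped_term(5, 2) == 5     # a lower request is a no-op
    assert bumped_term(5, None) == 6  # no term given: increment
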
@@ -3,6 +3,7 @@

use anyhow::{anyhow, bail, Result};
use camino::Utf8PathBuf;
use safekeeper_api::models::TimelineTermBumpResponse;
use serde::{Deserialize, Serialize};
use tokio::fs::{self};
use tokio_util::sync::CancellationToken;
@@ -213,6 +214,10 @@ impl StateSK {
            .get_last_log_term(self.flush_lsn())
    }

    pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
        self.state_mut().term_bump(to).await
    }

    /// Close open WAL files to release FDs.
    fn close_wal_store(&mut self) {
        if let StateSK::Loaded(sk) = self {
@@ -847,6 +852,11 @@ impl Timeline {
        Ok(res)
    }

    pub async fn term_bump(self: &Arc<Self>, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
        let mut state = self.write_shared_state().await;
        state.sk.term_bump(to).await
    }

    /// Get the timeline guard for reading/writing WAL files.
    /// If WAL files are not present on disk (evicted), they will be automatically
    /// downloaded from remote storage. This is done in the manager task, which is
@@ -50,6 +50,19 @@ class SafekeeperMetrics(Metrics):
        ).value


@dataclass
class TermBumpResponse:
    previous_term: int
    current_term: int

    @classmethod
    def from_json(cls, d: Dict[str, Any]) -> "TermBumpResponse":
        return TermBumpResponse(
            previous_term=d["previous_term"],
            current_term=d["current_term"],
        )


class SafekeeperHttpClient(requests.Session, MetricsGetter):
    HTTPError = requests.HTTPError
@@ -115,6 +128,22 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
        assert isinstance(res_json, dict)
        return res_json

    def term_bump(
        self,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        term: Optional[int],
    ) -> TermBumpResponse:
        body = {}
        if term is not None:
            body["term"] = term
        res = self.post(
            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/term_bump",
            json=body,
        )
        res.raise_for_status()
        return TermBumpResponse.from_json(res.json())

    def pull_timeline(self, body: Dict[str, Any]) -> Dict[str, Any]:
        res = self.post(f"http://localhost:{self.port}/v1/pull_timeline", json=body)
        res.raise_for_status()
@@ -2155,6 +2155,43 @@ def test_patch_control_file(neon_env_builder: NeonEnvBuilder):
    assert res["timelines"][0]["control_file"]["timeline_start_lsn"] == "0/1"


def test_term_bump(neon_env_builder: NeonEnvBuilder):
    neon_env_builder.num_safekeepers = 1
    env = neon_env_builder.init_start()

    tenant_id = env.initial_tenant
    timeline_id = env.initial_timeline

    endpoint = env.endpoints.create_start("main")
    # initialize safekeeper
    endpoint.safe_psql("create table t(key int, value text)")

    http_cli = env.safekeepers[0].http_client()

    # check that bump up to specific term works
    curr_term = http_cli.timeline_status(tenant_id, timeline_id).term
    bump_to = curr_term + 3
    res = http_cli.term_bump(tenant_id, timeline_id, bump_to)
    log.info(f"bump to {bump_to} res: {res}")
    assert res.current_term >= bump_to

    # check that bump to none increments current term
    res = http_cli.term_bump(tenant_id, timeline_id, None)
    log.info(f"bump to None res: {res}")
    assert res.current_term > bump_to
    assert res.current_term > res.previous_term

    # check that bumping doesn't work downward
    res = http_cli.term_bump(tenant_id, timeline_id, 2)
    log.info(f"bump to 2 res: {res}")
    assert res.current_term > bump_to
    assert res.current_term == res.previous_term

    # check that this doesn't kill endpoint because last WAL flush was his and
    # thus its basebackup is still good
    endpoint.safe_psql("insert into t values (1, 'payload')")


# Test disables periodic pushes from safekeeper to the broker and checks that
# pageserver can still discover safekeepers with discovery requests.
def test_broker_discovery(neon_env_builder: NeonEnvBuilder):