mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 22:10:39 +00:00
safekeeper: add membership configuration switch endpoint (#10241)
## Problem https://github.com/neondatabase/neon/issues/9965 ## Summary of changes Add to safekeeper http endpoint to switch membership configuration. Also add it to python client for tests, and add simple test itself.
This commit is contained in:
@@ -23,6 +23,8 @@ pub const INITIAL_GENERATION: Generation = 1;
|
||||
pub struct SafekeeperId {
|
||||
pub id: NodeId,
|
||||
pub host: String,
|
||||
/// We include here only port for computes -- that is, pg protocol tenant
|
||||
/// only port, or wide pg protocol port if the former is not configured.
|
||||
pub pg_port: u16,
|
||||
}
|
||||
|
||||
|
||||
@@ -175,6 +175,7 @@ pub enum WalReceiverStatus {
|
||||
pub struct TimelineStatus {
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
pub mconf: Configuration,
|
||||
pub acceptor_state: AcceptorStateStatus,
|
||||
pub pg_info: ServerInfo,
|
||||
pub flush_lsn: Lsn,
|
||||
@@ -189,6 +190,20 @@ pub struct TimelineStatus {
|
||||
pub walreceivers: Vec<WalReceiverState>,
|
||||
}
|
||||
|
||||
/// Request to switch membership configuration.
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[serde(transparent)]
|
||||
pub struct TimelineMembershipSwitchRequest {
|
||||
pub mconf: Configuration,
|
||||
}
|
||||
|
||||
/// In response both previous and current configuration are sent.
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct TimelineMembershipSwitchResponse {
|
||||
pub previous_conf: Configuration,
|
||||
pub current_conf: Configuration,
|
||||
}
|
||||
|
||||
fn lsn_invalid() -> Lsn {
|
||||
Lsn::INVALID
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use hyper::{Body, Request, Response, StatusCode};
|
||||
use safekeeper_api::models;
|
||||
use safekeeper_api::models::AcceptorStateStatus;
|
||||
use safekeeper_api::models::SafekeeperStatus;
|
||||
use safekeeper_api::models::TermSwitchApiEntry;
|
||||
@@ -183,6 +184,7 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
|
||||
let status = TimelineStatus {
|
||||
tenant_id: ttid.tenant_id,
|
||||
timeline_id: ttid.timeline_id,
|
||||
mconf: state.mconf,
|
||||
acceptor_state: acc_state,
|
||||
pg_info: state.server,
|
||||
flush_lsn,
|
||||
@@ -268,6 +270,28 @@ async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Bo
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
/// Consider switching timeline membership configuration to the provided one.
|
||||
async fn timeline_membership_handler(
|
||||
mut request: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let ttid = TenantTimelineId::new(
|
||||
parse_request_param(&request, "tenant_id")?,
|
||||
parse_request_param(&request, "timeline_id")?,
|
||||
);
|
||||
check_permission(&request, Some(ttid.tenant_id))?;
|
||||
|
||||
let global_timelines = get_global_timelines(&request);
|
||||
let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
|
||||
|
||||
let data: models::TimelineMembershipSwitchRequest = json_request(&mut request).await?;
|
||||
let response = tli
|
||||
.membership_switch(data.mconf)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(StatusCode::OK, response)
|
||||
}
|
||||
|
||||
async fn timeline_copy_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
check_permission(&request, None)?;
|
||||
|
||||
@@ -619,6 +643,10 @@ pub fn make_router(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/snapshot/:destination_id",
|
||||
|r| request_span(r, timeline_snapshot_handler),
|
||||
)
|
||||
.post(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/membership",
|
||||
|r| request_span(r, timeline_membership_handler),
|
||||
)
|
||||
.post(
|
||||
"/v1/tenant/:tenant_id/timeline/:source_timeline_id/copy",
|
||||
|r| request_span(r, timeline_copy_handler),
|
||||
|
||||
@@ -6,9 +6,12 @@ use std::{cmp::max, ops::Deref, time::SystemTime};
|
||||
use anyhow::{bail, Result};
|
||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||
use safekeeper_api::{
|
||||
membership::Configuration, models::TimelineTermBumpResponse, ServerInfo, Term, INITIAL_TERM,
|
||||
membership::Configuration,
|
||||
models::{TimelineMembershipSwitchResponse, TimelineTermBumpResponse},
|
||||
ServerInfo, Term, INITIAL_TERM,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tracing::info;
|
||||
use utils::{
|
||||
id::{TenantId, TenantTimelineId, TimelineId},
|
||||
lsn::Lsn,
|
||||
@@ -258,6 +261,31 @@ where
|
||||
current_term: after,
|
||||
})
|
||||
}
|
||||
|
||||
/// Switch into membership configuration `to` if it is higher than the
|
||||
/// current one.
|
||||
pub async fn membership_switch(
|
||||
&mut self,
|
||||
to: Configuration,
|
||||
) -> Result<TimelineMembershipSwitchResponse> {
|
||||
let before = self.mconf.clone();
|
||||
// Is switch allowed?
|
||||
if to.generation <= self.mconf.generation {
|
||||
info!(
|
||||
"ignoring request to switch membership conf to lower {}, current conf {}",
|
||||
to, self.mconf
|
||||
);
|
||||
} else {
|
||||
let mut state = self.start_change();
|
||||
state.mconf = to.clone();
|
||||
self.finish_change(&state).await?;
|
||||
info!("switched membership conf to {} from {}", to, before);
|
||||
}
|
||||
Ok(TimelineMembershipSwitchResponse {
|
||||
previous_conf: before,
|
||||
current_conf: self.mconf.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<CTRL> Deref for TimelineState<CTRL>
|
||||
|
||||
@@ -4,7 +4,10 @@
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use remote_storage::RemotePath;
|
||||
use safekeeper_api::models::{PeerInfo, TimelineTermBumpResponse};
|
||||
use safekeeper_api::membership::Configuration;
|
||||
use safekeeper_api::models::{
|
||||
PeerInfo, TimelineMembershipSwitchResponse, TimelineTermBumpResponse,
|
||||
};
|
||||
use safekeeper_api::Term;
|
||||
use tokio::fs::{self};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -188,6 +191,13 @@ impl StateSK {
|
||||
self.state_mut().term_bump(to).await
|
||||
}
|
||||
|
||||
pub async fn membership_switch(
|
||||
&mut self,
|
||||
to: Configuration,
|
||||
) -> Result<TimelineMembershipSwitchResponse> {
|
||||
self.state_mut().membership_switch(to).await
|
||||
}
|
||||
|
||||
/// Close open WAL files to release FDs.
|
||||
fn close_wal_store(&mut self) {
|
||||
if let StateSK::Loaded(sk) = self {
|
||||
@@ -768,6 +778,14 @@ impl Timeline {
|
||||
state.sk.term_bump(to).await
|
||||
}
|
||||
|
||||
pub async fn membership_switch(
|
||||
self: &Arc<Self>,
|
||||
to: Configuration,
|
||||
) -> Result<TimelineMembershipSwitchResponse> {
|
||||
let mut state = self.write_shared_state().await;
|
||||
state.sk.membership_switch(to).await
|
||||
}
|
||||
|
||||
/// Guts of [`Self::wal_residence_guard`] and [`Self::try_wal_residence_guard`]
|
||||
async fn do_wal_residence_guard(
|
||||
self: &Arc<Self>,
|
||||
|
||||
@@ -25,6 +25,7 @@ class Walreceiver:
|
||||
|
||||
@dataclass
|
||||
class SafekeeperTimelineStatus:
|
||||
mconf: Configuration | None
|
||||
term: int
|
||||
last_log_term: int
|
||||
pg_version: int # Not exactly a PgVersion, safekeeper returns version as int, for example 150002 for 15.2
|
||||
@@ -73,7 +74,7 @@ class TermBumpResponse:
|
||||
class SafekeeperId:
|
||||
id: int
|
||||
host: str
|
||||
pg_port: str
|
||||
pg_port: int
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -82,6 +83,16 @@ class Configuration:
|
||||
members: list[SafekeeperId]
|
||||
new_members: list[SafekeeperId] | None
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, d: dict[str, Any]) -> Configuration:
|
||||
generation = d["generation"]
|
||||
members = d["members"]
|
||||
new_members = d.get("new_members")
|
||||
return Configuration(generation, members, new_members)
|
||||
|
||||
def to_json(self) -> str:
|
||||
return json.dumps(self, cls=EnhancedJSONEncoder)
|
||||
|
||||
|
||||
@dataclass
|
||||
class TimelineCreateRequest:
|
||||
@@ -97,6 +108,18 @@ class TimelineCreateRequest:
|
||||
return json.dumps(self, cls=EnhancedJSONEncoder)
|
||||
|
||||
|
||||
@dataclass
|
||||
class TimelineMembershipSwitchResponse:
|
||||
previous_conf: Configuration
|
||||
current_conf: Configuration
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, d: dict[str, Any]) -> TimelineMembershipSwitchResponse:
|
||||
previous_conf = Configuration.from_json(d["previous_conf"])
|
||||
current_conf = Configuration.from_json(d["current_conf"])
|
||||
return TimelineMembershipSwitchResponse(previous_conf, current_conf)
|
||||
|
||||
|
||||
class SafekeeperHttpClient(requests.Session, MetricsGetter):
|
||||
HTTPError = requests.HTTPError
|
||||
|
||||
@@ -170,7 +193,10 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
|
||||
res.raise_for_status()
|
||||
resj = res.json()
|
||||
walreceivers = [Walreceiver(wr["conn_id"], wr["status"]) for wr in resj["walreceivers"]]
|
||||
# It is always normally not None, it is allowed only to make forward compat tests happy.
|
||||
mconf = Configuration.from_json(resj["mconf"]) if "mconf" in resj else None
|
||||
return SafekeeperTimelineStatus(
|
||||
mconf=mconf,
|
||||
term=resj["acceptor_state"]["term"],
|
||||
last_log_term=resj["acceptor_state"]["epoch"],
|
||||
pg_version=resj["pg_info"]["pg_version"],
|
||||
@@ -196,6 +222,11 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
|
||||
def get_commit_lsn(self, tenant_id: TenantId, timeline_id: TimelineId) -> Lsn:
|
||||
return self.timeline_status(tenant_id, timeline_id).commit_lsn
|
||||
|
||||
# Get timeline membership configuration.
|
||||
def get_membership(self, tenant_id: TenantId, timeline_id: TimelineId) -> Configuration:
|
||||
# make mypy happy
|
||||
return self.timeline_status(tenant_id, timeline_id).mconf # type: ignore
|
||||
|
||||
# only_local doesn't remove segments in the remote storage.
|
||||
def timeline_delete(
|
||||
self, tenant_id: TenantId, timeline_id: TimelineId, only_local: bool = False
|
||||
@@ -242,6 +273,16 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
|
||||
assert isinstance(res_json, dict)
|
||||
return res_json
|
||||
|
||||
def membership_switch(
|
||||
self, tenant_id: TenantId, timeline_id: TimelineId, to: Configuration
|
||||
) -> TimelineMembershipSwitchResponse:
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/membership",
|
||||
data=to.to_json(),
|
||||
)
|
||||
res.raise_for_status()
|
||||
return TimelineMembershipSwitchResponse.from_json(res.json())
|
||||
|
||||
def copy_timeline(self, tenant_id: TenantId, timeline_id: TimelineId, body: dict[str, Any]):
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/copy",
|
||||
|
||||
@@ -48,7 +48,12 @@ from fixtures.remote_storage import (
|
||||
default_remote_storage,
|
||||
s3_storage,
|
||||
)
|
||||
from fixtures.safekeeper.http import Configuration, SafekeeperHttpClient, TimelineCreateRequest
|
||||
from fixtures.safekeeper.http import (
|
||||
Configuration,
|
||||
SafekeeperHttpClient,
|
||||
SafekeeperId,
|
||||
TimelineCreateRequest,
|
||||
)
|
||||
from fixtures.safekeeper.utils import wait_walreceivers_absent
|
||||
from fixtures.utils import (
|
||||
PropagatingThread,
|
||||
@@ -2243,6 +2248,63 @@ def test_pull_timeline_while_evicted(neon_env_builder: NeonEnvBuilder):
|
||||
wait_until(unevicted_on_dest, interval=0.1, timeout=1.0)
|
||||
|
||||
|
||||
# Basic test for http API membership related calls: create timeline and switch
|
||||
# configuration. Normally these are called by storage controller, but this
|
||||
# allows to test them separately.
|
||||
@run_only_on_default_postgres("tests only safekeeper API")
|
||||
def test_membership_api(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.num_safekeepers = 1
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
sk = env.safekeepers[0]
|
||||
http_cli = sk.http_client()
|
||||
|
||||
sk_id_1 = SafekeeperId(env.safekeepers[0].id, "localhost", sk.port.pg_tenant_only)
|
||||
sk_id_2 = SafekeeperId(11, "localhost", 5434) # just a mock
|
||||
|
||||
# Request to switch before timeline creation should fail.
|
||||
init_conf = Configuration(generation=1, members=[sk_id_1], new_members=None)
|
||||
with pytest.raises(requests.exceptions.HTTPError):
|
||||
http_cli.membership_switch(tenant_id, timeline_id, init_conf)
|
||||
|
||||
# Create timeline.
|
||||
create_r = TimelineCreateRequest(
|
||||
tenant_id, timeline_id, init_conf, 150002, Lsn("0/1000000"), commit_lsn=None
|
||||
)
|
||||
log.info(f"sending {create_r.to_json()}")
|
||||
http_cli.timeline_create(create_r)
|
||||
|
||||
# Switch into some conf.
|
||||
joint_conf = Configuration(generation=4, members=[sk_id_1], new_members=[sk_id_2])
|
||||
resp = http_cli.membership_switch(tenant_id, timeline_id, joint_conf)
|
||||
log.info(f"joint switch resp: {resp}")
|
||||
assert resp.previous_conf.generation == 1
|
||||
assert resp.current_conf.generation == 4
|
||||
|
||||
# Restart sk, conf should be preserved.
|
||||
sk.stop().start()
|
||||
after_restart = http_cli.get_membership(tenant_id, timeline_id)
|
||||
log.info(f"conf after restart: {after_restart}")
|
||||
assert after_restart.generation == 4
|
||||
|
||||
# Switch into disjoint conf.
|
||||
non_joint = Configuration(generation=5, members=[sk_id_2], new_members=None)
|
||||
resp = http_cli.membership_switch(tenant_id, timeline_id, non_joint)
|
||||
log.info(f"non joint switch resp: {resp}")
|
||||
assert resp.previous_conf.generation == 4
|
||||
assert resp.current_conf.generation == 5
|
||||
|
||||
# Switch request to lower conf should be ignored.
|
||||
lower_conf = Configuration(generation=3, members=[], new_members=None)
|
||||
resp = http_cli.membership_switch(tenant_id, timeline_id, lower_conf)
|
||||
log.info(f"lower switch resp: {resp}")
|
||||
assert resp.previous_conf.generation == 5
|
||||
assert resp.current_conf.generation == 5
|
||||
|
||||
|
||||
# In this test we check for excessive START_REPLICATION and START_WAL_PUSH queries
|
||||
# when compute is active, but there are no writes to the timeline. In that case
|
||||
# pageserver should maintain a single connection to safekeeper and don't attempt
|
||||
|
||||
Reference in New Issue
Block a user