storcon: implement safekeeper_migrate handler (#11849)
This PR implements the safekeeper migration algorithm from RFC-035:
https://github.com/neondatabase/neon/blob/main/docs/rfcs/035-safekeeper-dynamic-membership-change.md#change-algorithm

Closes: https://github.com/neondatabase/neon/issues/11823

It is not production-ready yet, but I think it's good enough to commit and start testing. There are some known issues which will be addressed in later PRs:
- https://github.com/neondatabase/neon/issues/12186
- https://github.com/neondatabase/neon/issues/12187
- https://github.com/neondatabase/neon/issues/12188
- https://github.com/neondatabase/neon/issues/12189
- https://github.com/neondatabase/neon/issues/12190
- https://github.com/neondatabase/neon/issues/12191
- https://github.com/neondatabase/neon/issues/12192

## Summary of changes
- Implement the `tenant_timeline_safekeeper_migrate` handler to drive the migration
- Add the ability to specify the number of safekeepers per timeline in tests (`timeline_safekeeper_count`)
- Add `term` and `flush_lsn` to `TimelineMembershipSwitchResponse`
- Implement a compare-and-swap (CAS) operation over the timeline row in the database for updating the membership configuration safely
- Write a simple test to verify that the migration code works
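For orientation, the handler follows the RFC-035 change algorithm: persist a joint configuration with a CAS on the generation, deliver it to the current safekeeper set, pull the timeline onto the new safekeepers, wait for a quorum of the new set to catch up to the position observed on the current set, then persist and deliver the final configuration and exclude the safekeepers that dropped out. Two small invariants carry most of the weight: a quorum is a strict majority of a member set, and a safekeeper position is a `(term, flush_lsn)` pair compared lexicographically. Below is a minimal, self-contained sketch of just those two pieces; plain `u64`s stand in for the `Term` and `Lsn` types, and the function names are illustrative rather than taken from the PR.

```rust
// Quorum and position helpers as used conceptually by the migration
// (illustrative sketch, not code from this PR).

/// A quorum is a strict majority of the safekeeper member set.
fn quorum_size(n_safekeepers: usize) -> usize {
    n_safekeepers / 2 + 1 // e.g. 3 -> 2, 5 -> 3
}

/// The position the new set must reach before the final switch: the maximum
/// (term, flush_lsn) reported by the current set. Tuple comparison is
/// lexicographic, so the term is compared first and flush_lsn breaks ties.
fn sync_position(reported: &[(u64, u64)]) -> (u64, u64) {
    reported.iter().copied().max().unwrap_or((0, 0))
}

fn main() {
    let reported: [(u64, u64); 3] = [(5, 0x1000), (5, 0x2000), (4, 0x9000)];
    assert_eq!(quorum_size(reported.len()), 2);
    assert_eq!(sync_position(&reported), (5, 0x2000)); // higher term wins over higher LSN
    println!("quorum = {}, sync position = {:?}", quorum_size(3), sync_position(&reported));
}
```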
@@ -546,6 +546,11 @@ pub struct TimelineImportRequest {
|
||||
pub sk_set: Vec<NodeId>,
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize, serde::Deserialize, Clone)]
|
||||
pub struct TimelineSafekeeperMigrateRequest {
|
||||
pub new_sk_set: Vec<NodeId>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use serde_json;
|
||||
|
||||
@@ -210,7 +210,7 @@ pub struct TimelineStatus {
|
||||
}
|
||||
|
||||
/// Request to switch membership configuration.
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
#[serde(transparent)]
|
||||
pub struct TimelineMembershipSwitchRequest {
|
||||
pub mconf: Configuration,
|
||||
@@ -221,6 +221,8 @@ pub struct TimelineMembershipSwitchRequest {
|
||||
pub struct TimelineMembershipSwitchResponse {
|
||||
pub previous_conf: Configuration,
|
||||
pub current_conf: Configuration,
|
||||
pub term: Term,
|
||||
pub flush_lsn: Lsn,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Serialize, Deserialize)]
|
||||
|
||||
@@ -9,7 +9,7 @@ use anyhow::{Result, bail};
|
||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||
use postgres_versioninfo::{PgMajorVersion, PgVersionId};
|
||||
use safekeeper_api::membership::Configuration;
|
||||
use safekeeper_api::models::{TimelineMembershipSwitchResponse, TimelineTermBumpResponse};
|
||||
use safekeeper_api::models::TimelineTermBumpResponse;
|
||||
use safekeeper_api::{INITIAL_TERM, ServerInfo, Term};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tracing::info;
|
||||
@@ -83,6 +83,11 @@ pub enum EvictionState {
|
||||
Offloaded(Lsn),
|
||||
}
|
||||
|
||||
pub struct MembershipSwitchResult {
|
||||
pub previous_conf: Configuration,
|
||||
pub current_conf: Configuration,
|
||||
}
|
||||
|
||||
impl TimelinePersistentState {
|
||||
/// commit_lsn is the same as start_lsn in the normal creation; see
/// `TimelineCreateRequest` comments.
|
||||
@@ -261,10 +266,7 @@ where
|
||||
|
||||
/// Switch into membership configuration `to` if it is higher than the
|
||||
/// current one.
|
||||
pub async fn membership_switch(
|
||||
&mut self,
|
||||
to: Configuration,
|
||||
) -> Result<TimelineMembershipSwitchResponse> {
|
||||
pub async fn membership_switch(&mut self, to: Configuration) -> Result<MembershipSwitchResult> {
|
||||
let before = self.mconf.clone();
|
||||
// Is switch allowed?
|
||||
if to.generation <= self.mconf.generation {
|
||||
@@ -278,7 +280,7 @@ where
|
||||
self.finish_change(&state).await?;
|
||||
info!("switched membership conf to {} from {}", to, before);
|
||||
}
|
||||
Ok(TimelineMembershipSwitchResponse {
|
||||
Ok(MembershipSwitchResult {
|
||||
previous_conf: before,
|
||||
current_conf: self.mconf.clone(),
|
||||
})
|
||||
|
||||
@@ -190,7 +190,14 @@ impl StateSK {
|
||||
&mut self,
|
||||
to: Configuration,
|
||||
) -> Result<TimelineMembershipSwitchResponse> {
|
||||
self.state_mut().membership_switch(to).await
|
||||
let result = self.state_mut().membership_switch(to).await?;
|
||||
|
||||
Ok(TimelineMembershipSwitchResponse {
|
||||
previous_conf: result.previous_conf,
|
||||
current_conf: result.current_conf,
|
||||
term: self.state().acceptor_state.term,
|
||||
flush_lsn: self.flush_lsn(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Close open WAL files to release FDs.
|
||||
|
||||
@@ -154,8 +154,8 @@ pub struct PhysicalStorage {
|
||||
/// record
|
||||
///
|
||||
/// Partial segment 002 has no WAL records, and it will be removed by the
|
||||
/// next truncate_wal(). This flag will be set to true after the first
|
||||
/// truncate_wal() call.
|
||||
/// next truncate_wal(). This flag will be set to false after the first
|
||||
/// successful truncate_wal() call.
|
||||
///
|
||||
/// [`write_lsn`]: Self::write_lsn
|
||||
pending_wal_truncation: bool,
|
||||
@@ -202,6 +202,8 @@ impl PhysicalStorage {
|
||||
ttid.timeline_id, flush_lsn, state.commit_lsn, state.peer_horizon_lsn,
|
||||
);
|
||||
if flush_lsn < state.commit_lsn {
|
||||
// note: can never happen. find_end_of_wal returns provided start_lsn
|
||||
// (state.commit_lsn in our case) if it doesn't find anything.
|
||||
bail!(
|
||||
"timeline {} potential data loss: flush_lsn {} by find_end_of_wal is less than commit_lsn {} from control file",
|
||||
ttid.timeline_id,
|
||||
|
||||
@@ -22,7 +22,7 @@ use pageserver_api::controller_api::{
|
||||
MetadataHealthListUnhealthyResponse, MetadataHealthUpdateRequest, MetadataHealthUpdateResponse,
|
||||
NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, SafekeeperSchedulingPolicyRequest,
|
||||
ShardsPreferredAzsRequest, TenantCreateRequest, TenantPolicyRequest, TenantShardMigrateRequest,
|
||||
TimelineImportRequest,
|
||||
TimelineImportRequest, TimelineSafekeeperMigrateRequest,
|
||||
};
|
||||
use pageserver_api::models::{
|
||||
DetachBehavior, LsnLeaseRequest, TenantConfigPatchRequest, TenantConfigRequest,
|
||||
@@ -34,6 +34,7 @@ use pageserver_api::upcall_api::{
|
||||
PutTimelineImportStatusRequest, ReAttachRequest, TimelineImportStatusRequest, ValidateRequest,
|
||||
};
|
||||
use pageserver_client::{BlockUnblock, mgmt_api};
|
||||
|
||||
use routerify::Middleware;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::warn;
|
||||
@@ -635,6 +636,32 @@ async fn handle_tenant_timeline_download_heatmap_layers(
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn handle_tenant_timeline_safekeeper_migrate(
|
||||
service: Arc<Service>,
|
||||
req: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
|
||||
check_permissions(&req, Scope::PageServerApi)?;
|
||||
maybe_rate_limit(&req, tenant_id).await;
|
||||
|
||||
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
|
||||
|
||||
let mut req = match maybe_forward(req).await {
|
||||
ForwardOutcome::Forwarded(res) => {
|
||||
return res;
|
||||
}
|
||||
ForwardOutcome::NotForwarded(req) => req,
|
||||
};
|
||||
|
||||
let migrate_req = json_request::<TimelineSafekeeperMigrateRequest>(&mut req).await?;
|
||||
|
||||
service
|
||||
.tenant_timeline_safekeeper_migrate(tenant_id, timeline_id, migrate_req)
|
||||
.await?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn handle_tenant_timeline_lsn_lease(
|
||||
service: Arc<Service>,
|
||||
req: Request<Body>,
|
||||
@@ -2458,6 +2485,16 @@ pub fn make_router(
|
||||
)
|
||||
},
|
||||
)
|
||||
.post(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/safekeeper_migrate",
|
||||
|r| {
|
||||
tenant_service_handler(
|
||||
r,
|
||||
handle_tenant_timeline_safekeeper_migrate,
|
||||
RequestName("v1_tenant_timeline_safekeeper_migrate"),
|
||||
)
|
||||
},
|
||||
)
|
||||
// LSN lease passthrough to all shards
|
||||
.post(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id/lsn_lease",
|
||||
|
||||
@@ -333,6 +333,7 @@ pub(crate) enum DatabaseErrorLabel {
|
||||
ConnectionPool,
|
||||
Logical,
|
||||
Migration,
|
||||
Cas,
|
||||
}
|
||||
|
||||
impl DatabaseError {
|
||||
@@ -343,6 +344,7 @@ impl DatabaseError {
|
||||
Self::ConnectionPool(_) => DatabaseErrorLabel::ConnectionPool,
|
||||
Self::Logical(_) => DatabaseErrorLabel::Logical,
|
||||
Self::Migration(_) => DatabaseErrorLabel::Migration,
|
||||
Self::Cas(_) => DatabaseErrorLabel::Cas,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,6 +29,7 @@ use pageserver_api::shard::{
|
||||
use rustls::client::WebPkiServerVerifier;
|
||||
use rustls::client::danger::{ServerCertVerified, ServerCertVerifier};
|
||||
use rustls::crypto::ring;
|
||||
use safekeeper_api::membership::SafekeeperGeneration;
|
||||
use scoped_futures::ScopedBoxFuture;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::generation::Generation;
|
||||
@@ -94,6 +95,8 @@ pub(crate) enum DatabaseError {
|
||||
Logical(String),
|
||||
#[error("Migration error: {0}")]
|
||||
Migration(String),
|
||||
#[error("CAS error: {0}")]
|
||||
Cas(String),
|
||||
}
|
||||
|
||||
#[derive(measured::FixedCardinalityLabel, Copy, Clone)]
|
||||
@@ -126,6 +129,7 @@ pub(crate) enum DatabaseOperation {
|
||||
UpdateLeader,
|
||||
SetPreferredAzs,
|
||||
InsertTimeline,
|
||||
UpdateTimelineMembership,
|
||||
GetTimeline,
|
||||
InsertTimelineReconcile,
|
||||
RemoveTimelineReconcile,
|
||||
@@ -1410,6 +1414,56 @@ impl Persistence {
|
||||
.await
|
||||
}
|
||||
|
||||
/// Update timeline membership configuration in the database.
|
||||
/// Perform a compare-and-swap (CAS) operation on the timeline's generation.
|
||||
/// The `new_generation` must be the next (+1) generation after the one in the database.
|
||||
pub(crate) async fn update_timeline_membership(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
new_generation: SafekeeperGeneration,
|
||||
sk_set: &[NodeId],
|
||||
new_sk_set: Option<&[NodeId]>,
|
||||
) -> DatabaseResult<()> {
|
||||
use crate::schema::timelines::dsl;
|
||||
|
||||
let prev_generation = new_generation.previous().unwrap();
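// The UPDATE below is the compare-and-swap: it only matches the row whose
// generation still equals prev_generation, so a concurrent migration that has
// already advanced the generation leaves it matching 0 rows, which is reported
// as DatabaseError::Cas (mapped to ApiError::Conflict / HTTP 409).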
|
||||
|
||||
let tenant_id = &tenant_id;
|
||||
let timeline_id = &timeline_id;
|
||||
self.with_measured_conn(DatabaseOperation::UpdateTimelineMembership, move |conn| {
|
||||
Box::pin(async move {
|
||||
let updated = diesel::update(dsl::timelines)
|
||||
.filter(dsl::tenant_id.eq(&tenant_id.to_string()))
|
||||
.filter(dsl::timeline_id.eq(&timeline_id.to_string()))
|
||||
.filter(dsl::generation.eq(prev_generation.into_inner() as i32))
|
||||
.set((
|
||||
dsl::generation.eq(new_generation.into_inner() as i32),
|
||||
dsl::sk_set.eq(sk_set.iter().map(|id| id.0 as i64).collect::<Vec<_>>()),
|
||||
dsl::new_sk_set.eq(new_sk_set
|
||||
.map(|set| set.iter().map(|id| id.0 as i64).collect::<Vec<_>>())),
|
||||
))
|
||||
.execute(conn)
|
||||
.await?;
|
||||
|
||||
match updated {
|
||||
0 => {
|
||||
// TODO(diko): It makes sense to select the current generation
|
||||
// and include it in the error message for better debuggability.
|
||||
Err(DatabaseError::Cas(
|
||||
"Failed to update membership configuration".to_string(),
|
||||
))
|
||||
}
|
||||
1 => Ok(()),
|
||||
_ => Err(DatabaseError::Logical(format!(
|
||||
"unexpected number of rows ({updated})"
|
||||
))),
|
||||
}
|
||||
})
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Load timeline from db. Returns `None` if not present.
|
||||
pub(crate) async fn get_timeline(
|
||||
&self,
|
||||
|
||||
@@ -2,6 +2,7 @@ use std::time::Duration;
|
||||
|
||||
use pageserver_api::controller_api::{SafekeeperDescribeResponse, SkSchedulingPolicy};
|
||||
use reqwest::StatusCode;
|
||||
use safekeeper_api::membership::SafekeeperId;
|
||||
use safekeeper_client::mgmt_api;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::backoff;
|
||||
@@ -92,6 +93,13 @@ impl Safekeeper {
|
||||
pub(crate) fn has_https_port(&self) -> bool {
|
||||
self.listen_https_port.is_some()
|
||||
}
|
||||
pub(crate) fn get_safekeeper_id(&self) -> SafekeeperId {
|
||||
SafekeeperId {
|
||||
id: self.id,
|
||||
host: self.skp.host.clone(),
|
||||
pg_port: self.skp.port as u16,
|
||||
}
|
||||
}
|
||||
/// Perform an operation (which is given a [`SafekeeperClient`]) with retries
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) async fn with_client_retries<T, O, F>(
|
||||
|
||||
@@ -56,6 +56,10 @@ impl SafekeeperClient {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn node_id_label(&self) -> &str {
|
||||
&self.node_id_label
|
||||
}
|
||||
|
||||
pub(crate) async fn create_timeline(
|
||||
&self,
|
||||
req: &TimelineCreateRequest,
|
||||
|
||||
@@ -161,6 +161,7 @@ enum TenantOperations {
|
||||
DropDetached,
|
||||
DownloadHeatmapLayers,
|
||||
TimelineLsnLease,
|
||||
TimelineSafekeeperMigrate,
|
||||
}
|
||||
|
||||
#[derive(Clone, strum_macros::Display)]
|
||||
@@ -491,6 +492,7 @@ impl From<DatabaseError> for ApiError {
|
||||
DatabaseError::Logical(reason) | DatabaseError::Migration(reason) => {
|
||||
ApiError::InternalServerError(anyhow::anyhow!(reason))
|
||||
}
|
||||
DatabaseError::Cas(reason) => ApiError::Conflict(reason),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -145,7 +145,7 @@ pub(crate) async fn load_schedule_requests(
|
||||
}
|
||||
let Some(sk) = safekeepers.get(&other_node_id) else {
|
||||
tracing::warn!(
|
||||
"couldnt find safekeeper with pending op id {other_node_id}, not pulling from it"
|
||||
"couldn't find safekeeper with pending op id {other_node_id}, not pulling from it"
|
||||
);
|
||||
return None;
|
||||
};
|
||||
|
||||
@@ -5,20 +5,29 @@ use std::time::Duration;
|
||||
|
||||
use super::safekeeper_reconciler::ScheduleRequest;
|
||||
use crate::heartbeater::SafekeeperState;
|
||||
use crate::id_lock_map::trace_shared_lock;
|
||||
use crate::metrics;
|
||||
use crate::persistence::{
|
||||
DatabaseError, SafekeeperTimelineOpKind, TimelinePendingOpPersistence, TimelinePersistence,
|
||||
};
|
||||
use crate::safekeeper::Safekeeper;
|
||||
use crate::safekeeper_client::SafekeeperClient;
|
||||
use crate::service::TenantOperations;
|
||||
use crate::timeline_import::TimelineImportFinalizeError;
|
||||
use anyhow::Context;
|
||||
use http_utils::error::ApiError;
|
||||
use pageserver_api::controller_api::{
|
||||
SafekeeperDescribeResponse, SkSchedulingPolicy, TimelineImportRequest,
|
||||
TimelineSafekeeperMigrateRequest,
|
||||
};
|
||||
use pageserver_api::models::{SafekeeperInfo, SafekeepersInfo, TimelineInfo};
|
||||
use safekeeper_api::PgVersionId;
|
||||
use safekeeper_api::membership::{MemberSet, SafekeeperGeneration, SafekeeperId};
|
||||
use safekeeper_api::membership::{self, MemberSet, SafekeeperGeneration};
|
||||
use safekeeper_api::models::{
|
||||
PullTimelineRequest, TimelineMembershipSwitchRequest, TimelineMembershipSwitchResponse,
|
||||
};
|
||||
use safekeeper_api::{INITIAL_TERM, Term};
|
||||
use safekeeper_client::mgmt_api;
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::id::{NodeId, TenantId, TimelineId};
|
||||
@@ -35,6 +44,33 @@ pub struct TimelineLocateResponse {
|
||||
}
|
||||
|
||||
impl Service {
|
||||
fn make_member_set(safekeepers: &[Safekeeper]) -> Result<MemberSet, ApiError> {
|
||||
let members = safekeepers
|
||||
.iter()
|
||||
.map(|sk| sk.get_safekeeper_id())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
MemberSet::new(members).map_err(ApiError::InternalServerError)
|
||||
}
|
||||
|
||||
fn get_safekeepers(&self, ids: &[i64]) -> Result<Vec<Safekeeper>, ApiError> {
|
||||
let safekeepers = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
locked.safekeepers.clone()
|
||||
};
|
||||
|
||||
ids.iter()
|
||||
.map(|&id| {
|
||||
let node_id = NodeId(id as u64);
|
||||
safekeepers.get(&node_id).cloned().ok_or_else(|| {
|
||||
ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"safekeeper {node_id} is not registered"
|
||||
))
|
||||
})
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
}
|
||||
|
||||
/// Timeline creation on safekeepers
|
||||
///
|
||||
/// Returns `Ok(left)` if the timeline has been created on a quorum of safekeepers,
|
||||
@@ -47,35 +83,9 @@ impl Service {
|
||||
pg_version: PgVersionId,
|
||||
timeline_persistence: &TimelinePersistence,
|
||||
) -> Result<Vec<NodeId>, ApiError> {
|
||||
// If quorum is reached, return if we are outside of a specified timeout
|
||||
let jwt = self
|
||||
.config
|
||||
.safekeeper_jwt_token
|
||||
.clone()
|
||||
.map(SecretString::from);
|
||||
let mut joinset = JoinSet::new();
|
||||
let safekeepers = self.get_safekeepers(&timeline_persistence.sk_set)?;
|
||||
|
||||
// Prepare membership::Configuration from chosen safekeepers.
|
||||
let safekeepers = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
locked.safekeepers.clone()
|
||||
};
|
||||
|
||||
let mut members = Vec::new();
|
||||
for sk_id in timeline_persistence.sk_set.iter() {
|
||||
let sk_id = NodeId(*sk_id as u64);
|
||||
let Some(safekeeper) = safekeepers.get(&sk_id) else {
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"couldn't find entry for safekeeper with id {sk_id}"
|
||||
)))?;
|
||||
};
|
||||
members.push(SafekeeperId {
|
||||
id: sk_id,
|
||||
host: safekeeper.skp.host.clone(),
|
||||
pg_port: safekeeper.skp.port as u16,
|
||||
});
|
||||
}
|
||||
let mset = MemberSet::new(members).map_err(ApiError::InternalServerError)?;
|
||||
let mset = Self::make_member_set(&safekeepers)?;
|
||||
let mconf = safekeeper_api::membership::Configuration::new(mset);
|
||||
|
||||
let req = safekeeper_api::models::TimelineCreateRequest {
|
||||
@@ -88,79 +98,150 @@ impl Service {
|
||||
timeline_id,
|
||||
wal_seg_size: None,
|
||||
};
|
||||
|
||||
const SK_CREATE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
for sk in timeline_persistence.sk_set.iter() {
|
||||
let sk_id = NodeId(*sk as u64);
|
||||
let safekeepers = safekeepers.clone();
|
||||
|
||||
let results = self
|
||||
.tenant_timeline_safekeeper_op_quorum(
|
||||
&safekeepers,
|
||||
move |client| {
|
||||
let req = req.clone();
|
||||
async move { client.create_timeline(&req).await }
|
||||
},
|
||||
SK_CREATE_TIMELINE_RECONCILE_TIMEOUT,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(results
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.filter_map(|(idx, res)| {
|
||||
if res.is_ok() {
|
||||
None // Success, don't return this safekeeper
|
||||
} else {
|
||||
Some(safekeepers[idx].get_id()) // Failure, return this safekeeper
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>())
|
||||
}
|
||||
|
||||
/// Perform an operation on a list of safekeepers in parallel with retries.
|
||||
///
|
||||
/// Return the results of the operation on each safekeeper in the input order.
|
||||
async fn tenant_timeline_safekeeper_op<T, O, F>(
|
||||
&self,
|
||||
safekeepers: &[Safekeeper],
|
||||
op: O,
|
||||
timeout: Duration,
|
||||
) -> Result<Vec<mgmt_api::Result<T>>, ApiError>
|
||||
where
|
||||
O: FnMut(SafekeeperClient) -> F + Send + 'static,
|
||||
O: Clone,
|
||||
F: std::future::Future<Output = mgmt_api::Result<T>> + Send + 'static,
|
||||
T: Sync + Send + 'static,
|
||||
{
|
||||
let jwt = self
|
||||
.config
|
||||
.safekeeper_jwt_token
|
||||
.clone()
|
||||
.map(SecretString::from);
|
||||
let mut joinset = JoinSet::new();
|
||||
|
||||
for (idx, sk) in safekeepers.iter().enumerate() {
|
||||
let sk = sk.clone();
|
||||
let http_client = self.http_client.clone();
|
||||
let jwt = jwt.clone();
|
||||
let req = req.clone();
|
||||
let op = op.clone();
|
||||
joinset.spawn(async move {
|
||||
// Unwrap is fine as we already would have returned error above
|
||||
let sk_p = safekeepers.get(&sk_id).unwrap();
|
||||
let res = sk_p
|
||||
let res = sk
|
||||
.with_client_retries(
|
||||
|client| {
|
||||
let req = req.clone();
|
||||
async move { client.create_timeline(&req).await }
|
||||
},
|
||||
op,
|
||||
&http_client,
|
||||
&jwt,
|
||||
3,
|
||||
3,
|
||||
SK_CREATE_TIMELINE_RECONCILE_TIMEOUT,
|
||||
// TODO(diko): This is a wrong timeout.
|
||||
// It should be scaled to the retry count.
|
||||
timeout,
|
||||
&CancellationToken::new(),
|
||||
)
|
||||
.await;
|
||||
(sk_id, sk_p.skp.host.clone(), res)
|
||||
(idx, res)
|
||||
});
|
||||
}
|
||||
|
||||
// Initialize results with timeout errors in case we never get a response.
|
||||
let mut results: Vec<mgmt_api::Result<T>> = safekeepers
|
||||
.iter()
|
||||
.map(|_| {
|
||||
Err(mgmt_api::Error::Timeout(
|
||||
"safekeeper operation timed out".to_string(),
|
||||
))
|
||||
})
|
||||
.collect();
|
||||
|
||||
// After we have built the joinset, we now wait for the tasks to complete,
|
||||
// but with a specified timeout to make sure we return swiftly, either with
|
||||
// a failure or success.
|
||||
let reconcile_deadline = tokio::time::Instant::now() + SK_CREATE_TIMELINE_RECONCILE_TIMEOUT;
|
||||
let reconcile_deadline = tokio::time::Instant::now() + timeout;
|
||||
|
||||
// Wait until all tasks finish or timeout is hit, whichever occurs
|
||||
// first.
|
||||
let mut reconcile_results = Vec::new();
|
||||
let mut result_count = 0;
|
||||
loop {
|
||||
if let Ok(res) = tokio::time::timeout_at(reconcile_deadline, joinset.join_next()).await
|
||||
{
|
||||
let Some(res) = res else { break };
|
||||
match res {
|
||||
Ok(res) => {
|
||||
Ok((idx, res)) => {
|
||||
let sk = &safekeepers[idx];
|
||||
tracing::info!(
|
||||
"response from safekeeper id:{} at {}: {:?}",
|
||||
res.0,
|
||||
res.1,
|
||||
res.2
|
||||
sk.get_id(),
|
||||
sk.skp.host,
|
||||
// Only print errors, as there is no Debug trait for T.
|
||||
res.as_ref().map(|_| ()),
|
||||
);
|
||||
reconcile_results.push(res);
|
||||
results[idx] = res;
|
||||
result_count += 1;
|
||||
}
|
||||
Err(join_err) => {
|
||||
tracing::info!("join_err for task in joinset: {join_err}");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
tracing::info!(
|
||||
"timeout for creation call after {} responses",
|
||||
reconcile_results.len()
|
||||
);
|
||||
tracing::info!("timeout for operation call after {result_count} responses",);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Now check now if quorum was reached in reconcile_results.
|
||||
let total_result_count = reconcile_results.len();
|
||||
let remaining = reconcile_results
|
||||
.into_iter()
|
||||
.filter_map(|res| res.2.is_err().then_some(res.0))
|
||||
.collect::<Vec<_>>();
|
||||
tracing::info!(
|
||||
"Got {} non-successful responses from initial creation request of total {total_result_count} responses",
|
||||
remaining.len()
|
||||
);
|
||||
let target_sk_count = timeline_persistence.sk_set.len();
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
/// Perform an operation on a list of safekeepers in parallel with retries,
|
||||
/// and validates that we reach a quorum of successful responses.
|
||||
///
|
||||
/// Return the results of the operation on each safekeeper in the input order.
|
||||
/// It's guaranteed that at least a quorum of the responses are successful.
|
||||
async fn tenant_timeline_safekeeper_op_quorum<T, O, F>(
|
||||
&self,
|
||||
safekeepers: &[Safekeeper],
|
||||
op: O,
|
||||
timeout: Duration,
|
||||
) -> Result<Vec<mgmt_api::Result<T>>, ApiError>
|
||||
where
|
||||
O: FnMut(SafekeeperClient) -> F,
|
||||
O: Clone + Send + 'static,
|
||||
F: std::future::Future<Output = mgmt_api::Result<T>> + Send + 'static,
|
||||
T: Sync + Send + 'static,
|
||||
{
|
||||
let results = self
|
||||
.tenant_timeline_safekeeper_op(safekeepers, op, timeout)
|
||||
.await?;
|
||||
|
||||
// Now check if quorum was reached in results.
|
||||
|
||||
let target_sk_count = safekeepers.len();
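// A quorum is a strict majority of the target set, e.g. 2 of 3 or 3 of 5.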
|
||||
let quorum_size = match target_sk_count {
|
||||
0 => {
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
@@ -179,7 +260,7 @@ impl Service {
|
||||
// in order to schedule work to them
|
||||
tracing::warn!(
|
||||
"couldn't find at least 3 safekeepers for timeline, found: {:?}",
|
||||
timeline_persistence.sk_set
|
||||
target_sk_count
|
||||
);
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"couldn't find at least 3 safekeepers to put timeline to"
|
||||
@@ -188,7 +269,7 @@ impl Service {
|
||||
}
|
||||
_ => target_sk_count / 2 + 1,
|
||||
};
|
||||
let success_count = target_sk_count - remaining.len();
|
||||
let success_count = results.iter().filter(|res| res.is_ok()).count();
|
||||
if success_count < quorum_size {
|
||||
// Failure
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
@@ -196,7 +277,7 @@ impl Service {
|
||||
)));
|
||||
}
|
||||
|
||||
Ok(remaining)
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
/// Create timeline in controller database and on safekeepers.
|
||||
@@ -797,4 +878,435 @@ impl Service {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Call `switch_timeline_membership` on all safekeepers with retries
|
||||
/// until a quorum of successful responses is reached.
|
||||
///
|
||||
/// If min_position is not None, validates that a majority of safekeepers
|
||||
/// reached at least min_position.
|
||||
///
|
||||
/// Return responses from safekeepers in the input order.
|
||||
async fn tenant_timeline_set_membership_quorum(
|
||||
self: &Arc<Self>,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
safekeepers: &[Safekeeper],
|
||||
config: &membership::Configuration,
|
||||
min_position: Option<(Term, Lsn)>,
|
||||
) -> Result<Vec<mgmt_api::Result<TimelineMembershipSwitchResponse>>, ApiError> {
|
||||
let req = TimelineMembershipSwitchRequest {
|
||||
mconf: config.clone(),
|
||||
};
|
||||
|
||||
const SK_SET_MEM_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
|
||||
let results = self
|
||||
.tenant_timeline_safekeeper_op_quorum(
|
||||
safekeepers,
|
||||
move |client| {
|
||||
let req = req.clone();
|
||||
async move {
|
||||
let mut res = client
|
||||
.switch_timeline_membership(tenant_id, timeline_id, &req)
|
||||
.await;
|
||||
|
||||
// If min_position is not reached, map the response to an error,
|
||||
// so it isn't counted toward the quorum.
|
||||
if let Some(min_position) = min_position {
|
||||
if let Ok(ok_res) = &res {
|
||||
if (ok_res.term, ok_res.flush_lsn) < min_position {
|
||||
// Use Error::Timeout to make this error retriable.
|
||||
res = Err(mgmt_api::Error::Timeout(
|
||||
format!(
|
||||
"safekeeper {} returned position {:?} which is less than minimum required position {:?}",
|
||||
client.node_id_label(),
|
||||
(ok_res.term, ok_res.flush_lsn),
|
||||
min_position
|
||||
)
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
res
|
||||
}
|
||||
},
|
||||
SK_SET_MEM_TIMELINE_RECONCILE_TIMEOUT,
|
||||
)
|
||||
.await?;
|
||||
|
||||
for res in results.iter().flatten() {
|
||||
if res.current_conf.generation > config.generation {
|
||||
// Another switch_membership raced us.
|
||||
return Err(ApiError::Conflict(format!(
|
||||
"received configuration with generation {} from safekeeper, but expected {}",
|
||||
res.current_conf.generation, config.generation
|
||||
)));
|
||||
} else if res.current_conf.generation < config.generation {
|
||||
// Note: should never happen.
|
||||
// If we get a response, it should be at least the sent generation.
|
||||
tracing::error!(
|
||||
"received configuration with generation {} from safekeeper, but expected {}",
|
||||
res.current_conf.generation,
|
||||
config.generation
|
||||
);
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"received configuration with generation {} from safekeeper, but expected {}",
|
||||
res.current_conf.generation,
|
||||
config.generation
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
/// Pull timeline to `to_safekeepers` from `from_safekeepers` with retries.
|
||||
///
|
||||
/// Returns Ok(()) only if all the pull_timeline requests were successful.
|
||||
async fn tenant_timeline_pull_from_peers(
|
||||
self: &Arc<Self>,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
to_safekeepers: &[Safekeeper],
|
||||
from_safekeepers: &[Safekeeper],
|
||||
) -> Result<(), ApiError> {
|
||||
let http_hosts = from_safekeepers
|
||||
.iter()
|
||||
.map(|sk| sk.base_url())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
tracing::info!(
|
||||
"pulling timeline to {:?} from {:?}",
|
||||
to_safekeepers
|
||||
.iter()
|
||||
.map(|sk| sk.get_id())
|
||||
.collect::<Vec<_>>(),
|
||||
from_safekeepers
|
||||
.iter()
|
||||
.map(|sk| sk.get_id())
|
||||
.collect::<Vec<_>>()
|
||||
);
|
||||
|
||||
// TODO(diko): need to pass mconf/generation with the request
|
||||
// to properly handle tombstones. Ignore tombstones for now.
|
||||
// Worst case: we leave a timeline on a safekeeper which is not in the current set.
|
||||
let req = PullTimelineRequest {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
http_hosts,
|
||||
ignore_tombstone: Some(true),
|
||||
};
|
||||
|
||||
const SK_PULL_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
|
||||
let responses = self
|
||||
.tenant_timeline_safekeeper_op(
|
||||
to_safekeepers,
|
||||
move |client| {
|
||||
let req = req.clone();
|
||||
async move { client.pull_timeline(&req).await }
|
||||
},
|
||||
SK_PULL_TIMELINE_RECONCILE_TIMEOUT,
|
||||
)
|
||||
.await?;
|
||||
|
||||
if let Some((idx, err)) = responses
|
||||
.iter()
|
||||
.enumerate()
|
||||
.find_map(|(idx, res)| Some((idx, res.as_ref().err()?)))
|
||||
{
|
||||
let sk_id = to_safekeepers[idx].get_id();
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"pull_timeline to {sk_id} failed: {err}",
|
||||
)));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Exclude a timeline from safekeepers in parallel with retries.
|
||||
/// If an exclude request is unsuccessful, it will be added to
|
||||
/// the reconciler, and after that the function will succeed.
|
||||
async fn tenant_timeline_safekeeper_exclude(
|
||||
self: &Arc<Self>,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
safekeepers: &[Safekeeper],
|
||||
config: &membership::Configuration,
|
||||
) -> Result<(), ApiError> {
|
||||
let req = TimelineMembershipSwitchRequest {
|
||||
mconf: config.clone(),
|
||||
};
|
||||
|
||||
const SK_EXCLUDE_TIMELINE_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
|
||||
let results = self
|
||||
.tenant_timeline_safekeeper_op(
|
||||
safekeepers,
|
||||
move |client| {
|
||||
let req = req.clone();
|
||||
async move { client.exclude_timeline(tenant_id, timeline_id, &req).await }
|
||||
},
|
||||
SK_EXCLUDE_TIMELINE_TIMEOUT,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut reconcile_requests = Vec::new();
|
||||
|
||||
for (idx, res) in results.iter().enumerate() {
|
||||
if res.is_err() {
|
||||
let sk_id = safekeepers[idx].skp.id;
|
||||
let pending_op = TimelinePendingOpPersistence {
|
||||
tenant_id: tenant_id.to_string(),
|
||||
timeline_id: timeline_id.to_string(),
|
||||
generation: config.generation.into_inner() as i32,
|
||||
op_kind: SafekeeperTimelineOpKind::Exclude,
|
||||
sk_id,
|
||||
};
|
||||
tracing::info!("writing pending exclude op for sk id {sk_id}");
|
||||
self.persistence.insert_pending_op(pending_op).await?;
|
||||
|
||||
let req = ScheduleRequest {
|
||||
safekeeper: Box::new(safekeepers[idx].clone()),
|
||||
host_list: Vec::new(),
|
||||
tenant_id,
|
||||
timeline_id: Some(timeline_id),
|
||||
generation: config.generation.into_inner(),
|
||||
kind: SafekeeperTimelineOpKind::Exclude,
|
||||
};
|
||||
reconcile_requests.push(req);
|
||||
}
|
||||
}
|
||||
|
||||
if !reconcile_requests.is_empty() {
|
||||
let locked = self.inner.read().unwrap();
|
||||
for req in reconcile_requests {
|
||||
locked.safekeeper_reconcilers.schedule_request(req);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Migrate timeline safekeeper set to a new set.
|
||||
///
|
||||
/// This function implements an algorithm from RFC-035.
|
||||
/// <https://github.com/neondatabase/neon/blob/main/docs/rfcs/035-safekeeper-dynamic-membership-change.md>
|
||||
pub(crate) async fn tenant_timeline_safekeeper_migrate(
|
||||
self: &Arc<Self>,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
req: TimelineSafekeeperMigrateRequest,
|
||||
) -> Result<(), ApiError> {
|
||||
let all_safekeepers = self.inner.read().unwrap().safekeepers.clone();
|
||||
|
||||
let new_sk_set = req.new_sk_set;
|
||||
|
||||
for sk_id in new_sk_set.iter() {
|
||||
if !all_safekeepers.contains_key(sk_id) {
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"safekeeper {sk_id} does not exist"
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(diko): per-tenant lock is too wide. Consider introducing per-timeline locks.
|
||||
let _tenant_lock = trace_shared_lock(
|
||||
&self.tenant_op_locks,
|
||||
tenant_id,
|
||||
TenantOperations::TimelineSafekeeperMigrate,
|
||||
)
|
||||
.await;
|
||||
|
||||
// 1. Fetch current timeline configuration from the configuration storage.
|
||||
|
||||
let timeline = self
|
||||
.persistence
|
||||
.get_timeline(tenant_id, timeline_id)
|
||||
.await?;
|
||||
|
||||
let Some(timeline) = timeline else {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!(
|
||||
"timeline {tenant_id}/{timeline_id} doesn't exist in timelines table"
|
||||
)
|
||||
.into(),
|
||||
));
|
||||
};
|
||||
|
||||
let cur_sk_set = timeline
|
||||
.sk_set
|
||||
.iter()
|
||||
.map(|&id| NodeId(id as u64))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
tracing::info!(
|
||||
?cur_sk_set,
|
||||
?new_sk_set,
|
||||
"Migrating timeline to new safekeeper set",
|
||||
);
|
||||
|
||||
let mut generation = SafekeeperGeneration::new(timeline.generation as u32);
|
||||
|
||||
if let Some(ref persistent_new_sk_set) = timeline.new_sk_set {
|
||||
// 2. If it is already a joint one and new_set is different from desired_set, refuse to change.
|
||||
if persistent_new_sk_set
|
||||
.iter()
|
||||
.map(|&id| NodeId(id as u64))
|
||||
.ne(new_sk_set.iter().cloned())
|
||||
{
|
||||
tracing::info!(
|
||||
?persistent_new_sk_set,
|
||||
?new_sk_set,
|
||||
"different new safekeeper set is already set in the database",
|
||||
);
|
||||
return Err(ApiError::Conflict(format!(
|
||||
"the timeline is already migrating to a different safekeeper set: {presistent_new_sk_set:?}"
|
||||
)));
|
||||
}
|
||||
// If it is the same new_sk_set, we can continue the migration (retry).
|
||||
} else {
|
||||
// 3. No active migration yet.
|
||||
// Increment current generation and put desired_set to new_sk_set.
|
||||
generation = generation.next();
|
||||
|
||||
self.persistence
|
||||
.update_timeline_membership(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
generation,
|
||||
&cur_sk_set,
|
||||
Some(&new_sk_set),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
let cur_safekeepers = self.get_safekeepers(&timeline.sk_set)?;
|
||||
let cur_sk_member_set = Self::make_member_set(&cur_safekeepers)?;
|
||||
|
||||
let new_sk_set_i64 = new_sk_set.iter().map(|id| id.0 as i64).collect::<Vec<_>>();
|
||||
let new_safekeepers = self.get_safekeepers(&new_sk_set_i64)?;
|
||||
let new_sk_member_set = Self::make_member_set(&new_safekeepers)?;
|
||||
|
||||
let joint_config = membership::Configuration {
|
||||
generation,
|
||||
members: cur_sk_member_set,
|
||||
new_members: Some(new_sk_member_set.clone()),
|
||||
};
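// Joint (generation n+1) configuration: while it is in effect, WAL must be
// acknowledged by a quorum of `members` and, since `new_members` is set, by a
// quorum of the new set as well (RFC-035 joint consensus).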
|
||||
|
||||
// 4. Call PUT configuration on safekeepers from the current set,
|
||||
// delivering them joint_conf.
|
||||
|
||||
// TODO(diko): need to notify cplane with an updated set of safekeepers.
|
||||
|
||||
let results = self
|
||||
.tenant_timeline_set_membership_quorum(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
&cur_safekeepers,
|
||||
&joint_config,
|
||||
None, // no min position
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut sync_position = (INITIAL_TERM, Lsn::INVALID);
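// sync_position will hold the highest (term, flush_lsn) reported by the
// current set's quorum; step 7 below waits until a quorum of the new set has
// caught up to at least this position before the final configuration is installed.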
|
||||
for res in results.into_iter().flatten() {
|
||||
let sk_position = (res.term, res.flush_lsn);
|
||||
if sync_position < sk_position {
|
||||
sync_position = sk_position;
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
%generation,
|
||||
?sync_position,
|
||||
"safekeepers set membership updated",
|
||||
);
|
||||
|
||||
// 5. Initialize timeline on safekeeper(s) from new_sk_set where it doesn't exist yet
|
||||
// by doing pull_timeline from the majority of the current set.
|
||||
|
||||
// Filter out safekeepers which are already in the current set.
|
||||
let from_ids: HashSet<NodeId> = cur_safekeepers.iter().map(|sk| sk.get_id()).collect();
|
||||
let pull_to_safekeepers = new_safekeepers
|
||||
.iter()
|
||||
.filter(|sk| !from_ids.contains(&sk.get_id()))
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
self.tenant_timeline_pull_from_peers(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
&pull_to_safekeepers,
|
||||
&cur_safekeepers,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// 6. Call POST bump_term(sync_term) on safekeepers from the new set. Success on majority is enough.
|
||||
|
||||
// TODO(diko): do we need to bump timeline term?
|
||||
|
||||
// 7. Repeatedly call PUT configuration on safekeepers from the new set,
|
||||
// delivering them joint_conf and collecting their positions.
|
||||
|
||||
tracing::info!(?sync_position, "waiting for safekeepers to sync position");
|
||||
|
||||
self.tenant_timeline_set_membership_quorum(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
&new_safekeepers,
|
||||
&joint_config,
|
||||
Some(sync_position),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// 8. Create new_conf: a Configuration that increments the joint_conf generation,
// has the new safekeeper set as sk_set, and no new_sk_set.
|
||||
|
||||
let generation = generation.next();
|
||||
|
||||
let new_conf = membership::Configuration {
|
||||
generation,
|
||||
members: new_sk_member_set,
|
||||
new_members: None,
|
||||
};
|
||||
|
||||
self.persistence
|
||||
.update_timeline_membership(tenant_id, timeline_id, generation, &new_sk_set, None)
|
||||
.await?;
|
||||
|
||||
// TODO(diko): at this point we have already updated the timeline in the database,
|
||||
// but we still need to notify safekeepers and cplane about the new configuration,
|
||||
// and put deletion of the timeline from the old safekeepers into the reconciler.
|
||||
// Ideally it should be done atomically, but now it's not.
|
||||
// Worst case: the timeline is not deleted from old safekeepers,
|
||||
// the compute may require both quorums till the migration is retried and completed.
|
||||
|
||||
self.tenant_timeline_set_membership_quorum(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
&new_safekeepers,
|
||||
&new_conf,
|
||||
None, // no min position
|
||||
)
|
||||
.await?;
|
||||
|
||||
let new_ids: HashSet<NodeId> = new_safekeepers.iter().map(|sk| sk.get_id()).collect();
|
||||
let exclude_safekeepers = cur_safekeepers
|
||||
.into_iter()
|
||||
.filter(|sk| !new_ids.contains(&sk.get_id()))
|
||||
.collect::<Vec<_>>();
|
||||
self.tenant_timeline_safekeeper_exclude(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
&exclude_safekeepers,
|
||||
&new_conf,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// TODO(diko): need to notify cplane with an updated set of safekeepers.
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1215,6 +1215,13 @@ class NeonEnv:
|
||||
storage_controller_config = storage_controller_config or {}
|
||||
storage_controller_config["use_https_safekeeper_api"] = True
|
||||
|
||||
# TODO(diko): uncomment when timeline_safekeeper_count option is in the release branch,
|
||||
# so the compat tests will not fail because of its presence.
|
||||
# if config.num_safekeepers < 3:
|
||||
# storage_controller_config = storage_controller_config or {}
|
||||
# if "timeline_safekeeper_count" not in storage_controller_config:
|
||||
# storage_controller_config["timeline_safekeeper_count"] = config.num_safekeepers
|
||||
|
||||
if storage_controller_config is not None:
|
||||
cfg["storage_controller"] = storage_controller_config
|
||||
|
||||
@@ -2226,6 +2233,21 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
response.raise_for_status()
|
||||
log.info(f"timeline_create success: {response.json()}")
|
||||
|
||||
def migrate_safekeepers(
|
||||
self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
new_sk_set: list[int],
|
||||
):
|
||||
response = self.request(
|
||||
"POST",
|
||||
f"{self.api}/v1/tenant/{tenant_id}/timeline/{timeline_id}/safekeeper_migrate",
|
||||
json={"new_sk_set": new_sk_set},
|
||||
headers=self.headers(TokenScope.PAGE_SERVER_API),
|
||||
)
|
||||
response.raise_for_status()
|
||||
log.info(f"migrate_safekeepers success: {response.json()}")
|
||||
|
||||
def locate(self, tenant_id: TenantId) -> list[dict[str, Any]]:
|
||||
"""
|
||||
:return: list of {"shard_id": "", "node_id": int, "listen_pg_addr": str, "listen_pg_port": int, "listen_http_addr": str, "listen_http_port": int}
|
||||
|
||||
test_runner/regress/test_safekeeper_migration.py (new file, 64 lines)
@@ -0,0 +1,64 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
|
||||
|
||||
def test_safekeeper_migration_simple(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Simple safekeeper migration test.
|
||||
Creates 3 safekeepers. The timeline is configured to use only one safekeeper.
|
||||
1. Go through all safekeepers, migrating the timeline to each one in turn.
|
||||
2. Stop the other safekeepers. Validate that the insert is successful.
|
||||
3. Start the other safekeepers again and go to the next safekeeper.
|
||||
4. Validate that the table contains all inserted values.
|
||||
"""
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
neon_env_builder.storage_controller_config = {
|
||||
"timelines_onto_safekeepers": True,
|
||||
"timeline_safekeeper_count": 1,
|
||||
}
|
||||
env = neon_env_builder.init_start()
|
||||
# TODO(diko): pageserver spams with various errors during safekeeper migration.
|
||||
# Fix the code so it handles the migration better.
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
".*Timeline .* was cancelled and cannot be used anymore.*",
|
||||
".*Timeline .* has been deleted.*",
|
||||
".*wal receiver task finished with an error.*",
|
||||
]
|
||||
)
|
||||
|
||||
ep = env.endpoints.create("main", tenant_id=env.initial_tenant)
|
||||
# We specify all safekeepers, so compute will connect to all of them.
|
||||
# Only those from the current membership configuration will be used.
|
||||
# TODO(diko): set only current safekeepers when cplane notify is implemented.
|
||||
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
|
||||
ep.safe_psql("CREATE EXTENSION neon_test_utils;")
|
||||
ep.safe_psql("CREATE TABLE t(a int)")
|
||||
|
||||
for active_sk in range(1, 4):
|
||||
env.storage_controller.migrate_safekeepers(
|
||||
env.initial_tenant, env.initial_timeline, [active_sk]
|
||||
)
|
||||
|
||||
other_sks = [sk for sk in range(1, 4) if sk != active_sk]
|
||||
|
||||
for sk in other_sks:
|
||||
env.safekeepers[sk - 1].stop()
|
||||
|
||||
ep.safe_psql(f"INSERT INTO t VALUES ({active_sk})")
|
||||
|
||||
for sk in other_sks:
|
||||
env.safekeepers[sk - 1].start()
|
||||
|
||||
ep.clear_buffers()
|
||||
|
||||
assert ep.safe_psql("SELECT * FROM t") == [(i,) for i in range(1, 4)]
|
||||
|
||||
ep.stop()
|
||||
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
|
||||
|
||||
assert ep.safe_psql("SELECT * FROM t") == [(i,) for i in range(1, 4)]
|
||||