diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs index ff18d40bfe..a8080a57e9 100644 --- a/libs/pageserver_api/src/controller_api.rs +++ b/libs/pageserver_api/src/controller_api.rs @@ -546,6 +546,11 @@ pub struct TimelineImportRequest { pub sk_set: Vec, } +#[derive(serde::Serialize, serde::Deserialize, Clone)] +pub struct TimelineSafekeeperMigrateRequest { + pub new_sk_set: Vec, +} + #[cfg(test)] mod test { use serde_json; diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs index 5c1ee41f7b..1774489c1c 100644 --- a/libs/safekeeper_api/src/models.rs +++ b/libs/safekeeper_api/src/models.rs @@ -210,7 +210,7 @@ pub struct TimelineStatus { } /// Request to switch membership configuration. -#[derive(Serialize, Deserialize)] +#[derive(Clone, Serialize, Deserialize)] #[serde(transparent)] pub struct TimelineMembershipSwitchRequest { pub mconf: Configuration, @@ -221,6 +221,8 @@ pub struct TimelineMembershipSwitchRequest { pub struct TimelineMembershipSwitchResponse { pub previous_conf: Configuration, pub current_conf: Configuration, + pub term: Term, + pub flush_lsn: Lsn, } #[derive(Clone, Copy, Serialize, Deserialize)] diff --git a/safekeeper/src/state.rs b/safekeeper/src/state.rs index b6cf73be2e..32624d260d 100644 --- a/safekeeper/src/state.rs +++ b/safekeeper/src/state.rs @@ -9,7 +9,7 @@ use anyhow::{Result, bail}; use postgres_ffi::WAL_SEGMENT_SIZE; use postgres_versioninfo::{PgMajorVersion, PgVersionId}; use safekeeper_api::membership::Configuration; -use safekeeper_api::models::{TimelineMembershipSwitchResponse, TimelineTermBumpResponse}; +use safekeeper_api::models::TimelineTermBumpResponse; use safekeeper_api::{INITIAL_TERM, ServerInfo, Term}; use serde::{Deserialize, Serialize}; use tracing::info; @@ -83,6 +83,11 @@ pub enum EvictionState { Offloaded(Lsn), } +pub struct MembershipSwitchResult { + pub previous_conf: Configuration, + pub current_conf: Configuration, +} + impl TimelinePersistentState { /// commit_lsn is the same as start_lsn in the normal creaiton; see /// `TimelineCreateRequest` comments.` @@ -261,10 +266,7 @@ where /// Switch into membership configuration `to` if it is higher than the /// current one. - pub async fn membership_switch( - &mut self, - to: Configuration, - ) -> Result { + pub async fn membership_switch(&mut self, to: Configuration) -> Result { let before = self.mconf.clone(); // Is switch allowed? if to.generation <= self.mconf.generation { @@ -278,7 +280,7 @@ where self.finish_change(&state).await?; info!("switched membership conf to {} from {}", to, before); } - Ok(TimelineMembershipSwitchResponse { + Ok(MembershipSwitchResult { previous_conf: before, current_conf: self.mconf.clone(), }) diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 2bee41537f..0a27876862 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -190,7 +190,14 @@ impl StateSK { &mut self, to: Configuration, ) -> Result { - self.state_mut().membership_switch(to).await + let result = self.state_mut().membership_switch(to).await?; + + Ok(TimelineMembershipSwitchResponse { + previous_conf: result.previous_conf, + current_conf: result.current_conf, + term: self.state().acceptor_state.term, + flush_lsn: self.flush_lsn(), + }) } /// Close open WAL files to release FDs. diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index 33310706be..70e53d86ee 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -154,8 +154,8 @@ pub struct PhysicalStorage { /// record /// /// Partial segment 002 has no WAL records, and it will be removed by the - /// next truncate_wal(). This flag will be set to true after the first - /// truncate_wal() call. + /// next truncate_wal(). This flag will be set to false after the first + /// successful truncate_wal() call. /// /// [`write_lsn`]: Self::write_lsn pending_wal_truncation: bool, @@ -202,6 +202,8 @@ impl PhysicalStorage { ttid.timeline_id, flush_lsn, state.commit_lsn, state.peer_horizon_lsn, ); if flush_lsn < state.commit_lsn { + // note: can never happen. find_end_of_wal returns provided start_lsn + // (state.commit_lsn in our case) if it doesn't find anything. bail!( "timeline {} potential data loss: flush_lsn {} by find_end_of_wal is less than commit_lsn {} from control file", ttid.timeline_id, diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs index a7e86b5224..66c44b5674 100644 --- a/storage_controller/src/http.rs +++ b/storage_controller/src/http.rs @@ -22,7 +22,7 @@ use pageserver_api::controller_api::{ MetadataHealthListUnhealthyResponse, MetadataHealthUpdateRequest, MetadataHealthUpdateResponse, NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, SafekeeperSchedulingPolicyRequest, ShardsPreferredAzsRequest, TenantCreateRequest, TenantPolicyRequest, TenantShardMigrateRequest, - TimelineImportRequest, + TimelineImportRequest, TimelineSafekeeperMigrateRequest, }; use pageserver_api::models::{ DetachBehavior, LsnLeaseRequest, TenantConfigPatchRequest, TenantConfigRequest, @@ -34,6 +34,7 @@ use pageserver_api::upcall_api::{ PutTimelineImportStatusRequest, ReAttachRequest, TimelineImportStatusRequest, ValidateRequest, }; use pageserver_client::{BlockUnblock, mgmt_api}; + use routerify::Middleware; use tokio_util::sync::CancellationToken; use tracing::warn; @@ -635,6 +636,32 @@ async fn handle_tenant_timeline_download_heatmap_layers( json_response(StatusCode::OK, ()) } +async fn handle_tenant_timeline_safekeeper_migrate( + service: Arc, + req: Request, +) -> Result, ApiError> { + let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; + check_permissions(&req, Scope::PageServerApi)?; + maybe_rate_limit(&req, tenant_id).await; + + let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?; + + let mut req = match maybe_forward(req).await { + ForwardOutcome::Forwarded(res) => { + return res; + } + ForwardOutcome::NotForwarded(req) => req, + }; + + let migrate_req = json_request::(&mut req).await?; + + service + .tenant_timeline_safekeeper_migrate(tenant_id, timeline_id, migrate_req) + .await?; + + json_response(StatusCode::OK, ()) +} + async fn handle_tenant_timeline_lsn_lease( service: Arc, req: Request, @@ -2458,6 +2485,16 @@ pub fn make_router( ) }, ) + .post( + "/v1/tenant/:tenant_id/timeline/:timeline_id/safekeeper_migrate", + |r| { + tenant_service_handler( + r, + handle_tenant_timeline_safekeeper_migrate, + RequestName("v1_tenant_timeline_safekeeper_migrate"), + ) + }, + ) // LSN lease passthrough to all shards .post( "/v1/tenant/:tenant_id/timeline/:timeline_id/lsn_lease", diff --git a/storage_controller/src/metrics.rs b/storage_controller/src/metrics.rs index 07713c3fbc..f7f77cdd23 100644 --- a/storage_controller/src/metrics.rs +++ b/storage_controller/src/metrics.rs @@ -333,6 +333,7 @@ pub(crate) enum DatabaseErrorLabel { ConnectionPool, Logical, Migration, + Cas, } impl DatabaseError { @@ -343,6 +344,7 @@ impl DatabaseError { Self::ConnectionPool(_) => DatabaseErrorLabel::ConnectionPool, Self::Logical(_) => DatabaseErrorLabel::Logical, Self::Migration(_) => DatabaseErrorLabel::Migration, + Self::Cas(_) => DatabaseErrorLabel::Cas, } } } diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index 2948e9019f..56f4d03111 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -29,6 +29,7 @@ use pageserver_api::shard::{ use rustls::client::WebPkiServerVerifier; use rustls::client::danger::{ServerCertVerified, ServerCertVerifier}; use rustls::crypto::ring; +use safekeeper_api::membership::SafekeeperGeneration; use scoped_futures::ScopedBoxFuture; use serde::{Deserialize, Serialize}; use utils::generation::Generation; @@ -94,6 +95,8 @@ pub(crate) enum DatabaseError { Logical(String), #[error("Migration error: {0}")] Migration(String), + #[error("CAS error: {0}")] + Cas(String), } #[derive(measured::FixedCardinalityLabel, Copy, Clone)] @@ -126,6 +129,7 @@ pub(crate) enum DatabaseOperation { UpdateLeader, SetPreferredAzs, InsertTimeline, + UpdateTimelineMembership, GetTimeline, InsertTimelineReconcile, RemoveTimelineReconcile, @@ -1410,6 +1414,56 @@ impl Persistence { .await } + /// Update timeline membership configuration in the database. + /// Perform a compare-and-swap (CAS) operation on the timeline's generation. + /// The `new_generation` must be the next (+1) generation after the one in the database. + pub(crate) async fn update_timeline_membership( + &self, + tenant_id: TenantId, + timeline_id: TimelineId, + new_generation: SafekeeperGeneration, + sk_set: &[NodeId], + new_sk_set: Option<&[NodeId]>, + ) -> DatabaseResult<()> { + use crate::schema::timelines::dsl; + + let prev_generation = new_generation.previous().unwrap(); + + let tenant_id = &tenant_id; + let timeline_id = &timeline_id; + self.with_measured_conn(DatabaseOperation::UpdateTimelineMembership, move |conn| { + Box::pin(async move { + let updated = diesel::update(dsl::timelines) + .filter(dsl::tenant_id.eq(&tenant_id.to_string())) + .filter(dsl::timeline_id.eq(&timeline_id.to_string())) + .filter(dsl::generation.eq(prev_generation.into_inner() as i32)) + .set(( + dsl::generation.eq(new_generation.into_inner() as i32), + dsl::sk_set.eq(sk_set.iter().map(|id| id.0 as i64).collect::>()), + dsl::new_sk_set.eq(new_sk_set + .map(|set| set.iter().map(|id| id.0 as i64).collect::>())), + )) + .execute(conn) + .await?; + + match updated { + 0 => { + // TODO(diko): It makes sense to select the current generation + // and include it in the error message for better debuggability. + Err(DatabaseError::Cas( + "Failed to update membership configuration".to_string(), + )) + } + 1 => Ok(()), + _ => Err(DatabaseError::Logical(format!( + "unexpected number of rows ({updated})" + ))), + } + }) + }) + .await + } + /// Load timeline from db. Returns `None` if not present. pub(crate) async fn get_timeline( &self, diff --git a/storage_controller/src/safekeeper.rs b/storage_controller/src/safekeeper.rs index 5a13ef750e..91154f4fa3 100644 --- a/storage_controller/src/safekeeper.rs +++ b/storage_controller/src/safekeeper.rs @@ -2,6 +2,7 @@ use std::time::Duration; use pageserver_api::controller_api::{SafekeeperDescribeResponse, SkSchedulingPolicy}; use reqwest::StatusCode; +use safekeeper_api::membership::SafekeeperId; use safekeeper_client::mgmt_api; use tokio_util::sync::CancellationToken; use utils::backoff; @@ -92,6 +93,13 @@ impl Safekeeper { pub(crate) fn has_https_port(&self) -> bool { self.listen_https_port.is_some() } + pub(crate) fn get_safekeeper_id(&self) -> SafekeeperId { + SafekeeperId { + id: self.id, + host: self.skp.host.clone(), + pg_port: self.skp.port as u16, + } + } /// Perform an operation (which is given a [`SafekeeperClient`]) with retries #[allow(clippy::too_many_arguments)] pub(crate) async fn with_client_retries( diff --git a/storage_controller/src/safekeeper_client.rs b/storage_controller/src/safekeeper_client.rs index bcf223c731..47a785e7d3 100644 --- a/storage_controller/src/safekeeper_client.rs +++ b/storage_controller/src/safekeeper_client.rs @@ -56,6 +56,10 @@ impl SafekeeperClient { } } + pub(crate) fn node_id_label(&self) -> &str { + &self.node_id_label + } + pub(crate) async fn create_timeline( &self, req: &TimelineCreateRequest, diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 75ce7bc37b..bbf93fd751 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -161,6 +161,7 @@ enum TenantOperations { DropDetached, DownloadHeatmapLayers, TimelineLsnLease, + TimelineSafekeeperMigrate, } #[derive(Clone, strum_macros::Display)] @@ -491,6 +492,7 @@ impl From for ApiError { DatabaseError::Logical(reason) | DatabaseError::Migration(reason) => { ApiError::InternalServerError(anyhow::anyhow!(reason)) } + DatabaseError::Cas(reason) => ApiError::Conflict(reason), } } } diff --git a/storage_controller/src/service/safekeeper_reconciler.rs b/storage_controller/src/service/safekeeper_reconciler.rs index a3c5082be6..b67a679fad 100644 --- a/storage_controller/src/service/safekeeper_reconciler.rs +++ b/storage_controller/src/service/safekeeper_reconciler.rs @@ -145,7 +145,7 @@ pub(crate) async fn load_schedule_requests( } let Some(sk) = safekeepers.get(&other_node_id) else { tracing::warn!( - "couldnt find safekeeper with pending op id {other_node_id}, not pulling from it" + "couldn't find safekeeper with pending op id {other_node_id}, not pulling from it" ); return None; }; diff --git a/storage_controller/src/service/safekeeper_service.rs b/storage_controller/src/service/safekeeper_service.rs index 92d15f3fca..fc33a24198 100644 --- a/storage_controller/src/service/safekeeper_service.rs +++ b/storage_controller/src/service/safekeeper_service.rs @@ -5,20 +5,29 @@ use std::time::Duration; use super::safekeeper_reconciler::ScheduleRequest; use crate::heartbeater::SafekeeperState; +use crate::id_lock_map::trace_shared_lock; use crate::metrics; use crate::persistence::{ DatabaseError, SafekeeperTimelineOpKind, TimelinePendingOpPersistence, TimelinePersistence, }; use crate::safekeeper::Safekeeper; +use crate::safekeeper_client::SafekeeperClient; +use crate::service::TenantOperations; use crate::timeline_import::TimelineImportFinalizeError; use anyhow::Context; use http_utils::error::ApiError; use pageserver_api::controller_api::{ SafekeeperDescribeResponse, SkSchedulingPolicy, TimelineImportRequest, + TimelineSafekeeperMigrateRequest, }; use pageserver_api::models::{SafekeeperInfo, SafekeepersInfo, TimelineInfo}; use safekeeper_api::PgVersionId; -use safekeeper_api::membership::{MemberSet, SafekeeperGeneration, SafekeeperId}; +use safekeeper_api::membership::{self, MemberSet, SafekeeperGeneration}; +use safekeeper_api::models::{ + PullTimelineRequest, TimelineMembershipSwitchRequest, TimelineMembershipSwitchResponse, +}; +use safekeeper_api::{INITIAL_TERM, Term}; +use safekeeper_client::mgmt_api; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use utils::id::{NodeId, TenantId, TimelineId}; @@ -35,6 +44,33 @@ pub struct TimelineLocateResponse { } impl Service { + fn make_member_set(safekeepers: &[Safekeeper]) -> Result { + let members = safekeepers + .iter() + .map(|sk| sk.get_safekeeper_id()) + .collect::>(); + + MemberSet::new(members).map_err(ApiError::InternalServerError) + } + + fn get_safekeepers(&self, ids: &[i64]) -> Result, ApiError> { + let safekeepers = { + let locked = self.inner.read().unwrap(); + locked.safekeepers.clone() + }; + + ids.iter() + .map(|&id| { + let node_id = NodeId(id as u64); + safekeepers.get(&node_id).cloned().ok_or_else(|| { + ApiError::InternalServerError(anyhow::anyhow!( + "safekeeper {node_id} is not registered" + )) + }) + }) + .collect::, _>>() + } + /// Timeline creation on safekeepers /// /// Returns `Ok(left)` if the timeline has been created on a quorum of safekeepers, @@ -47,35 +83,9 @@ impl Service { pg_version: PgVersionId, timeline_persistence: &TimelinePersistence, ) -> Result, ApiError> { - // If quorum is reached, return if we are outside of a specified timeout - let jwt = self - .config - .safekeeper_jwt_token - .clone() - .map(SecretString::from); - let mut joinset = JoinSet::new(); + let safekeepers = self.get_safekeepers(&timeline_persistence.sk_set)?; - // Prepare membership::Configuration from choosen safekeepers. - let safekeepers = { - let locked = self.inner.read().unwrap(); - locked.safekeepers.clone() - }; - - let mut members = Vec::new(); - for sk_id in timeline_persistence.sk_set.iter() { - let sk_id = NodeId(*sk_id as u64); - let Some(safekeeper) = safekeepers.get(&sk_id) else { - return Err(ApiError::InternalServerError(anyhow::anyhow!( - "couldn't find entry for safekeeper with id {sk_id}" - )))?; - }; - members.push(SafekeeperId { - id: sk_id, - host: safekeeper.skp.host.clone(), - pg_port: safekeeper.skp.port as u16, - }); - } - let mset = MemberSet::new(members).map_err(ApiError::InternalServerError)?; + let mset = Self::make_member_set(&safekeepers)?; let mconf = safekeeper_api::membership::Configuration::new(mset); let req = safekeeper_api::models::TimelineCreateRequest { @@ -88,79 +98,150 @@ impl Service { timeline_id, wal_seg_size: None, }; + const SK_CREATE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30); - for sk in timeline_persistence.sk_set.iter() { - let sk_id = NodeId(*sk as u64); - let safekeepers = safekeepers.clone(); + + let results = self + .tenant_timeline_safekeeper_op_quorum( + &safekeepers, + move |client| { + let req = req.clone(); + async move { client.create_timeline(&req).await } + }, + SK_CREATE_TIMELINE_RECONCILE_TIMEOUT, + ) + .await?; + + Ok(results + .into_iter() + .enumerate() + .filter_map(|(idx, res)| { + if res.is_ok() { + None // Success, don't return this safekeeper + } else { + Some(safekeepers[idx].get_id()) // Failure, return this safekeeper + } + }) + .collect::>()) + } + + /// Perform an operation on a list of safekeepers in parallel with retries. + /// + /// Return the results of the operation on each safekeeper in the input order. + async fn tenant_timeline_safekeeper_op( + &self, + safekeepers: &[Safekeeper], + op: O, + timeout: Duration, + ) -> Result>, ApiError> + where + O: FnMut(SafekeeperClient) -> F + Send + 'static, + O: Clone, + F: std::future::Future> + Send + 'static, + T: Sync + Send + 'static, + { + let jwt = self + .config + .safekeeper_jwt_token + .clone() + .map(SecretString::from); + let mut joinset = JoinSet::new(); + + for (idx, sk) in safekeepers.iter().enumerate() { + let sk = sk.clone(); let http_client = self.http_client.clone(); let jwt = jwt.clone(); - let req = req.clone(); + let op = op.clone(); joinset.spawn(async move { - // Unwrap is fine as we already would have returned error above - let sk_p = safekeepers.get(&sk_id).unwrap(); - let res = sk_p + let res = sk .with_client_retries( - |client| { - let req = req.clone(); - async move { client.create_timeline(&req).await } - }, + op, &http_client, &jwt, 3, 3, - SK_CREATE_TIMELINE_RECONCILE_TIMEOUT, + // TODO(diko): This is a wrong timeout. + // It should be scaled to the retry count. + timeout, &CancellationToken::new(), ) .await; - (sk_id, sk_p.skp.host.clone(), res) + (idx, res) }); } + + // Initialize results with timeout errors in case we never get a response. + let mut results: Vec> = safekeepers + .iter() + .map(|_| { + Err(mgmt_api::Error::Timeout( + "safekeeper operation timed out".to_string(), + )) + }) + .collect(); + // After we have built the joinset, we now wait for the tasks to complete, // but with a specified timeout to make sure we return swiftly, either with // a failure or success. - let reconcile_deadline = tokio::time::Instant::now() + SK_CREATE_TIMELINE_RECONCILE_TIMEOUT; + let reconcile_deadline = tokio::time::Instant::now() + timeout; // Wait until all tasks finish or timeout is hit, whichever occurs // first. - let mut reconcile_results = Vec::new(); + let mut result_count = 0; loop { if let Ok(res) = tokio::time::timeout_at(reconcile_deadline, joinset.join_next()).await { let Some(res) = res else { break }; match res { - Ok(res) => { + Ok((idx, res)) => { + let sk = &safekeepers[idx]; tracing::info!( "response from safekeeper id:{} at {}: {:?}", - res.0, - res.1, - res.2 + sk.get_id(), + sk.skp.host, + // Only print errors, as there is no Debug trait for T. + res.as_ref().map(|_| ()), ); - reconcile_results.push(res); + results[idx] = res; + result_count += 1; } Err(join_err) => { tracing::info!("join_err for task in joinset: {join_err}"); } } } else { - tracing::info!( - "timeout for creation call after {} responses", - reconcile_results.len() - ); + tracing::info!("timeout for operation call after {result_count} responses",); break; } } - // Now check now if quorum was reached in reconcile_results. - let total_result_count = reconcile_results.len(); - let remaining = reconcile_results - .into_iter() - .filter_map(|res| res.2.is_err().then_some(res.0)) - .collect::>(); - tracing::info!( - "Got {} non-successful responses from initial creation request of total {total_result_count} responses", - remaining.len() - ); - let target_sk_count = timeline_persistence.sk_set.len(); + Ok(results) + } + + /// Perform an operation on a list of safekeepers in parallel with retries, + /// and validates that we reach a quorum of successful responses. + /// + /// Return the results of the operation on each safekeeper in the input order. + /// It's guaranteed that at least a quorum of the responses are successful. + async fn tenant_timeline_safekeeper_op_quorum( + &self, + safekeepers: &[Safekeeper], + op: O, + timeout: Duration, + ) -> Result>, ApiError> + where + O: FnMut(SafekeeperClient) -> F, + O: Clone + Send + 'static, + F: std::future::Future> + Send + 'static, + T: Sync + Send + 'static, + { + let results = self + .tenant_timeline_safekeeper_op(safekeepers, op, timeout) + .await?; + + // Now check if quorum was reached in results. + + let target_sk_count = safekeepers.len(); let quorum_size = match target_sk_count { 0 => { return Err(ApiError::InternalServerError(anyhow::anyhow!( @@ -179,7 +260,7 @@ impl Service { // in order to schedule work to them tracing::warn!( "couldn't find at least 3 safekeepers for timeline, found: {:?}", - timeline_persistence.sk_set + target_sk_count ); return Err(ApiError::InternalServerError(anyhow::anyhow!( "couldn't find at least 3 safekeepers to put timeline to" @@ -188,7 +269,7 @@ impl Service { } _ => target_sk_count / 2 + 1, }; - let success_count = target_sk_count - remaining.len(); + let success_count = results.iter().filter(|res| res.is_ok()).count(); if success_count < quorum_size { // Failure return Err(ApiError::InternalServerError(anyhow::anyhow!( @@ -196,7 +277,7 @@ impl Service { ))); } - Ok(remaining) + Ok(results) } /// Create timeline in controller database and on safekeepers. @@ -797,4 +878,435 @@ impl Service { } Ok(()) } + + /// Call `switch_timeline_membership` on all safekeepers with retries + /// till the quorum of successful responses is reached. + /// + /// If min_position is not None, validates that majority of safekeepers + /// reached at least min_position. + /// + /// Return responses from safekeepers in the input order. + async fn tenant_timeline_set_membership_quorum( + self: &Arc, + tenant_id: TenantId, + timeline_id: TimelineId, + safekeepers: &[Safekeeper], + config: &membership::Configuration, + min_position: Option<(Term, Lsn)>, + ) -> Result>, ApiError> { + let req = TimelineMembershipSwitchRequest { + mconf: config.clone(), + }; + + const SK_SET_MEM_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30); + + let results = self + .tenant_timeline_safekeeper_op_quorum( + safekeepers, + move |client| { + let req = req.clone(); + async move { + let mut res = client + .switch_timeline_membership(tenant_id, timeline_id, &req) + .await; + + // If min_position is not reached, map the response to an error, + // so it isn't counted toward the quorum. + if let Some(min_position) = min_position { + if let Ok(ok_res) = &res { + if (ok_res.term, ok_res.flush_lsn) < min_position { + // Use Error::Timeout to make this error retriable. + res = Err(mgmt_api::Error::Timeout( + format!( + "safekeeper {} returned position {:?} which is less than minimum required position {:?}", + client.node_id_label(), + (ok_res.term, ok_res.flush_lsn), + min_position + ) + )); + } + } + } + + res + } + }, + SK_SET_MEM_TIMELINE_RECONCILE_TIMEOUT, + ) + .await?; + + for res in results.iter().flatten() { + if res.current_conf.generation > config.generation { + // Antoher switch_membership raced us. + return Err(ApiError::Conflict(format!( + "received configuration with generation {} from safekeeper, but expected {}", + res.current_conf.generation, config.generation + ))); + } else if res.current_conf.generation < config.generation { + // Note: should never happen. + // If we get a response, it should be at least the sent generation. + tracing::error!( + "received configuration with generation {} from safekeeper, but expected {}", + res.current_conf.generation, + config.generation + ); + return Err(ApiError::InternalServerError(anyhow::anyhow!( + "received configuration with generation {} from safekeeper, but expected {}", + res.current_conf.generation, + config.generation + ))); + } + } + + Ok(results) + } + + /// Pull timeline to to_safekeepers from from_safekeepers with retries. + /// + /// Returns Ok(()) only if all the pull_timeline requests were successful. + async fn tenant_timeline_pull_from_peers( + self: &Arc, + tenant_id: TenantId, + timeline_id: TimelineId, + to_safekeepers: &[Safekeeper], + from_safekeepers: &[Safekeeper], + ) -> Result<(), ApiError> { + let http_hosts = from_safekeepers + .iter() + .map(|sk| sk.base_url()) + .collect::>(); + + tracing::info!( + "pulling timeline to {:?} from {:?}", + to_safekeepers + .iter() + .map(|sk| sk.get_id()) + .collect::>(), + from_safekeepers + .iter() + .map(|sk| sk.get_id()) + .collect::>() + ); + + // TODO(diko): need to pass mconf/generation with the request + // to properly handle tombstones. Ignore tombstones for now. + // Worst case: we leave a timeline on a safekeeper which is not in the current set. + let req = PullTimelineRequest { + tenant_id, + timeline_id, + http_hosts, + ignore_tombstone: Some(true), + }; + + const SK_PULL_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30); + + let responses = self + .tenant_timeline_safekeeper_op( + to_safekeepers, + move |client| { + let req = req.clone(); + async move { client.pull_timeline(&req).await } + }, + SK_PULL_TIMELINE_RECONCILE_TIMEOUT, + ) + .await?; + + if let Some((idx, err)) = responses + .iter() + .enumerate() + .find_map(|(idx, res)| Some((idx, res.as_ref().err()?))) + { + let sk_id = to_safekeepers[idx].get_id(); + return Err(ApiError::InternalServerError(anyhow::anyhow!( + "pull_timeline to {sk_id} failed: {err}", + ))); + } + + Ok(()) + } + + /// Exclude a timeline from safekeepers in parallel with retries. + /// If an exclude request is unsuccessful, it will be added to + /// the reconciler, and after that the function will succeed. + async fn tenant_timeline_safekeeper_exclude( + self: &Arc, + tenant_id: TenantId, + timeline_id: TimelineId, + safekeepers: &[Safekeeper], + config: &membership::Configuration, + ) -> Result<(), ApiError> { + let req = TimelineMembershipSwitchRequest { + mconf: config.clone(), + }; + + const SK_EXCLUDE_TIMELINE_TIMEOUT: Duration = Duration::from_secs(30); + + let results = self + .tenant_timeline_safekeeper_op( + safekeepers, + move |client| { + let req = req.clone(); + async move { client.exclude_timeline(tenant_id, timeline_id, &req).await } + }, + SK_EXCLUDE_TIMELINE_TIMEOUT, + ) + .await?; + + let mut reconcile_requests = Vec::new(); + + for (idx, res) in results.iter().enumerate() { + if res.is_err() { + let sk_id = safekeepers[idx].skp.id; + let pending_op = TimelinePendingOpPersistence { + tenant_id: tenant_id.to_string(), + timeline_id: timeline_id.to_string(), + generation: config.generation.into_inner() as i32, + op_kind: SafekeeperTimelineOpKind::Exclude, + sk_id, + }; + tracing::info!("writing pending exclude op for sk id {sk_id}"); + self.persistence.insert_pending_op(pending_op).await?; + + let req = ScheduleRequest { + safekeeper: Box::new(safekeepers[idx].clone()), + host_list: Vec::new(), + tenant_id, + timeline_id: Some(timeline_id), + generation: config.generation.into_inner(), + kind: SafekeeperTimelineOpKind::Exclude, + }; + reconcile_requests.push(req); + } + } + + if !reconcile_requests.is_empty() { + let locked = self.inner.read().unwrap(); + for req in reconcile_requests { + locked.safekeeper_reconcilers.schedule_request(req); + } + } + + Ok(()) + } + + /// Migrate timeline safekeeper set to a new set. + /// + /// This function implements an algorithm from RFC-035. + /// + pub(crate) async fn tenant_timeline_safekeeper_migrate( + self: &Arc, + tenant_id: TenantId, + timeline_id: TimelineId, + req: TimelineSafekeeperMigrateRequest, + ) -> Result<(), ApiError> { + let all_safekeepers = self.inner.read().unwrap().safekeepers.clone(); + + let new_sk_set = req.new_sk_set; + + for sk_id in new_sk_set.iter() { + if !all_safekeepers.contains_key(sk_id) { + return Err(ApiError::BadRequest(anyhow::anyhow!( + "safekeeper {sk_id} does not exist" + ))); + } + } + + // TODO(diko): per-tenant lock is too wide. Consider introducing per-timeline locks. + let _tenant_lock = trace_shared_lock( + &self.tenant_op_locks, + tenant_id, + TenantOperations::TimelineSafekeeperMigrate, + ) + .await; + + // 1. Fetch current timeline configuration from the configuration storage. + + let timeline = self + .persistence + .get_timeline(tenant_id, timeline_id) + .await?; + + let Some(timeline) = timeline else { + return Err(ApiError::NotFound( + anyhow::anyhow!( + "timeline {tenant_id}/{timeline_id} doesn't exist in timelines table" + ) + .into(), + )); + }; + + let cur_sk_set = timeline + .sk_set + .iter() + .map(|&id| NodeId(id as u64)) + .collect::>(); + + tracing::info!( + ?cur_sk_set, + ?new_sk_set, + "Migrating timeline to new safekeeper set", + ); + + let mut generation = SafekeeperGeneration::new(timeline.generation as u32); + + if let Some(ref presistent_new_sk_set) = timeline.new_sk_set { + // 2. If it is already joint one and new_set is different from desired_set refuse to change. + if presistent_new_sk_set + .iter() + .map(|&id| NodeId(id as u64)) + .ne(new_sk_set.iter().cloned()) + { + tracing::info!( + ?presistent_new_sk_set, + ?new_sk_set, + "different new safekeeper set is already set in the database", + ); + return Err(ApiError::Conflict(format!( + "the timeline is already migrating to a different safekeeper set: {presistent_new_sk_set:?}" + ))); + } + // It it is the same new_sk_set, we can continue the migration (retry). + } else { + // 3. No active migration yet. + // Increment current generation and put desired_set to new_sk_set. + generation = generation.next(); + + self.persistence + .update_timeline_membership( + tenant_id, + timeline_id, + generation, + &cur_sk_set, + Some(&new_sk_set), + ) + .await?; + } + + let cur_safekeepers = self.get_safekeepers(&timeline.sk_set)?; + let cur_sk_member_set = Self::make_member_set(&cur_safekeepers)?; + + let new_sk_set_i64 = new_sk_set.iter().map(|id| id.0 as i64).collect::>(); + let new_safekeepers = self.get_safekeepers(&new_sk_set_i64)?; + let new_sk_member_set = Self::make_member_set(&new_safekeepers)?; + + let joint_config = membership::Configuration { + generation, + members: cur_sk_member_set, + new_members: Some(new_sk_member_set.clone()), + }; + + // 4. Call PUT configuration on safekeepers from the current set, + // delivering them joint_conf. + + // TODO(diko): need to notify cplane with an updated set of safekeepers. + + let results = self + .tenant_timeline_set_membership_quorum( + tenant_id, + timeline_id, + &cur_safekeepers, + &joint_config, + None, // no min position + ) + .await?; + + let mut sync_position = (INITIAL_TERM, Lsn::INVALID); + for res in results.into_iter().flatten() { + let sk_position = (res.term, res.flush_lsn); + if sync_position < sk_position { + sync_position = sk_position; + } + } + + tracing::info!( + %generation, + ?sync_position, + "safekeepers set membership updated", + ); + + // 5. Initialize timeline on safekeeper(s) from new_sk_set where it doesn't exist yet + // by doing pull_timeline from the majority of the current set. + + // Filter out safekeepers which are already in the current set. + let from_ids: HashSet = cur_safekeepers.iter().map(|sk| sk.get_id()).collect(); + let pull_to_safekeepers = new_safekeepers + .iter() + .filter(|sk| !from_ids.contains(&sk.get_id())) + .cloned() + .collect::>(); + + self.tenant_timeline_pull_from_peers( + tenant_id, + timeline_id, + &pull_to_safekeepers, + &cur_safekeepers, + ) + .await?; + + // 6. Call POST bump_term(sync_term) on safekeepers from the new set. Success on majority is enough. + + // TODO(diko): do we need to bump timeline term? + + // 7. Repeatedly call PUT configuration on safekeepers from the new set, + // delivering them joint_conf and collecting their positions. + + tracing::info!(?sync_position, "waiting for safekeepers to sync position"); + + self.tenant_timeline_set_membership_quorum( + tenant_id, + timeline_id, + &new_safekeepers, + &joint_config, + Some(sync_position), + ) + .await?; + + // 8. Create new_conf: Configuration incrementing joint_conf generation and + // having new safekeeper set as sk_set and None new_sk_set. + + let generation = generation.next(); + + let new_conf = membership::Configuration { + generation, + members: new_sk_member_set, + new_members: None, + }; + + self.persistence + .update_timeline_membership(tenant_id, timeline_id, generation, &new_sk_set, None) + .await?; + + // TODO(diko): at this point we have already updated the timeline in the database, + // but we still need to notify safekeepers and cplane about the new configuration, + // and put delition of the timeline from the old safekeepers into the reconciler. + // Ideally it should be done atomically, but now it's not. + // Worst case: the timeline is not deleted from old safekeepers, + // the compute may require both quorums till the migration is retried and completed. + + self.tenant_timeline_set_membership_quorum( + tenant_id, + timeline_id, + &new_safekeepers, + &new_conf, + None, // no min position + ) + .await?; + + let new_ids: HashSet = new_safekeepers.iter().map(|sk| sk.get_id()).collect(); + let exclude_safekeepers = cur_safekeepers + .into_iter() + .filter(|sk| !new_ids.contains(&sk.get_id())) + .collect::>(); + self.tenant_timeline_safekeeper_exclude( + tenant_id, + timeline_id, + &exclude_safekeepers, + &new_conf, + ) + .await?; + + // TODO(diko): need to notify cplane with an updated set of safekeepers. + + Ok(()) + } } diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 48c6597c7c..508e3d8dd2 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1215,6 +1215,13 @@ class NeonEnv: storage_controller_config = storage_controller_config or {} storage_controller_config["use_https_safekeeper_api"] = True + # TODO(diko): uncomment when timeline_safekeeper_count option is in the release branch, + # so the compat tests will not fail bacause of it presence. + # if config.num_safekeepers < 3: + # storage_controller_config = storage_controller_config or {} + # if "timeline_safekeeper_count" not in storage_controller_config: + # storage_controller_config["timeline_safekeeper_count"] = config.num_safekeepers + if storage_controller_config is not None: cfg["storage_controller"] = storage_controller_config @@ -2226,6 +2233,21 @@ class NeonStorageController(MetricsGetter, LogUtils): response.raise_for_status() log.info(f"timeline_create success: {response.json()}") + def migrate_safekeepers( + self, + tenant_id: TenantId, + timeline_id: TimelineId, + new_sk_set: list[int], + ): + response = self.request( + "POST", + f"{self.api}/v1/tenant/{tenant_id}/timeline/{timeline_id}/safekeeper_migrate", + json={"new_sk_set": new_sk_set}, + headers=self.headers(TokenScope.PAGE_SERVER_API), + ) + response.raise_for_status() + log.info(f"migrate_safekeepers success: {response.json()}") + def locate(self, tenant_id: TenantId) -> list[dict[str, Any]]: """ :return: list of {"shard_id": "", "node_id": int, "listen_pg_addr": str, "listen_pg_port": int, "listen_http_addr": str, "listen_http_port": int} diff --git a/test_runner/regress/test_safekeeper_migration.py b/test_runner/regress/test_safekeeper_migration.py new file mode 100644 index 0000000000..f67b6afc95 --- /dev/null +++ b/test_runner/regress/test_safekeeper_migration.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from fixtures.neon_fixtures import NeonEnvBuilder + + +def test_safekeeper_migration_simple(neon_env_builder: NeonEnvBuilder): + """ + Simple safekeeper migration test. + Creates 3 safekeepers. The timeline is configuret to use only one safekeeper. + 1. Go through all safekeepers, migrate the timeline to it. + 2. Stop the other safekeepers. Validate that the insert is successful. + 3. Start the other safekeepers again and go to the next safekeeper. + 4. Validate that the table contains all inserted values. + """ + neon_env_builder.num_safekeepers = 3 + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": True, + "timeline_safekeeper_count": 1, + } + env = neon_env_builder.init_start() + # TODO(diko): pageserver spams with various errors during safekeeper migration. + # Fix the code so it handles the migration better. + env.pageserver.allowed_errors.extend( + [ + ".*Timeline .* was cancelled and cannot be used anymore.*", + ".*Timeline .* has been deleted.*", + ".*wal receiver task finished with an error.*", + ] + ) + + ep = env.endpoints.create("main", tenant_id=env.initial_tenant) + # We specify all safekeepers, so compute will connect to all of them. + # Only those from the current membership configuration will be used. + # TODO(diko): set only current safekeepers when cplane notify is implemented. + ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3]) + ep.safe_psql("CREATE EXTENSION neon_test_utils;") + ep.safe_psql("CREATE TABLE t(a int)") + + for active_sk in range(1, 4): + env.storage_controller.migrate_safekeepers( + env.initial_tenant, env.initial_timeline, [active_sk] + ) + + other_sks = [sk for sk in range(1, 4) if sk != active_sk] + + for sk in other_sks: + env.safekeepers[sk - 1].stop() + + ep.safe_psql(f"INSERT INTO t VALUES ({active_sk})") + + for sk in other_sks: + env.safekeepers[sk - 1].start() + + ep.clear_buffers() + + assert ep.safe_psql("SELECT * FROM t") == [(i,) for i in range(1, 4)] + + ep.stop() + ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3]) + + assert ep.safe_psql("SELECT * FROM t") == [(i,) for i in range(1, 4)]