diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs
index 801409d612..1fe17ac9b7 100644
--- a/storage_controller/src/main.rs
+++ b/storage_controller/src/main.rs
@@ -10,6 +10,7 @@ use storage_controller::http::make_router;
 use storage_controller::metrics::preinitialize_metrics;
 use storage_controller::persistence::Persistence;
 use storage_controller::service::chaos_injector::ChaosInjector;
+use storage_controller::service::safekeeper_reconciler::SafekeeperReconciler;
 use storage_controller::service::{
     Config, Service, HEARTBEAT_INTERVAL_DEFAULT, LONG_RECONCILE_THRESHOLD_DEFAULT,
     MAX_OFFLINE_INTERVAL_DEFAULT, MAX_WARMING_UP_INTERVAL_DEFAULT, RECONCILER_CONCURRENCY_DEFAULT,
@@ -351,6 +352,24 @@ async fn async_main() -> anyhow::Result<()> {
         )
     });
 
+    const SAFEKEEPER_RECONCILER_INTERVAL: Duration = Duration::from_secs(120);
+    let safekeeper_reconciler_task = {
+        let service = service.clone();
+        let cancel = CancellationToken::new();
+        let cancel_bg = cancel.clone();
+        (
+            tokio::task::spawn(
+                async move {
+                    let reconciler =
+                        SafekeeperReconciler::new(service, SAFEKEEPER_RECONCILER_INTERVAL);
+                    reconciler.run(cancel_bg).await
+                }
+                .instrument(tracing::info_span!("safekeeper_reconciler")),
+            ),
+            cancel,
+        )
+    };
+
     // Wait until we receive a signal
     let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?;
     let mut sigquit = tokio::signal::unix::signal(SignalKind::quit())?;
@@ -384,6 +403,11 @@ async fn async_main() -> anyhow::Result<()> {
         chaos_cancel.cancel();
         chaos_jh.await.ok();
     }
+    // Do the same for the safekeeper reconciler
+    {
+        safekeeper_reconciler_task.1.cancel();
+        _ = safekeeper_reconciler_task.0.await;
+    }
 
     service.shutdown().await;
     tracing::info!("Service shutdown complete");
diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs
index 47f069bb59..ca5b1cff36 100644
--- a/storage_controller/src/persistence.rs
+++ b/storage_controller/src/persistence.rs
@@ -109,6 +109,8 @@ pub(crate) enum DatabaseOperation {
     ListMetadataHealthUnhealthy,
     ListMetadataHealthOutdated,
     ListSafekeepers,
+    ListTimelines,
+    LoadTimeline,
     InsertTimeline,
     GetLeader,
     UpdateLeader,
@@ -1250,6 +1252,65 @@ impl Persistence {
         )
         .await
     }
+
+    pub(crate) async fn get_timeline(
+        &self,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+    ) -> DatabaseResult<TimelinePersistence> {
+        use crate::schema::timelines;
+        let mut timelines: Vec<TimelineFromDb> = self
+            .with_measured_conn(
+                DatabaseOperation::LoadTimeline,
+                move |conn| -> DatabaseResult<_> {
+                    Ok(timelines::table
+                        .filter(timelines::tenant_id.eq(tenant_id.to_string()))
+                        .filter(timelines::timeline_id.eq(timeline_id.to_string()))
+                        .load::<TimelineFromDb>(conn)?)
+                },
+            )
+            .await?;
+        if timelines.len() != 1 {
+            return Err(DatabaseError::Logical(format!(
+                "incorrect number of returned timelines: ({})",
+                timelines.len()
+            )));
+        }
+
+        let tl = timelines.pop().unwrap().to_persistence();
+
+        tracing::info!("get_timeline: loaded timeline");
+
+        Ok(tl)
+    }
+
+    pub(crate) async fn timelines_to_be_reconciled(
+        &self,
+    ) -> DatabaseResult<Vec<TimelinePersistence>> {
+        use crate::schema::timelines;
+        let timelines: Vec<TimelineFromDb> = self
+            .with_measured_conn(
+                DatabaseOperation::ListTimelines,
+                move |conn| -> DatabaseResult<_> {
+                    Ok(timelines::table
+                        .filter(
+                            timelines::status
+                                .eq(String::from(TimelineStatus::Creating))
+                                .or(timelines::status.eq(String::from(TimelineStatus::Deleting))),
+                        )
+                        .load::<TimelineFromDb>(conn)?)
+                },
+            )
+            .await?;
+        let timelines = timelines
+            .into_iter()
+            .map(|tl| tl.to_persistence())
+            .collect::<Vec<_>>();
+
+        tracing::info!("list_timelines: loaded {} timelines", timelines.len());
+
+        Ok(timelines)
+    }
 }
 
 /// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably
@@ -1481,7 +1542,7 @@ struct InsertUpdateSafekeeper<'a> {
     scheduling_policy: Option<&'a str>,
 }
 
-#[derive(Insertable, AsChangeset)]
+#[derive(Insertable, AsChangeset, Queryable, Selectable)]
 #[diesel(table_name = crate::schema::timelines)]
 pub(crate) struct TimelinePersistence {
     pub(crate) tenant_id: String,
@@ -1493,6 +1554,32 @@ pub(crate) struct TimelinePersistence {
     pub(crate) status: String,
 }
 
+#[derive(Queryable, Selectable)]
+#[diesel(table_name = crate::schema::timelines)]
+pub(crate) struct TimelineFromDb {
+    pub(crate) tenant_id: String,
+    pub(crate) timeline_id: String,
+    pub(crate) generation: i32,
+    pub(crate) sk_set: Vec<Option<i64>>,
+    pub(crate) new_sk_set: Vec<Option<i64>>,
+    pub(crate) cplane_notified_generation: i32,
+    pub(crate) status: String,
+}
+
+impl TimelineFromDb {
+    fn to_persistence(self) -> TimelinePersistence {
+        TimelinePersistence {
+            tenant_id: self.tenant_id,
+            timeline_id: self.timeline_id,
+            generation: self.generation,
+            sk_set: self.sk_set.into_iter().filter_map(|v| v).collect(),
+            new_sk_set: self.new_sk_set.into_iter().filter_map(|v| v).collect(),
+            cplane_notified_generation: self.cplane_notified_generation,
+            status: self.status,
+        }
+    }
+}
+
 #[derive(PartialEq, Eq, Copy, Clone, Debug)]
 pub(crate) enum TimelineStatus {
     Creating,
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 1dabf2b273..07ed3caa74 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -1,5 +1,6 @@
 pub mod chaos_injector;
 mod context_iterator;
+pub mod safekeeper_reconciler;
 
 use hyper::Uri;
 use safekeeper_api::membership::{MemberSet, SafekeeperId};
@@ -80,6 +81,7 @@ use pageserver_client::{mgmt_api, BlockUnblock};
 use tokio::{sync::mpsc::error::TrySendError, task::JoinSet};
 use tokio_util::sync::CancellationToken;
 use utils::{
+    backoff,
     completion::Barrier,
     failpoint_support,
     generation::Generation,
@@ -154,6 +156,7 @@ enum TenantOperations {
     SecondaryDownload,
     TimelineCreate,
     TimelineDelete,
+    TimelineReconcile,
     AttachHook,
     TimelineArchivalConfig,
     TimelineDetachAncestor,
@@ -3474,14 +3477,31 @@ impl Service {
             let base_url = sk_p.base_url();
             let jwt = jwt.clone();
             let req = req.clone();
+            let cancel = self.cancel.clone();
             joinset.spawn(async move {
                 let client = SafekeeperClient::new(sk_clone, base_url, jwt);
-                // TODO: logging on error, retries
-                client.create_timeline(req).await.map_err(|e| {
-                    ApiError::InternalServerError(
-                        anyhow::Error::new(e).context("error creating timeline on safekeeper"),
-                    )
-                })
+                let req = req;
+                let retry_result = backoff::retry(
+                    || client.create_timeline(&req),
+                    |_e| {
+                        // TODO find right criteria here for deciding on retries
+                        false
+                    },
+                    3,
+                    5,
+                    "create timeline on safekeeper",
+                    &cancel,
+                )
+                .await;
+                if let Some(res) = retry_result {
+                    res.map_err(|e| {
+                        ApiError::InternalServerError(
+                            anyhow::Error::new(e).context("error creating timeline on safekeeper"),
+                        )
+                    })
+                } else {
+                    Err(ApiError::Cancelled)
+                }
             });
         }
         // After we have built the joinset, we now wait for the tasks to complete,
@@ -3525,7 +3545,7 @@ impl Service {
             reconcile_results.push(res);
         } else {
             // No error if cancelled or timed out: we already have feedback from a quorum of safekeepers
-            // TODO: maybe log?
+            tracing::info!("timed out waiting for the remaining safekeeper responses");
         }
         // check now if quorum was reached in reconcile_results
         let successful = reconcile_results
diff --git a/storage_controller/src/service/safekeeper_reconciler.rs b/storage_controller/src/service/safekeeper_reconciler.rs
new file mode 100644
index 0000000000..b5e9008c2a
--- /dev/null
+++ b/storage_controller/src/service/safekeeper_reconciler.rs
@@ -0,0 +1,94 @@
+use std::{str::FromStr, sync::Arc, time::Duration};
+
+use tokio_util::sync::CancellationToken;
+use tracing::Instrument;
+use utils::{
+    failpoint_support,
+    id::{TenantId, TimelineId},
+};
+
+use crate::{
+    id_lock_map::trace_shared_lock,
+    service::{TenantOperations, TimelineStatus},
+};
+
+use super::{Service, TimelinePersistence};
+
+pub struct SafekeeperReconciler {
+    service: Arc<Service>,
+    duration: Duration,
+}
+
+impl SafekeeperReconciler {
+    pub fn new(service: Arc<Service>, duration: Duration) -> Self {
+        SafekeeperReconciler { service, duration }
+    }
+    pub async fn run(&self, cancel: CancellationToken) {
+        while !cancel.is_cancelled() {
+            tokio::select! {
+                _ = tokio::time::sleep(self.duration) => (),
+                _ = cancel.cancelled() => break,
+            }
+            match self.reconcile_iteration(&cancel).await {
+                Ok(()) => (),
+                Err(e) => {
+                    tracing::warn!("Error during safekeeper reconciliation: {e:?}");
+                }
+            }
+        }
+    }
+    async fn reconcile_iteration(&self, cancel: &CancellationToken) -> Result<(), anyhow::Error> {
+        let work_list = self
+            .service
+            .persistence
+            .timelines_to_be_reconciled()
+            .await?;
+        for tl in work_list {
+            let reconcile_fut = self.reconcile_timeline(&tl).instrument(tracing::info_span!(
+                "safekeeper_reconcile_timeline",
+                timeline_id = tl.timeline_id,
+                tenant_id = tl.tenant_id
+            ));
+
+            tokio::select! {
+                r = reconcile_fut => r?,
+                _ = cancel.cancelled() => break,
+            }
+        }
+        Ok(())
+    }
+    async fn reconcile_timeline(&self, tl: &TimelinePersistence) -> Result<(), anyhow::Error> {
+        tracing::info!(
+            "Reconciling timeline on safekeepers {}/{}",
+            tl.tenant_id,
+            tl.timeline_id,
+        );
+        let tenant_id = TenantId::from_str(&tl.tenant_id)?;
+        let timeline_id = TimelineId::from_str(&tl.timeline_id)?;
+
+        let _tenant_lock = trace_shared_lock(
+            &self.service.tenant_op_locks,
+            tenant_id,
+            TenantOperations::TimelineReconcile,
+        )
+        .await;
+
+        failpoint_support::sleep_millis_async!("safekeeper-reconcile-timeline-shared-lock");
+        // Load the timeline again from the db: unless we hold the tenant lock, the timeline can change under our noses.
+        let tl = self
+            .service
+            .persistence
+            .get_timeline(tenant_id, timeline_id)
+            .await?;
+        let status = TimelineStatus::from_str(&tl.status)?;
+        match status {
+            TimelineStatus::Created | TimelineStatus::Deleted => return Ok(()),
+            TimelineStatus::Creating => {
+                todo!()
+            }
+            TimelineStatus::Deleting => {
+                todo!()
+            }
+        }
+    }
+}
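
Note on the retry predicate in service.rs: the closure passed to backoff::retry is currently left as `|_e| false` with a TODO about choosing retry criteria. The following standalone sketch (not part of the patch) shows one possible criterion, assuming the predicate has "is permanent" semantics, i.e. returning true stops further retries; SkClientError and is_permanent are hypothetical names standing in for the real safekeeper client error type.

    /// Hypothetical stand-in for the safekeeper client's error type; a real
    /// predicate would match on the client's actual error enum instead.
    enum SkClientError {
        /// Connection-level failure: refused, timed out, body read error, ...
        Transport,
        /// The safekeeper answered with an HTTP error status.
        Api { status: u16 },
    }

    /// True if the error is permanent, i.e. retrying create_timeline on the
    /// same safekeeper will not help, so the retry loop should stop early.
    fn is_permanent(e: &SkClientError) -> bool {
        match e {
            // Network-level failures are transient and worth retrying.
            SkClientError::Transport => false,
            // 4xx means the request itself was rejected; 5xx is treated as transient.
            SkClientError::Api { status } => (400..500).contains(status),
        }
    }

    fn main() {
        assert!(!is_permanent(&SkClientError::Transport));
        assert!(is_permanent(&SkClientError::Api { status: 409 }));
        assert!(!is_permanent(&SkClientError::Api { status: 503 }));
    }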