Draft for a reconciler
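
First draft of a background reconciler for safekeeper timelines: the storage
controller spawns a SafekeeperReconciler task that periodically scans the
timelines table for entries still in the Creating or Deleting status; the
per-status handling is left as todo!() for now. Timeline creation calls to
safekeepers are retried with backoff, and the new task is cancelled and
awaited on shutdown, like the chaos injector.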

Arpad Müller
2025-01-17 19:36:46 +01:00
parent f0fe5fae6b
commit b5c29806f0
4 changed files with 233 additions and 8 deletions

View File

@@ -10,6 +10,7 @@ use storage_controller::http::make_router;
use storage_controller::metrics::preinitialize_metrics;
use storage_controller::persistence::Persistence;
use storage_controller::service::chaos_injector::ChaosInjector;
use storage_controller::service::safekeeper_reconciler::SafekeeperReconciler;
use storage_controller::service::{
    Config, Service, HEARTBEAT_INTERVAL_DEFAULT, LONG_RECONCILE_THRESHOLD_DEFAULT,
    MAX_OFFLINE_INTERVAL_DEFAULT, MAX_WARMING_UP_INTERVAL_DEFAULT, RECONCILER_CONCURRENCY_DEFAULT,
@@ -351,6 +352,24 @@ async fn async_main() -> anyhow::Result<()> {
        )
    });

    // How often the safekeeper reconciler wakes up to scan for pending timelines.
    const SAFEKEEPER_RECONCILER_INTERVAL: Duration = Duration::from_secs(120);
    let safekeeper_reconciler_task = {
        let service = service.clone();
        let cancel = CancellationToken::new();
        let cancel_bg = cancel.clone();
        (
            tokio::task::spawn(
                async move {
                    let reconciler =
                        SafekeeperReconciler::new(service, SAFEKEEPER_RECONCILER_INTERVAL);
                    reconciler.run(cancel_bg).await
                }
                .instrument(tracing::info_span!("safekeeper_reconciler")),
            ),
            cancel,
        )
    };

    // Wait until we receive a signal
    let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?;
    let mut sigquit = tokio::signal::unix::signal(SignalKind::quit())?;
@@ -384,6 +403,11 @@ async fn async_main() -> anyhow::Result<()> {
        chaos_cancel.cancel();
        chaos_jh.await.ok();
    }

    // Do the same for the safekeeper reconciler
    {
        safekeeper_reconciler_task.1.cancel();
        _ = safekeeper_reconciler_task.0.await;
    }

    service.shutdown().await;
    tracing::info!("Service shutdown complete");

View File

@@ -109,6 +109,8 @@ pub(crate) enum DatabaseOperation {
    ListMetadataHealthUnhealthy,
    ListMetadataHealthOutdated,
    ListSafekeepers,
    ListTimelines,
    LoadTimeline,
    InsertTimeline,
    GetLeader,
    UpdateLeader,
@@ -1250,6 +1252,65 @@ impl Persistence {
        )
        .await
    }
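
    /// Load a single timeline row by tenant and timeline id.
    ///
    /// Returns a logical error if the row is missing or duplicated.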
    pub(crate) async fn get_timeline(
        &self,
        tenant_id: TenantId,
        timeline_id: TimelineId,
    ) -> DatabaseResult<TimelinePersistence> {
        use crate::schema::timelines;
        let mut timelines: Vec<TimelineFromDb> = self
            .with_measured_conn(
                DatabaseOperation::LoadTimeline,
                move |conn| -> DatabaseResult<_> {
                    Ok(timelines::table
                        .filter(timelines::tenant_id.eq(tenant_id.to_string()))
                        .filter(timelines::timeline_id.eq(timeline_id.to_string()))
                        .load::<TimelineFromDb>(conn)?)
                },
            )
            .await?;
        if timelines.len() != 1 {
            return Err(DatabaseError::Logical(format!(
                "expected exactly one timeline row for {tenant_id}/{timeline_id}, found {}",
                timelines.len()
            )));
        }
        let tl = timelines.pop().unwrap().to_persistence();
        tracing::info!("get_timeline: loaded timeline");
        Ok(tl)
    }
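
    /// List the timelines that are in an in-flight state (`Creating` or `Deleting`)
    /// and therefore still need work from the safekeeper reconciler.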
    pub(crate) async fn timelines_to_be_reconciled(
        &self,
    ) -> DatabaseResult<Vec<TimelinePersistence>> {
        use crate::schema::timelines;
        let timelines: Vec<TimelineFromDb> = self
            .with_measured_conn(
                DatabaseOperation::ListTimelines,
                move |conn| -> DatabaseResult<_> {
                    Ok(timelines::table
                        .filter(
                            timelines::status
                                .eq(String::from(TimelineStatus::Creating))
                                .or(timelines::status.eq(String::from(TimelineStatus::Deleting))),
                        )
                        .load::<TimelineFromDb>(conn)?)
                },
            )
            .await?;
        let timelines = timelines
            .into_iter()
            .map(|tl| tl.to_persistence())
            .collect::<Vec<_>>();
        tracing::info!("timelines_to_be_reconciled: loaded {} timelines", timelines.len());
        Ok(timelines)
    }
}
/// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably
@@ -1481,7 +1542,7 @@ struct InsertUpdateSafekeeper<'a> {
    scheduling_policy: Option<&'a str>,
}

-#[derive(Insertable, AsChangeset)]
+#[derive(Insertable, AsChangeset, Queryable, Selectable)]
#[diesel(table_name = crate::schema::timelines)]
pub(crate) struct TimelinePersistence {
    pub(crate) tenant_id: String,
@@ -1493,6 +1554,32 @@ pub(crate) struct TimelinePersistence {
    pub(crate) status: String,
}
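
/// Raw `timelines` row as Diesel loads it: the `sk_set` and `new_sk_set` columns
/// are nullable arrays, so they arrive as `Vec<Option<i64>>` and are flattened
/// into [`TimelinePersistence`] by [`Self::to_persistence`].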
#[derive(Queryable, Selectable)]
#[diesel(table_name = crate::schema::timelines)]
pub(crate) struct TimelineFromDb {
    pub(crate) tenant_id: String,
    pub(crate) timeline_id: String,
    pub(crate) generation: i32,
    pub(crate) sk_set: Vec<Option<i64>>,
    pub(crate) new_sk_set: Vec<Option<i64>>,
    pub(crate) cplane_notified_generation: i32,
    pub(crate) status: String,
}

impl TimelineFromDb {
    fn to_persistence(self) -> TimelinePersistence {
        TimelinePersistence {
            tenant_id: self.tenant_id,
            timeline_id: self.timeline_id,
            generation: self.generation,
            // Drop SQL NULL entries: the in-memory sets are non-nullable.
            sk_set: self.sk_set.into_iter().flatten().collect(),
            new_sk_set: self.new_sk_set.into_iter().flatten().collect(),
            cplane_notified_generation: self.cplane_notified_generation,
            status: self.status,
        }
    }
}
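
/// Lifecycle status of a timeline; stored as text in the `status` column of the
/// `timelines` table.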
#[derive(PartialEq, Eq, Copy, Clone, Debug)]
pub(crate) enum TimelineStatus {
    Creating,

View File

@@ -1,5 +1,6 @@
pub mod chaos_injector;
mod context_iterator;
pub mod safekeeper_reconciler;
use hyper::Uri;
use safekeeper_api::membership::{MemberSet, SafekeeperId};
@@ -80,6 +81,7 @@ use pageserver_client::{mgmt_api, BlockUnblock};
use tokio::{sync::mpsc::error::TrySendError, task::JoinSet};
use tokio_util::sync::CancellationToken;
use utils::{
    backoff,
    completion::Barrier,
    failpoint_support,
    generation::Generation,
@@ -154,6 +156,7 @@ enum TenantOperations {
    SecondaryDownload,
    TimelineCreate,
    TimelineDelete,
    TimelineReconcile,
    AttachHook,
    TimelineArchivalConfig,
    TimelineDetachAncestor,
@@ -3474,14 +3477,31 @@ impl Service {
            let base_url = sk_p.base_url();
            let jwt = jwt.clone();
            let req = req.clone();
+           let cancel = self.cancel.clone();
            joinset.spawn(async move {
                let client = SafekeeperClient::new(sk_clone, base_url, jwt);
-               // TODO: logging on error, retries
-               client.create_timeline(req).await.map_err(|e| {
-                   ApiError::InternalServerError(
-                       anyhow::Error::new(e).context("error creating timeline on safekeeper"),
-                   )
-               })
+               let retry_result = backoff::retry(
+                   || client.create_timeline(&req),
+                   |_e| {
+                       // TODO find right criteria here for deciding on retries
+                       false
+                   },
+                   // warn_threshold = 3, max_retries = 5
+                   3,
+                   5,
+                   "create timeline on safekeeper",
+                   &cancel,
+               )
+               .await;
+               if let Some(res) = retry_result {
+                   res.map_err(|e| {
+                       ApiError::InternalServerError(
+                           anyhow::Error::new(e).context("error creating timeline on safekeeper"),
+                       )
+                   })
+               } else {
+                   // backoff::retry returns None only when `cancel` fired
+                   Err(ApiError::Cancelled)
+               }
            });
        }
        // After we have built the joinset, we now wait for the tasks to complete,
@@ -3525,7 +3545,7 @@ impl Service {
            reconcile_results.push(res);
        } else {
            // No error if cancelled or timed out: we already have feedback from a quorum of safekeepers
-           // TODO: maybe log?
+           tracing::info!("timed out waiting for the remaining safekeeper responses");
        }
        // check now if quorum was reached in reconcile_results
        let successful = reconcile_results

View File

@@ -0,0 +1,94 @@
use std::{str::FromStr, sync::Arc, time::Duration};

use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use utils::{
    failpoint_support,
    id::{TenantId, TimelineId},
};

use crate::{
    id_lock_map::trace_shared_lock,
    service::{TenantOperations, TimelineStatus},
};

use super::{Service, TimelinePersistence};
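
/// Periodically reconciles timeline state on safekeepers with the storage
/// controller database.
///
/// `run` wakes up every `duration`, lists the timelines whose status is
/// `Creating` or `Deleting`, and reconciles each one; the whole loop is
/// cancellation-aware.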
pub struct SafekeeperReconciler {
    service: Arc<Service>,
    duration: Duration,
}

impl SafekeeperReconciler {
    pub fn new(service: Arc<Service>, duration: Duration) -> Self {
        SafekeeperReconciler { service, duration }
    }

    pub async fn run(&self, cancel: CancellationToken) {
        while !cancel.is_cancelled() {
            tokio::select! {
                _ = tokio::time::sleep(self.duration) => (),
                _ = cancel.cancelled() => break,
            }
            match self.reconcile_iteration(&cancel).await {
                Ok(()) => (),
                Err(e) => {
                    tracing::warn!("Error during safekeeper reconciliation: {e:?}");
                }
            }
        }
    }
    async fn reconcile_iteration(&self, cancel: &CancellationToken) -> Result<(), anyhow::Error> {
        let work_list = self
            .service
            .persistence
            .timelines_to_be_reconciled()
            .await?;
        for tl in work_list {
            let reconcile_fut = self.reconcile_timeline(&tl).instrument(tracing::info_span!(
                "safekeeper_reconcile_timeline",
                timeline_id = tl.timeline_id,
                tenant_id = tl.tenant_id
            ));
            tokio::select! {
                r = reconcile_fut => r?,
                _ = cancel.cancelled() => break,
            }
        }
        Ok(())
    }
    async fn reconcile_timeline(&self, tl: &TimelinePersistence) -> Result<(), anyhow::Error> {
        tracing::info!(
            "Reconciling timeline on safekeepers {}/{}",
            tl.tenant_id,
            tl.timeline_id,
        );
        // The ids are stored as hex strings, so parse them instead of taking the raw bytes.
        let tenant_id = TenantId::from_str(&tl.tenant_id)?;
        let timeline_id = TimelineId::from_str(&tl.timeline_id)?;
        let _tenant_lock = trace_shared_lock(
            &self.service.tenant_op_locks,
            tenant_id,
            TenantOperations::TimelineReconcile,
        )
        .await;
        // Test-only failpoint: optionally pause here while holding the shared lock.
        failpoint_support::sleep_millis_async!("safekeeper-reconcile-timeline-shared-lock");
        // Re-load the timeline from the db now that we hold the tenant lock:
        // before we acquired it, the row could have changed under our noses.
        let tl = self
            .service
            .persistence
            .get_timeline(tenant_id, timeline_id)
            .await?;
        let status = TimelineStatus::from_str(&tl.status)?;
        match status {
            TimelineStatus::Created | TimelineStatus::Deleted => return Ok(()),
            TimelineStatus::Creating => {
                todo!()
            }
            TimelineStatus::Deleting => {
                todo!()
            }
        }
    }
}