mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 01:12:56 +00:00
Add support for timeline deletion
This commit is contained in:
@@ -1224,11 +1224,12 @@ impl Persistence {
|
||||
)
|
||||
.await
|
||||
}
|
||||
pub(crate) async fn update_timeline(
|
||||
pub(crate) async fn update_timeline_status(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
timeline_status: TimelineStatusKind,
|
||||
status_kind: TimelineStatusKind,
|
||||
status: String,
|
||||
) -> DatabaseResult<()> {
|
||||
use crate::schema::timelines;
|
||||
|
||||
@@ -1238,7 +1239,10 @@ impl Persistence {
|
||||
let inserted_updated = diesel::update(timelines::table)
|
||||
.filter(timelines::tenant_id.eq(tenant_id.to_string()))
|
||||
.filter(timelines::timeline_id.eq(timeline_id.to_string()))
|
||||
.set(timelines::status.eq(String::from(timeline_status)))
|
||||
.set((
|
||||
timelines::status_kind.eq(String::from(status_kind)),
|
||||
timelines::status.eq(status.clone()),
|
||||
))
|
||||
.execute(conn)?;
|
||||
|
||||
if inserted_updated != 1 {
|
||||
@@ -1297,7 +1301,8 @@ impl Persistence {
|
||||
.filter(
|
||||
timelines::status
|
||||
.eq(String::from(TimelineStatusKind::Creating))
|
||||
.or(timelines::status.eq(String::from(TimelineStatusKind::Deleting))),
|
||||
.or(timelines::status
|
||||
.eq(String::from(TimelineStatusKind::Deleting))),
|
||||
)
|
||||
.load::<TimelineFromDb>(conn)?)
|
||||
},
|
||||
@@ -1621,4 +1626,4 @@ impl From<TimelineStatusKind> for String {
|
||||
pub(crate) struct TimelineStatusCreating {
|
||||
pub(crate) pg_version: u32,
|
||||
pub(crate) start_lsn: Lsn,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
use safekeeper_api::models::{TimelineCreateRequest, TimelineStatus};
|
||||
use safekeeper_client::mgmt_api::{Client, Result};
|
||||
use utils::{id::NodeId, logging::SecretString};
|
||||
use utils::{
|
||||
id::{NodeId, TenantId, TimelineId},
|
||||
logging::SecretString,
|
||||
};
|
||||
|
||||
/// Thin wrapper around [`safekeeper_client::mgmt_api::Client`]. It allows the storage
|
||||
/// controller to collect metrics in a non-intrusive manner.
|
||||
@@ -74,4 +77,17 @@ impl SafekeeperClient {
|
||||
self.inner.create_timeline(&req).await
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_timeline(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> Result<TimelineStatus> {
|
||||
measured_request!(
|
||||
"delete_timeline",
|
||||
crate::metrics::Method::Delete,
|
||||
&self.node_id_label,
|
||||
self.inner.delete_timeline(tenant_id, timeline_id).await
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3550,7 +3550,7 @@ impl Service {
|
||||
// TODO
|
||||
|
||||
self.persistence
|
||||
.update_timeline(tenant_id, timeline_id, status_kind)
|
||||
.update_timeline_status(tenant_id, timeline_id, status_kind, "{}".to_owned())
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -4118,6 +4118,183 @@ impl Service {
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn tenant_timeline_delete_safekeepers_reconcile(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
tl_p: &TimelinePersistence,
|
||||
sk_persistences: &HashMap<i64, SafekeeperPersistence>,
|
||||
) -> Result<(), ApiError> {
|
||||
// If at least one deletion succeeded, return if we are outside of a specified timeout
|
||||
let jwt = self.config.jwt_token.clone().map(SecretString::from);
|
||||
let mut joinset = JoinSet::new();
|
||||
|
||||
let mut members = Vec::new();
|
||||
for sk in tl_p.sk_set.iter() {
|
||||
let Some(sk_p) = sk_persistences.get(&sk) else {
|
||||
return Err(ApiError::InternalServerError(anyhow!(
|
||||
"couldn't find persisted entry for safekeeper with id {sk}"
|
||||
)))?;
|
||||
};
|
||||
members.push(SafekeeperId {
|
||||
id: NodeId(sk_p.id as u64),
|
||||
host: sk_p.host.clone(),
|
||||
pg_port: sk_p.port as u16,
|
||||
});
|
||||
}
|
||||
|
||||
let sks_to_reconcile = &tl_p.sk_set;
|
||||
for sk in sks_to_reconcile.iter() {
|
||||
// Unwrap is fine as we already would have returned error above
|
||||
let sk_p = sk_persistences.get(&sk).unwrap();
|
||||
let sk_clone = NodeId(*sk as u64);
|
||||
let base_url = sk_p.base_url();
|
||||
let jwt = jwt.clone();
|
||||
let cancel = self.cancel.clone();
|
||||
joinset.spawn(async move {
|
||||
let client = SafekeeperClient::new(sk_clone, base_url, jwt);
|
||||
let retry_result = backoff::retry(
|
||||
|| client.delete_timeline(tenant_id, timeline_id),
|
||||
|_e| {
|
||||
// TODO find right criteria here for deciding on retries
|
||||
false
|
||||
},
|
||||
3,
|
||||
5,
|
||||
"delete timeline on safekeeper",
|
||||
&cancel,
|
||||
)
|
||||
.await;
|
||||
if let Some(res) = retry_result {
|
||||
res.map_err(|e| {
|
||||
ApiError::InternalServerError(
|
||||
anyhow::Error::new(e).context("error deleting timeline on safekeeper"),
|
||||
)
|
||||
})
|
||||
} else {
|
||||
Err(ApiError::Cancelled)
|
||||
}
|
||||
});
|
||||
}
|
||||
// After we have built the joinset, we now wait for the tasks to complete,
|
||||
// but with a specified timeout to make sure we return swiftly, either with
|
||||
// a failure or success.
|
||||
const SK_DELETE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
let reconcile_deadline = tokio::time::Instant::now() + SK_DELETE_TIMELINE_RECONCILE_TIMEOUT;
|
||||
|
||||
// Treat the first task to finish differently, mostly when it times out,
|
||||
// because then we won't have any successful deletion.
|
||||
// For the second and third task, we don't rely on them succeeding, and we need this to support
|
||||
// continuing operations even if a safekeeper is down.
|
||||
let timeout_or_first = tokio::time::timeout_at(reconcile_deadline, async {
|
||||
joinset.join_next().await.unwrap()
|
||||
})
|
||||
.await;
|
||||
let mut reconcile_results = Vec::new();
|
||||
match timeout_or_first {
|
||||
Ok(Ok(res_1)) => {
|
||||
reconcile_results.push(res_1);
|
||||
}
|
||||
Ok(Err(_)) => {
|
||||
return Err(ApiError::InternalServerError(anyhow!(
|
||||
"task was cancelled while reconciling timeline deletion"
|
||||
)));
|
||||
}
|
||||
Err(_) => {
|
||||
return Err(ApiError::InternalServerError(anyhow!(
|
||||
"couldn't reconcile timeline deletion on safekeepers within timeout"
|
||||
)));
|
||||
}
|
||||
}
|
||||
let timeout_or_last = tokio::time::timeout_at(reconcile_deadline, async {
|
||||
while let Some(next_res) = joinset.join_next().await {
|
||||
match next_res {
|
||||
Ok(res) => {
|
||||
reconcile_results.push(res);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::info!("aborting reconciliation due to join error: {e:?}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
if let Err(_) = timeout_or_last.await {
|
||||
// No error if cancelled or timed out: we already have feedback from a quorum of safekeepers
|
||||
tracing::info!(
|
||||
"timeout for last {} reconciliations",
|
||||
sks_to_reconcile.len() - 1
|
||||
);
|
||||
}
|
||||
// check now if quorum was reached in reconcile_results
|
||||
let successful = reconcile_results
|
||||
.into_iter()
|
||||
.filter_map(|res| res.ok())
|
||||
.collect::<Vec<_>>();
|
||||
tracing::info!(
|
||||
"Got {} successful results from reconciliation",
|
||||
successful.len()
|
||||
);
|
||||
let new_status_kind = if successful.len() < 1 {
|
||||
// Failure
|
||||
return Err(ApiError::InternalServerError(anyhow!(
|
||||
"not enough successful reconciliations to reach quorum, please retry: {}",
|
||||
successful.len()
|
||||
)));
|
||||
} else if successful.len() == sks_to_reconcile.len() {
|
||||
// Success, state of timeline is Deleted
|
||||
TimelineStatusKind::Deleted
|
||||
} else if successful.len() == 2 {
|
||||
// Success, state of timeline remains Creating
|
||||
TimelineStatusKind::Deleting
|
||||
} else {
|
||||
unreachable!(
|
||||
"unexpected number of successful reconciliations {}",
|
||||
successful.len()
|
||||
);
|
||||
};
|
||||
|
||||
if new_status_kind == TimelineStatusKind::Deleted {
|
||||
self.persistence
|
||||
.update_timeline_status(tenant_id, timeline_id, new_status_kind, "{}".to_owned())
|
||||
.await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
async fn tenant_timeline_delete_safekeepers(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> Result<(), ApiError> {
|
||||
let tl = self
|
||||
.persistence
|
||||
.get_timeline(tenant_id, timeline_id)
|
||||
.await?;
|
||||
let status_kind =
|
||||
TimelineStatusKind::from_str(&tl.status_kind).map_err(ApiError::InternalServerError)?;
|
||||
if status_kind != TimelineStatusKind::Deleting {
|
||||
// Set status to deleting
|
||||
let new_status_kind = TimelineStatusKind::Deleting;
|
||||
self.persistence
|
||||
.update_timeline_status(tenant_id, timeline_id, new_status_kind, "{}".to_owned())
|
||||
.await?;
|
||||
}
|
||||
let sk_persistences = self
|
||||
.persistence
|
||||
.list_safekeepers()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|p| (p.id, p))
|
||||
.collect::<HashMap<_, _>>();
|
||||
self.tenant_timeline_delete_safekeepers_reconcile(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
&tl,
|
||||
&sk_persistences,
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
pub(crate) async fn tenant_timeline_delete(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
@@ -4131,7 +4308,7 @@ impl Service {
|
||||
)
|
||||
.await;
|
||||
|
||||
self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
|
||||
let ps_fut = self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
|
||||
if targets.0.is_empty() {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("Tenant not found").into(),
|
||||
@@ -4203,7 +4380,13 @@ impl Service {
|
||||
)
|
||||
.await?;
|
||||
Ok(shard_zero_status)
|
||||
}).await?
|
||||
});
|
||||
|
||||
let sk_fut = self.tenant_timeline_delete_safekeepers(tenant_id, timeline_id);
|
||||
|
||||
let (ps_res, sk_res) = tokio::join!(ps_fut, sk_fut);
|
||||
sk_res?;
|
||||
ps_res?
|
||||
}
|
||||
|
||||
/// When you need to send an HTTP request to the pageserver that holds shard0 of a tenant, this
|
||||
|
||||
@@ -114,7 +114,14 @@ impl SafekeeperReconciler {
|
||||
.await?;
|
||||
}
|
||||
TimelineStatusKind::Deleting => {
|
||||
todo!()
|
||||
self.service
|
||||
.tenant_timeline_delete_safekeepers_reconcile(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
&tl,
|
||||
sk_persistences,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
||||
Reference in New Issue
Block a user