trigger now only does insertions; app loop will do cleanup; prepare API for cleanup

This commit is contained in:
Christian Schwarz
2025-05-30 20:36:50 +02:00
parent 8eb853b731
commit afa35eea87
8 changed files with 122 additions and 96 deletions

View File

@@ -1905,12 +1905,14 @@ impl Persistence {
pub(crate) async fn update_sk_ps_discovery_attempt(
&self,
pk: SkPsDiscoveryPersistencePk,
intent_state: String,
update: Result<(), ()>,
) -> DatabaseResult<()> {
use crate::schema::sk_ps_discovery::dsl;
self.with_measured_conn(DatabaseOperation::UpdateSkPsDiscoveryAttempt, move |conn| {
let pk = pk.clone();
let intent_state = intent_state.clone();
Box::pin(async move {
match update {
Ok(()) => {
@@ -1921,14 +1923,23 @@ impl Persistence {
ps_generation,
sk_id,
} = pk;
diesel::delete(dsl::sk_ps_discovery)
let nrows = diesel::delete(dsl::sk_ps_discovery)
// primary key
.filter(dsl::tenant_id.eq(tenant_id))
.filter(dsl::shard_number.eq(shard_number))
.filter(dsl::shard_count.eq(shard_count))
.filter(dsl::ps_generation.eq(ps_generation))
.filter(dsl::sk_id.eq(sk_id))
.execute(conn) // TODO: check update count?
// intent_state could have changed beneath us (split brain or concurrent state gc)
// TODO: this could also just be a globally monotonic sequence number, maybe easier to reason about?
.filter(dsl::intent_state.eq(intent_state))
.execute(conn)
.await?;
if nrows != 1 {
return Err(DatabaseError::Logical(format!(
"unexpected number of deletes: {nrows}"
)));
}
}
Err(_) => {
let SkPsDiscoveryPersistencePk {
@@ -1939,18 +1950,28 @@ impl Persistence {
sk_id,
} = pk;
diesel::update(dsl::sk_ps_discovery)
let nrows = diesel::update(dsl::sk_ps_discovery)
// primary key
.filter(dsl::tenant_id.eq(tenant_id))
.filter(dsl::shard_number.eq(shard_number))
.filter(dsl::shard_count.eq(shard_count))
.filter(dsl::ps_generation.eq(ps_generation))
.filter(dsl::sk_id.eq(sk_id))
// intent_state could have changed beneath us (split brain or concurrent state gc)
// TODO: this could also just be a globally monotonic sequence number, maybe easier to reason about?
.filter(dsl::intent_state.eq(intent_state))
// action:
.set((
dsl::retries.eq(dsl::retries.add(1)), // XXX: in split-brain situation we would bump twice...
dsl::last_retry_at.eq(diesel::dsl::now),
))
.execute(conn) // TODO: check update count?
.await?;
if nrows != 1 {
return Err(DatabaseError::Logical(format!(
"unexpected number of updates: {nrows}"
)));
}
}
}
Ok(())
@@ -2543,7 +2564,7 @@ pub(crate) struct TimelineImportPersistence {
pub(crate) shard_statuses: serde_json::Value,
}
#[derive(Insertable, AsChangeset, Selectable, Clone)]
#[derive(Insertable, AsChangeset, Selectable, Clone, PartialEq, Eq, Hash, Debug)]
#[diesel(table_name = crate::schema::sk_ps_discovery)]
pub(crate) struct SkPsDiscoveryPersistencePk {
pub(crate) tenant_id: String,
@@ -2561,10 +2582,12 @@ pub(crate) struct SkPsDiscoveryPersistence {
pub(crate) shard_count: i32,
pub(crate) ps_generation: i32,
pub(crate) sk_id: i64,
pub(crate) intent_state: String,
pub(crate) ps_id: i64,
pub(crate) created_at: chrono::DateTime<chrono::Utc>,
pub(crate) retries: i32,
pub(crate) last_retry_at: Option<chrono::DateTime<chrono::Utc>>,
pub(crate) acknowledged_at: Option<chrono::DateTime<chrono::Utc>>,
}
impl SkPsDiscoveryPersistence {
@@ -2575,4 +2598,26 @@ impl SkPsDiscoveryPersistence {
shard_count: ShardCount::new(self.shard_count as u8),
})
}
pub(crate) fn primary_key(&self) -> SkPsDiscoveryPersistencePk {
let SkPsDiscoveryPersistence {
tenant_id,
shard_number,
shard_count,
ps_generation,
sk_id,
intent_state: _,
ps_id: _,
created_at: _,
retries: _,
last_retry_at: _,
acknowledged_at: _,
} = self;
SkPsDiscoveryPersistencePk {
tenant_id: tenant_id.clone(),
shard_number: *shard_number,
shard_count: *shard_count,
ps_generation: *ps_generation,
sk_id: *sk_id,
}
}
}

View File

@@ -1,6 +1,6 @@
use safekeeper_api::models::{
self, PullTimelineRequest, PullTimelineResponse, SafekeeperUtilization,
TenantShardPageserverAttachments, TimelineCreateRequest,
TenantShardPageserverAttachmentChange, TimelineCreateRequest,
};
use safekeeper_client::mgmt_api::{Client, Result};
use utils::id::{NodeId, TenantId, TimelineId};
@@ -167,17 +167,17 @@ impl SafekeeperClient {
)
}
pub async fn put_tenant_shard_pageserver_attachments(
pub async fn post_tenant_shard_pageserver_attachments(
&self,
tenant_shard_id: TenantShardId,
attachments: TenantShardPageserverAttachments,
body: TenantShardPageserverAttachmentChange,
) -> Result<()> {
measured_request!(
"put_tenant_shard_pageserver_attachments",
crate::metrics::Method::Put,
"post_tenant_shard_pageserver_attachments",
crate::metrics::Method::Post,
&self.node_id_label,
self.inner
.put_tenant_shard_pageserver_attachments(tenant_shard_id, attachments)
.post_tenant_shard_pageserver_attachments(tenant_shard_id, body)
.await
)
}

View File

@@ -67,10 +67,12 @@ diesel::table! {
shard_count -> Int4,
ps_generation -> Int4,
sk_id -> Int8,
intent_state -> Varchar,
ps_id -> Int8,
created_at -> Timestamptz,
retries -> Int4,
last_retry_at -> Nullable<Timestamptz>,
acknowledged_at -> Nullable<Timestamptz>,
}
}

View File

@@ -6,7 +6,9 @@ use std::{
use anyhow::Context;
use futures::{StreamExt, stream::FuturesUnordered};
use safekeeper_api::models::{TenantShardPageserverAttachment, TenantShardPageserverAttachments};
use safekeeper_api::models::{
TenantShardPageserverAttachment, TenantShardPageserverAttachmentChange,
};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, Span, error, info, info_span};
@@ -14,12 +16,11 @@ use utils::{
generation::Generation,
id::{NodeId, TenantId},
logging::SecretString,
shard::TenantShardId,
};
use crate::{
heartbeater::SafekeeperState,
persistence::{Persistence, SkPsDiscoveryPersistence, SkPsDiscoveryPersistencePk},
persistence::{Persistence, SkPsDiscoveryPersistence},
};
use super::Service;
@@ -68,27 +69,11 @@ impl Actor {
let mut sync_full_ticker = tokio::time::interval(std::time::Duration::from_secs(5));
#[derive(PartialEq, Eq, Hash, Debug)]
struct TaskKey {
tenant_shard_id: TenantShardId,
ps_generation: Generation,
sk_id: NodeId,
}
struct Task {
work: SkPsDiscoveryPersistence,
cancel: CancellationToken,
join_handle: Option<JoinHandle<()>>,
}
impl<'a> TryFrom<&'a SkPsDiscoveryPersistence> for TaskKey {
type Error = hex::FromHexError;
fn try_from(value: &'a SkPsDiscoveryPersistence) -> Result<Self, Self::Error> {
Ok(Self {
tenant_shard_id: value.tenant_shard_id()?,
ps_generation: Generation::new(value.ps_generation as u32),
sk_id: NodeId(value.sk_id as u64),
})
}
}
let mut tasks = HashMap::new();
loop {
@@ -123,19 +108,19 @@ impl Actor {
.context("get_all_sk_ps_discovery_work")?
.into_iter()
.map(|work: SkPsDiscoveryPersistence| {
anyhow::Ok((
TaskKey::try_from(&work)?,
(
work.primary_key(),
Task {
work,
cancel: CancellationToken::new(),
join_handle: None,
},
))
)
})
.collect::<Result<HashMap<_, _>, _>>()?;
.collect::<HashMap<_, _>>();
// Carry over ongoing tasks
let mut ongoing_wait = FuturesUnordered::new();
let mut cancelled_wait = FuturesUnordered::new();
for (
task_key,
Task {
@@ -156,26 +141,19 @@ impl Actor {
if *planned_persistence == ongoing_persistence {
*planned_jh = join_handle;
*planned_cancel = cancel;
} else {
match join_handle {
Some(jh) => {
cancel.cancel();
ongoing_wait.push(jh);
}
None => (),
}
continue;
}
}
hash_map::Entry::Vacant(_) => match join_handle {
Some(jh) => {
cancel.cancel();
ongoing_wait.push(jh);
}
None => (),
},
hash_map::Entry::Vacant(_) => (),
}
cancel.cancel();
cancelled_wait.push(async move {
if let Some(jh) = join_handle {
let _ = jh.await;
}
});
}
while let Some(_) = ongoing_wait.next().await {}
while let Some(_) = cancelled_wait.next().await {}
tasks = new_tasks;
// Kick off new tasks
@@ -222,13 +200,8 @@ impl DeliveryAttempt {
let res = self
.persistence
.update_sk_ps_discovery_attempt(
SkPsDiscoveryPersistencePk {
tenant_id: self.work.tenant_id,
shard_number: self.work.shard_number,
shard_count: self.work.shard_count,
ps_generation: self.work.ps_generation,
sk_id: self.work.sk_id,
},
self.work.primary_key(),
self.work.intent_state.clone(),
res.map_err(|_| ()),
)
.await;
@@ -247,20 +220,27 @@ impl DeliveryAttempt {
anyhow::bail!("safekeeper is offline");
}
}
let body = {
let val = TenantShardPageserverAttachment {
ps_id: NodeId(self.work.ps_id as u64),
generation: Generation::new(self.work.ps_generation as u32),
};
match self.work.intent_state.as_str() {
"attached" => TenantShardPageserverAttachmentChange::Attach(val),
"detached" => TenantShardPageserverAttachmentChange::Detach(val),
x => anyhow::bail!("unknown intent state {x:?}"),
}
};
let tenant_shard_id = self.work.tenant_shard_id()?;
sk.with_client_retries(
|client| async move {
client
.put_tenant_shard_pageserver_attachments(
tenant_shard_id,
TenantShardPageserverAttachments {
attachments: vec![TenantShardPageserverAttachment {
ps_id: NodeId(self.work.ps_id as u64),
generation: Generation::new(self.work.ps_generation as u32),
}],
},
)
.await
|client| {
let body = body.clone();
async move {
client
.post_tenant_shard_pageserver_attachments(tenant_shard_id, body)
.await
}
},
&self.http_client,
&self