From 1fdbef9a4442d07766f7eeec7ac3e87b0aa10a48 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Fri, 26 Jul 2024 16:50:12 +0100 Subject: [PATCH] storcon/persistence: add leader table primitives --- storage_controller/src/persistence.rs | 77 +++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index 64a3e597ce..76d106c438 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -95,6 +95,8 @@ pub(crate) enum DatabaseOperation { ListMetadataHealth, ListMetadataHealthUnhealthy, ListMetadataHealthOutdated, + GetLeader, + UpdateLeader, } #[must_use] @@ -785,6 +787,71 @@ impl Persistence { ) .await } + + /// Get the current entry from the `leader` table if one exists. + /// It is an error for the table to contain more than one entry. + pub(crate) async fn get_leader(&self) -> DatabaseResult> { + let mut leader: Vec = self + .with_measured_conn( + DatabaseOperation::GetLeader, + move |conn| -> DatabaseResult<_> { + Ok(crate::schema::leader::table.load::(conn)?) + }, + ) + .await?; + + if leader.len() > 1 { + return Err(DatabaseError::Logical(format!( + "More than one entry present in the leader table: {leader:?}" + ))); + } + + Ok(leader.pop()) + } + + /// Update the new leader with compare-exchange semantics. If `prev` does not + /// match the current leader entry, then the update is treated as a failure. + /// When `prev` is not specified, the update is forced. + pub(crate) async fn update_leader( + &self, + prev: Option, + new: LeaderPersistence, + ) -> DatabaseResult<()> { + use crate::schema::leader::dsl::*; + + let updated = self + .with_measured_conn( + DatabaseOperation::UpdateLeader, + move |conn| -> DatabaseResult { + let updated = match &prev { + Some(prev) => diesel::update(leader) + .filter(hostname.eq(prev.hostname.clone())) + .filter(port.eq(prev.port)) + .filter(started_at.eq(prev.started_at)) + .set(( + hostname.eq(new.hostname.clone()), + port.eq(new.port), + started_at.eq(new.started_at), + )) + .execute(conn)?, + None => diesel::insert_into(leader) + .values(new.clone()) + .execute(conn)?, + }; + + Ok(updated) + }, + ) + .await?; + + if updated == 0 { + return Err(DatabaseError::Logical( + "Leader table update failed".to_string(), + )); + } + + Ok(()) + } } /// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably @@ -910,3 +977,13 @@ impl From for MetadataHealthRecord { } } } + +#[derive( + Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq, Debug, Clone, +)] +#[diesel(table_name = crate::schema::leader)] +pub(crate) struct LeaderPersistence { + pub(crate) hostname: String, + pub(crate) port: i32, + pub(crate) started_at: chrono::DateTime, +}