From c660d787e4cc3d2583a719fff24d4f8620047426 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Mon, 27 Jan 2025 20:05:11 +0100 Subject: [PATCH] Fixes due to merge --- storage_controller/src/persistence.rs | 96 ++++++++++++++------------- 1 file changed, 50 insertions(+), 46 deletions(-) diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index 1ad2082051..62c5652b4f 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -1194,14 +1194,16 @@ impl Persistence { let safekeepers: Vec = self .with_measured_conn( DatabaseOperation::ListSafekeepers, - move |conn| -> DatabaseResult<_> { - let query = diesel::sql_query("\ - SELECT safekeepers.id as sk_id, safekeepers.availability_zone_id as az_id, COUNT(*) as timeline_count, safekeepers.host as host, safekeepers.port as port \ - FROM (select tenant_id, timeline_id, unnest(sk_set) as sk_id from timelines) as timelines_unnested \ - JOIN safekeepers ON (safekeepers.id = timelines_unnested.id)\ - "); - let results: Vec<_> = query.load(conn)?; - Ok(results) + move |conn| { + Box::pin(async move { + let query = diesel::sql_query("\ + SELECT safekeepers.id as sk_id, safekeepers.availability_zone_id as az_id, COUNT(*) as timeline_count, safekeepers.host as host, safekeepers.port as port \ + FROM (select tenant_id, timeline_id, unnest(sk_set) as sk_id from timelines) as timelines_unnested \ + JOIN safekeepers ON (safekeepers.id = timelines_unnested.id)\ + "); + let results: Vec<_> = query.load(conn).await?; + Ok(results) + }) }, ) .await?; @@ -1322,14 +1324,15 @@ impl Persistence { pub(crate) async fn insert_timeline(&self, entry: TimelinePersistence) -> DatabaseResult<()> { use crate::schema::timelines; - self.with_measured_conn( - DatabaseOperation::InsertTimeline, - move |conn| -> DatabaseResult<()> { + let entry = &entry; + self.with_measured_conn(DatabaseOperation::InsertTimeline, move |conn| { + Box::pin(async move { let inserted_updated = diesel::insert_into(timelines::table) - .values(&entry) + .values(entry) .on_conflict((timelines::tenant_id, timelines::timeline_id)) .do_nothing() - .execute(conn)?; + .execute(conn) + .await?; if inserted_updated != 1 { return Err(DatabaseError::Logical(format!( @@ -1339,8 +1342,8 @@ impl Persistence { } Ok(()) - }, - ) + }) + }) .await } pub(crate) async fn update_timeline_status( @@ -1352,17 +1355,18 @@ impl Persistence { ) -> DatabaseResult<()> { use crate::schema::timelines; - self.with_measured_conn( - DatabaseOperation::InsertTimeline, - move |conn| -> DatabaseResult<()> { + let status = &status; + self.with_measured_conn(DatabaseOperation::InsertTimeline, move |conn| { + Box::pin(async move { let inserted_updated = diesel::update(timelines::table) .filter(timelines::tenant_id.eq(tenant_id.to_string())) .filter(timelines::timeline_id.eq(timeline_id.to_string())) .set(( timelines::status_kind.eq(String::from(status_kind)), - timelines::status.eq(status.clone()), + timelines::status.eq(status), )) - .execute(conn)?; + .execute(conn) + .await?; if inserted_updated != 1 { return Err(DatabaseError::Logical(format!( @@ -1372,8 +1376,8 @@ impl Persistence { } Ok(()) - }, - ) + }) + }) .await } pub(crate) async fn update_timeline_status_deleted( @@ -1384,9 +1388,8 @@ impl Persistence { use crate::schema::timelines; let now = chrono::offset::Utc::now(); - self.with_measured_conn( - DatabaseOperation::InsertTimeline, - move |conn| -> DatabaseResult<()> { + self.with_measured_conn(DatabaseOperation::InsertTimeline, move |conn| { + Box::pin(async move { let inserted_updated = diesel::update(timelines::table) .filter(timelines::tenant_id.eq(tenant_id.to_string())) .filter(timelines::timeline_id.eq(timeline_id.to_string())) @@ -1396,7 +1399,8 @@ impl Persistence { timelines::status.eq("{}"), timelines::deleted_at.eq(now), )) - .execute(conn)?; + .execute(conn) + .await?; if inserted_updated != 1 { return Err(DatabaseError::Logical(format!( @@ -1406,8 +1410,8 @@ impl Persistence { } Ok(()) - }, - ) + }) + }) .await } @@ -1419,15 +1423,15 @@ impl Persistence { ) -> DatabaseResult> { use crate::schema::timelines; let mut timelines: Vec = self - .with_measured_conn( - DatabaseOperation::LoadTimeline, - move |conn| -> DatabaseResult<_> { + .with_measured_conn(DatabaseOperation::LoadTimeline, move |conn| { + Box::pin(async move { Ok(timelines::table .filter(timelines::tenant_id.eq(tenant_id.to_string())) .filter(timelines::timeline_id.eq(timeline_id.to_string())) - .load::(conn)?) - }, - ) + .load::(conn) + .await?) + }) + }) .await?; if timelines.is_empty() { return Ok(None); @@ -1452,9 +1456,8 @@ impl Persistence { ) -> DatabaseResult<()> { use crate::schema::timelines::dsl::*; let count = self - .with_measured_conn( - DatabaseOperation::DeleteTenant, - move |conn| -> DatabaseResult { + .with_measured_conn(DatabaseOperation::DeleteTenant, move |conn| { + Box::pin(async move { Ok(diesel::update(timelines) .filter(tenant_id.eq(del_tenant_id.to_string())) .filter(status_kind.ne(String::from(TimelineStatusKind::Deleted))) @@ -1462,9 +1465,10 @@ impl Persistence { status.eq(String::from("")), status_kind.eq(String::from(TimelineStatusKind::Deleted)), )) - .execute(conn)?) - }, - ) + .execute(conn) + .await?) + }) + }) .await?; tracing::info!("marked {count} timelines for deletion in timelines table"); @@ -1477,9 +1481,8 @@ impl Persistence { ) -> DatabaseResult> { use crate::schema::timelines; let timelines: Vec = self - .with_measured_conn( - DatabaseOperation::ListTimelines, - move |conn| -> DatabaseResult<_> { + .with_measured_conn(DatabaseOperation::ListTimelines, move |conn| { + Box::pin(async move { Ok(timelines::table .filter( timelines::status @@ -1487,9 +1490,10 @@ impl Persistence { .or(timelines::status .eq(String::from(TimelineStatusKind::Deleting))), ) - .load::(conn)?) - }, - ) + .load::(conn) + .await?) + }) + }) .await?; let timelines = timelines .into_iter()