Fixes due to merge

This commit is contained in:
Arpad Müller
2025-01-27 20:05:11 +01:00
parent ad56b9f76b
commit c660d787e4

View File

@@ -1194,14 +1194,16 @@ impl Persistence {
let safekeepers: Vec<SafekeeperTimelineCountResponse> = self
.with_measured_conn(
DatabaseOperation::ListSafekeepers,
move |conn| -> DatabaseResult<_> {
let query = diesel::sql_query("\
SELECT safekeepers.id as sk_id, safekeepers.availability_zone_id as az_id, COUNT(*) as timeline_count, safekeepers.host as host, safekeepers.port as port \
FROM (select tenant_id, timeline_id, unnest(sk_set) as sk_id from timelines) as timelines_unnested \
JOIN safekeepers ON (safekeepers.id = timelines_unnested.id)\
");
let results: Vec<_> = query.load(conn)?;
Ok(results)
move |conn| {
Box::pin(async move {
let query = diesel::sql_query("\
SELECT safekeepers.id as sk_id, safekeepers.availability_zone_id as az_id, COUNT(*) as timeline_count, safekeepers.host as host, safekeepers.port as port \
FROM (select tenant_id, timeline_id, unnest(sk_set) as sk_id from timelines) as timelines_unnested \
JOIN safekeepers ON (safekeepers.id = timelines_unnested.id)\
");
let results: Vec<_> = query.load(conn).await?;
Ok(results)
})
},
)
.await?;
@@ -1322,14 +1324,15 @@ impl Persistence {
pub(crate) async fn insert_timeline(&self, entry: TimelinePersistence) -> DatabaseResult<()> {
use crate::schema::timelines;
self.with_measured_conn(
DatabaseOperation::InsertTimeline,
move |conn| -> DatabaseResult<()> {
let entry = &entry;
self.with_measured_conn(DatabaseOperation::InsertTimeline, move |conn| {
Box::pin(async move {
let inserted_updated = diesel::insert_into(timelines::table)
.values(&entry)
.values(entry)
.on_conflict((timelines::tenant_id, timelines::timeline_id))
.do_nothing()
.execute(conn)?;
.execute(conn)
.await?;
if inserted_updated != 1 {
return Err(DatabaseError::Logical(format!(
@@ -1339,8 +1342,8 @@ impl Persistence {
}
Ok(())
},
)
})
})
.await
}
pub(crate) async fn update_timeline_status(
@@ -1352,17 +1355,18 @@ impl Persistence {
) -> DatabaseResult<()> {
use crate::schema::timelines;
self.with_measured_conn(
DatabaseOperation::InsertTimeline,
move |conn| -> DatabaseResult<()> {
let status = &status;
self.with_measured_conn(DatabaseOperation::InsertTimeline, move |conn| {
Box::pin(async move {
let inserted_updated = diesel::update(timelines::table)
.filter(timelines::tenant_id.eq(tenant_id.to_string()))
.filter(timelines::timeline_id.eq(timeline_id.to_string()))
.set((
timelines::status_kind.eq(String::from(status_kind)),
timelines::status.eq(status.clone()),
timelines::status.eq(status),
))
.execute(conn)?;
.execute(conn)
.await?;
if inserted_updated != 1 {
return Err(DatabaseError::Logical(format!(
@@ -1372,8 +1376,8 @@ impl Persistence {
}
Ok(())
},
)
})
})
.await
}
pub(crate) async fn update_timeline_status_deleted(
@@ -1384,9 +1388,8 @@ impl Persistence {
use crate::schema::timelines;
let now = chrono::offset::Utc::now();
self.with_measured_conn(
DatabaseOperation::InsertTimeline,
move |conn| -> DatabaseResult<()> {
self.with_measured_conn(DatabaseOperation::InsertTimeline, move |conn| {
Box::pin(async move {
let inserted_updated = diesel::update(timelines::table)
.filter(timelines::tenant_id.eq(tenant_id.to_string()))
.filter(timelines::timeline_id.eq(timeline_id.to_string()))
@@ -1396,7 +1399,8 @@ impl Persistence {
timelines::status.eq("{}"),
timelines::deleted_at.eq(now),
))
.execute(conn)?;
.execute(conn)
.await?;
if inserted_updated != 1 {
return Err(DatabaseError::Logical(format!(
@@ -1406,8 +1410,8 @@ impl Persistence {
}
Ok(())
},
)
})
})
.await
}
@@ -1419,15 +1423,15 @@ impl Persistence {
) -> DatabaseResult<Option<TimelinePersistence>> {
use crate::schema::timelines;
let mut timelines: Vec<TimelineFromDb> = self
.with_measured_conn(
DatabaseOperation::LoadTimeline,
move |conn| -> DatabaseResult<_> {
.with_measured_conn(DatabaseOperation::LoadTimeline, move |conn| {
Box::pin(async move {
Ok(timelines::table
.filter(timelines::tenant_id.eq(tenant_id.to_string()))
.filter(timelines::timeline_id.eq(timeline_id.to_string()))
.load::<TimelineFromDb>(conn)?)
},
)
.load::<TimelineFromDb>(conn)
.await?)
})
})
.await?;
if timelines.is_empty() {
return Ok(None);
@@ -1452,9 +1456,8 @@ impl Persistence {
) -> DatabaseResult<()> {
use crate::schema::timelines::dsl::*;
let count = self
.with_measured_conn(
DatabaseOperation::DeleteTenant,
move |conn| -> DatabaseResult<usize> {
.with_measured_conn(DatabaseOperation::DeleteTenant, move |conn| {
Box::pin(async move {
Ok(diesel::update(timelines)
.filter(tenant_id.eq(del_tenant_id.to_string()))
.filter(status_kind.ne(String::from(TimelineStatusKind::Deleted)))
@@ -1462,9 +1465,10 @@ impl Persistence {
status.eq(String::from("")),
status_kind.eq(String::from(TimelineStatusKind::Deleted)),
))
.execute(conn)?)
},
)
.execute(conn)
.await?)
})
})
.await?;
tracing::info!("marked {count} timelines for deletion in timelines table");
@@ -1477,9 +1481,8 @@ impl Persistence {
) -> DatabaseResult<Vec<TimelinePersistence>> {
use crate::schema::timelines;
let timelines: Vec<TimelineFromDb> = self
.with_measured_conn(
DatabaseOperation::ListTimelines,
move |conn| -> DatabaseResult<_> {
.with_measured_conn(DatabaseOperation::ListTimelines, move |conn| {
Box::pin(async move {
Ok(timelines::table
.filter(
timelines::status
@@ -1487,9 +1490,10 @@ impl Persistence {
.or(timelines::status
.eq(String::from(TimelineStatusKind::Deleting))),
)
.load::<TimelineFromDb>(conn)?)
},
)
.load::<TimelineFromDb>(conn)
.await?)
})
})
.await?;
let timelines = timelines
.into_iter()