This commit is contained in:
Arpad Müller
2025-01-17 17:48:14 +01:00
parent e805058364
commit f0fe5fae6b
2 changed files with 47 additions and 15 deletions

View File

@@ -22,6 +22,7 @@ use pageserver_api::shard::ShardStripeSize;
use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
use serde::{Deserialize, Serialize};
use utils::generation::Generation;
use utils::id::TimelineId;
use utils::id::{NodeId, TenantId};
use crate::metrics::{
@@ -1220,6 +1221,35 @@ impl Persistence {
)
.await
}
pub(crate) async fn update_timeline(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
timeline_status: TimelineStatus,
) -> DatabaseResult<()> {
use crate::schema::timelines;
self.with_measured_conn(
DatabaseOperation::InsertTimeline,
move |conn| -> DatabaseResult<()> {
let inserted_updated = diesel::update(timelines::table)
.filter(timelines::tenant_id.eq(tenant_id.to_string()))
.filter(timelines::timeline_id.eq(timeline_id.to_string()))
.set(timelines::status.eq(String::from(timeline_status)))
.execute(conn)?;
if inserted_updated != 1 {
return Err(DatabaseError::Logical(format!(
"unexpected number of rows ({})",
inserted_updated
)));
}
Ok(())
},
)
.await
}
}
/// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably

View File

@@ -3395,6 +3395,7 @@ impl Service {
Ok(timeline_info)
}).await??)
}
async fn tenant_timeline_create_safekeepers(
&self,
tenant_id: TenantId,
@@ -3408,10 +3409,9 @@ impl Service {
models::TimelineCreateRequestMode::Branch { .. } => timeline_info.last_record_lsn,
models::TimelineCreateRequestMode::ImportPgdata { .. } => {
// Can't do return Err because of async block, must do ? plus unreachable!()
Err(ApiError::InternalServerError(anyhow!(
return Err(ApiError::InternalServerError(anyhow!(
"import pgdata doesn't specify the start lsn, aborting creation on safekeepers"
)))?;
unreachable!()
}
};
@@ -3444,11 +3444,9 @@ impl Service {
let mut members = Vec::new();
for sk in sks.iter() {
let Some(sk_p) = sk_persistences.get(&(sk.0 as i64)) else {
// Can't do return Err because of async block, must do ? plus unreachable!()
Err(ApiError::InternalServerError(anyhow!(
return Err(ApiError::InternalServerError(anyhow!(
"couldn't find persisted entry for safekeeper with id {sk}"
)))?;
unreachable!()
};
members.push(SafekeeperId {
id: NodeId(sk_p.id as u64),
@@ -3510,16 +3508,14 @@ impl Service {
reconcile_results.push(res_2);
}
Ok((Err(_), Ok(_)) | (_, Err(_))) => {
Err(ApiError::InternalServerError(anyhow!(
return Err(ApiError::InternalServerError(anyhow!(
"task was cancelled while reconciling timeline creation"
)))?;
unreachable!()
)));
}
Err(_) => {
Err(ApiError::InternalServerError(anyhow!(
return Err(ApiError::InternalServerError(anyhow!(
"couldn't reconcile timeline creation on safekeepers within timeout"
)))?;
unreachable!()
)));
}
}
let timeout_or_last =
@@ -3542,17 +3538,23 @@ impl Service {
);
if successful.len() < 2 {
// Failure
return Err(ApiError::InternalServerError(anyhow!(
"not enough successful reconciliations to reach quorum, please retry: {}",
successful.len()
)));
} else if successful.len() == 3 {
// Success, state of timeline is
// Success, state of timeline is Created
self.persistence
.update_timeline(tenant_id, timeline_id, TimelineStatus::Created)
.await?;
} else if successful.len() == 2 {
// Success, state of timeline remains Creating
} else {
unreachable!(
"unexpected amount of successful reconciliations {}",
"unexpected number of successful reconciliations {}",
successful.len()
);
}
// TODO
// TODO update database state from "creating" to "created" or something
// notify cplane about creation
// TODO (this should probably be in a function so that the reconciler can use it too)