storcon: add force_upsert flag to timeline_import endpoint (#12622)

It is useful to have ability to update an existing timeline entry, as a
way to mirror legacy migrations to the storcon managed table.
This commit is contained in:
Arpad Müller
2025-07-21 23:14:15 +02:00
committed by GitHub
parent b7bc3ce61e
commit 80baeaa084
3 changed files with 63 additions and 4 deletions

View File

@@ -596,6 +596,7 @@ pub struct TimelineImportRequest {
pub timeline_id: TimelineId,
pub start_lsn: Lsn,
pub sk_set: Vec<NodeId>,
pub force_upsert: bool,
}
#[derive(serde::Serialize, serde::Deserialize, Clone)]

View File

@@ -129,6 +129,7 @@ pub(crate) enum DatabaseOperation {
UpdateLeader,
SetPreferredAzs,
InsertTimeline,
UpdateTimeline,
UpdateTimelineMembership,
GetTimeline,
InsertTimelineReconcile,
@@ -1463,6 +1464,36 @@ impl Persistence {
.await
}
/// Update an already present timeline.
/// VERY UNSAFE FUNCTION: this overrides in-progress migrations. Don't use this unless neccessary.
pub(crate) async fn update_timeline_unsafe(
&self,
entry: TimelineUpdate,
) -> DatabaseResult<bool> {
use crate::schema::timelines;
let entry = &entry;
self.with_measured_conn(DatabaseOperation::UpdateTimeline, move |conn| {
Box::pin(async move {
let inserted_updated = diesel::update(timelines::table)
.filter(timelines::tenant_id.eq(&entry.tenant_id))
.filter(timelines::timeline_id.eq(&entry.timeline_id))
.set(entry)
.execute(conn)
.await?;
match inserted_updated {
0 => Ok(false),
1 => Ok(true),
_ => Err(DatabaseError::Logical(format!(
"unexpected number of rows ({inserted_updated})"
))),
}
})
})
.await
}
/// Update timeline membership configuration in the database.
/// Perform a compare-and-swap (CAS) operation on the timeline's generation.
/// The `new_generation` must be the next (+1) generation after the one in the database.
@@ -2503,6 +2534,18 @@ impl TimelineFromDb {
}
}
// This is separate from TimelinePersistence because we don't want to touch generation and deleted_at values for the update.
#[derive(AsChangeset)]
#[diesel(table_name = crate::schema::timelines)]
#[diesel(treat_none_as_null = true)]
pub(crate) struct TimelineUpdate {
pub(crate) tenant_id: String,
pub(crate) timeline_id: String,
pub(crate) start_lsn: LsnWrapper,
pub(crate) sk_set: Vec<i64>,
pub(crate) new_sk_set: Option<Vec<i64>>,
}
#[derive(Insertable, AsChangeset, Queryable, Selectable, Clone)]
#[diesel(table_name = crate::schema::safekeeper_timeline_pending_ops)]
pub(crate) struct TimelinePendingOpPersistence {

View File

@@ -10,6 +10,7 @@ use crate::id_lock_map::trace_shared_lock;
use crate::metrics;
use crate::persistence::{
DatabaseError, SafekeeperTimelineOpKind, TimelinePendingOpPersistence, TimelinePersistence,
TimelineUpdate,
};
use crate::safekeeper::Safekeeper;
use crate::safekeeper_client::SafekeeperClient;
@@ -454,19 +455,33 @@ impl Service {
let persistence = TimelinePersistence {
tenant_id: req.tenant_id.to_string(),
timeline_id: req.timeline_id.to_string(),
start_lsn: Lsn::INVALID.into(),
start_lsn: req.start_lsn.into(),
generation: 1,
sk_set: req.sk_set.iter().map(|sk_id| sk_id.0 as i64).collect(),
new_sk_set: None,
cplane_notified_generation: 1,
deleted_at: None,
};
let inserted = self.persistence.insert_timeline(persistence).await?;
let inserted = self
.persistence
.insert_timeline(persistence.clone())
.await?;
if inserted {
tracing::info!("imported timeline into db");
} else {
tracing::info!("didn't import timeline into db, as it is already present in db");
return Ok(());
}
tracing::info!("timeline already present in db, updating");
let update = TimelineUpdate {
tenant_id: persistence.tenant_id,
timeline_id: persistence.timeline_id,
start_lsn: persistence.start_lsn,
sk_set: persistence.sk_set,
new_sk_set: persistence.new_sk_set,
};
self.persistence.update_timeline_unsafe(update).await?;
tracing::info!("timeline updated");
Ok(())
}