From 8eb853b731b445c92c6585244144fdd371010c7b Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 28 May 2025 19:29:32 +0200 Subject: [PATCH] finish the stub implementation of storcon side, it now PUTs to SKs and gets back 404s --- libs/safekeeper_api/src/models.rs | 12 + safekeeper/client/src/mgmt_api.rs | 18 +- storage_controller/Cargo.toml | 2 +- .../2025-05-26-105843_sk_ps_discovery/up.sql | 7 +- storage_controller/src/main.rs | 3 +- storage_controller/src/persistence.rs | 107 ++++++++ storage_controller/src/safekeeper_client.rs | 19 +- storage_controller/src/schema.rs | 38 ++- storage_controller/src/service.rs | 25 +- .../src/service/safekeeper_service.rs | 5 + .../src/service/sk_ps_discovery.rs | 242 ++++++++++++++++-- 11 files changed, 423 insertions(+), 55 deletions(-) diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs index 8658dc4011..9460936084 100644 --- a/libs/safekeeper_api/src/models.rs +++ b/libs/safekeeper_api/src/models.rs @@ -6,6 +6,7 @@ use pageserver_api::shard::ShardIdentity; use postgres_ffi::TimestampTz; use serde::{Deserialize, Serialize}; use tokio::time::Instant; +use utils::generation::Generation; use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId}; use utils::lsn::Lsn; use utils::pageserver_feedback::PageserverFeedback; @@ -309,3 +310,14 @@ pub struct PullTimelineResponse { pub safekeeper_host: Option, // TODO: add more fields? } + +#[derive(Debug, Serialize, Deserialize)] +pub struct TenantShardPageserverAttachments { + pub attachments: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct TenantShardPageserverAttachment { + pub ps_id: NodeId, + pub generation: Generation, +} diff --git a/safekeeper/client/src/mgmt_api.rs b/safekeeper/client/src/mgmt_api.rs index b364ac8e48..0a52813e60 100644 --- a/safekeeper/client/src/mgmt_api.rs +++ b/safekeeper/client/src/mgmt_api.rs @@ -8,11 +8,12 @@ use std::error::Error as _; use http_utils::error::HttpErrorBody; use reqwest::{IntoUrl, Method, StatusCode}; use safekeeper_api::models::{ - self, PullTimelineRequest, PullTimelineResponse, SafekeeperUtilization, TimelineCreateRequest, - TimelineStatus, + self, PullTimelineRequest, PullTimelineResponse, SafekeeperUtilization, + TenantShardPageserverAttachments, TimelineCreateRequest, TimelineStatus, }; use utils::id::{NodeId, TenantId, TimelineId}; use utils::logging::SecretString; +use utils::shard::TenantShardId; #[derive(Debug, Clone)] pub struct Client { @@ -189,6 +190,19 @@ impl Client { resp.json().await.map_err(Error::ReceiveBody) } + pub async fn put_tenant_shard_pageserver_attachments( + &self, + tenant_shard_id: TenantShardId, + attachments: TenantShardPageserverAttachments, + ) -> Result<()> { + let uri = format!( + "{}/v1/tenant/{tenant_shard_id}/pageserver_attachments", + self.mgmt_api_endpoint + ); + let resp = self.put(uri, attachments).await?; + resp.json().await.map_err(Error::ReceiveBody) + } + async fn post( &self, uri: U, diff --git a/storage_controller/Cargo.toml b/storage_controller/Cargo.toml index b0d0ea0933..29ceea5bbe 100644 --- a/storage_controller/Cargo.toml +++ b/storage_controller/Cargo.toml @@ -70,4 +70,4 @@ http-utils = { path = "../libs/http-utils/" } utils = { path = "../libs/utils/" } metrics = { path = "../libs/metrics/" } control_plane = { path = "../control_plane" } -workspace_hack = { version = "0.1", path = "../workspace_hack" } \ No newline at end of file +workspace_hack = { version = "0.1", path = "../workspace_hack" } diff --git a/storage_controller/migrations/2025-05-26-105843_sk_ps_discovery/up.sql b/storage_controller/migrations/2025-05-26-105843_sk_ps_discovery/up.sql index d20894bce1..6742b063bc 100644 --- a/storage_controller/migrations/2025-05-26-105843_sk_ps_discovery/up.sql +++ b/storage_controller/migrations/2025-05-26-105843_sk_ps_discovery/up.sql @@ -6,7 +6,8 @@ CREATE TABLE "sk_ps_discovery"( "sk_id" INT8 NOT NULL REFERENCES "safekeepers"("id") ON DELETE CASCADE, -- more efficient that trigger on "safekeepers" "ps_id" INT8 NOT NULL REFERENCES "nodes"("node_id") ON DELETE CASCADE, -- more efficient that trigger on "nodes" "created_at" TIMESTAMPTZ NOT NULL, - "last_attempt_at" TIMESTAMPTZ, + "retries" INT4 NOT NULL DEFAULT 0, + "last_retry_at" TIMESTAMPTZ, PRIMARY KEY("tenant_id", "shard_number", "shard_count", "ps_generation", "sk_id") ); @@ -17,7 +18,7 @@ BEGIN DELETE FROM sk_ps_discovery WHERE tenant_id = ARG_TENANT_ID; - INSERT INTO sk_ps_discovery (tenant_id, shard_number, shard_count, ps_generation, sk_id, ps_id,created_at,last_attempt_at) + INSERT INTO sk_ps_discovery (tenant_id, shard_number, shard_count, ps_generation, sk_id, ps_id,created_at) WITH sk_timeline_attachments AS ( SELECT DISTINCT tenant_id,unnest(array_cat(sk_set, new_sk_set)) as sk_id FROM timelines WHERE @@ -25,7 +26,7 @@ BEGIN AND timelines.deleted_at IS NULL ) - SELECT tenant_shards.tenant_id, tenant_shards.shard_number, tenant_shards.shard_count, tenant_shards.generation, sk_timeline_attachments.sk_id, tenant_shards.generation_pageserver, NOW(), NULL + SELECT tenant_shards.tenant_id, tenant_shards.shard_number, tenant_shards.shard_count, tenant_shards.generation, sk_timeline_attachments.sk_id, tenant_shards.generation_pageserver, NOW() FROM tenant_shards INNER JOIN sk_timeline_attachments ON tenant_shards.tenant_id = sk_timeline_attachments.tenant_id; diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs index f55a3d04de..6d8f1e5991 100644 --- a/storage_controller/src/main.rs +++ b/storage_controller/src/main.rs @@ -436,7 +436,8 @@ async fn async_main() -> anyhow::Result<()> { }; // Validate that we can connect to the database - Persistence::await_connection(secrets.database_url.clone(), args.db_connect_timeout.into()).await?; + Persistence::await_connection(secrets.database_url.clone(), args.db_connect_timeout.into()) + .await?; let persistence = Arc::new(Persistence::new(secrets.database_url).await); diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index 030c0d0c91..f0c7a7b65e 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -1,6 +1,7 @@ pub(crate) mod split_state; use std::collections::HashMap; use std::io::Write; +use std::ops::Add; use std::pin::Pin; use std::str::FromStr; use std::sync::Arc; @@ -139,6 +140,8 @@ pub(crate) enum DatabaseOperation { DeleteTimelineImport, ListTimelineImports, IsTenantImportingTimeline, + ListSkPsDiscovery, + UpdateSkPsDiscoveryAttempt, } #[must_use] @@ -1885,6 +1888,76 @@ impl Persistence { drop(client); })) } + + pub(crate) async fn get_all_sk_ps_discovery_work( + &self, + ) -> DatabaseResult> { + use crate::schema::sk_ps_discovery::dsl; + self.with_measured_conn(DatabaseOperation::ListSkPsDiscovery, move |conn| { + Box::pin(async move { + let vec: Vec = dsl::sk_ps_discovery.load(conn).await?; + Ok(vec) + }) + }) + .await + } + + pub(crate) async fn update_sk_ps_discovery_attempt( + &self, + pk: SkPsDiscoveryPersistencePk, + update: Result<(), ()>, + ) -> DatabaseResult<()> { + use crate::schema::sk_ps_discovery::dsl; + + self.with_measured_conn(DatabaseOperation::UpdateSkPsDiscoveryAttempt, move |conn| { + let pk = pk.clone(); + Box::pin(async move { + match update { + Ok(()) => { + let SkPsDiscoveryPersistencePk { + tenant_id, + shard_number, + shard_count, + ps_generation, + sk_id, + } = pk; + diesel::delete(dsl::sk_ps_discovery) + .filter(dsl::tenant_id.eq(tenant_id)) + .filter(dsl::shard_number.eq(shard_number)) + .filter(dsl::shard_count.eq(shard_count)) + .filter(dsl::ps_generation.eq(ps_generation)) + .filter(dsl::sk_id.eq(sk_id)) + .execute(conn) // TODO: check update count? + .await?; + } + Err(_) => { + let SkPsDiscoveryPersistencePk { + tenant_id, + shard_number, + shard_count, + ps_generation, + sk_id, + } = pk; + + diesel::update(dsl::sk_ps_discovery) + .filter(dsl::tenant_id.eq(tenant_id)) + .filter(dsl::shard_number.eq(shard_number)) + .filter(dsl::shard_count.eq(shard_count)) + .filter(dsl::ps_generation.eq(ps_generation)) + .filter(dsl::sk_id.eq(sk_id)) + .set(( + dsl::retries.eq(dsl::retries.add(1)), // XXX: in split-brain situation we would bump twice... + dsl::last_retry_at.eq(diesel::dsl::now), + )) + .execute(conn) // TODO: check update count? + .await?; + } + } + Ok(()) + }) + }) + .await + } } pub(crate) fn load_certs() -> anyhow::Result> { @@ -2469,3 +2542,37 @@ pub(crate) struct TimelineImportPersistence { pub(crate) timeline_id: String, pub(crate) shard_statuses: serde_json::Value, } + +#[derive(Insertable, AsChangeset, Selectable, Clone)] +#[diesel(table_name = crate::schema::sk_ps_discovery)] +pub(crate) struct SkPsDiscoveryPersistencePk { + pub(crate) tenant_id: String, + pub(crate) shard_number: i32, + pub(crate) shard_count: i32, + pub(crate) ps_generation: i32, + pub(crate) sk_id: i64, +} + +#[derive(Queryable, Selectable, Clone, PartialEq, Eq)] +#[diesel(table_name = crate::schema::sk_ps_discovery)] +pub(crate) struct SkPsDiscoveryPersistence { + pub(crate) tenant_id: String, + pub(crate) shard_number: i32, + pub(crate) shard_count: i32, + pub(crate) ps_generation: i32, + pub(crate) sk_id: i64, + pub(crate) ps_id: i64, + pub(crate) created_at: chrono::DateTime, + pub(crate) retries: i32, + pub(crate) last_retry_at: Option>, +} + +impl SkPsDiscoveryPersistence { + pub(crate) fn tenant_shard_id(&self) -> Result { + Ok(TenantShardId { + tenant_id: TenantId::from_str(self.tenant_id.as_str())?, + shard_number: ShardNumber(self.shard_number as u8), + shard_count: ShardCount::new(self.shard_count as u8), + }) + } +} diff --git a/storage_controller/src/safekeeper_client.rs b/storage_controller/src/safekeeper_client.rs index 1f3ea96d96..7129d224e4 100644 --- a/storage_controller/src/safekeeper_client.rs +++ b/storage_controller/src/safekeeper_client.rs @@ -1,9 +1,11 @@ use safekeeper_api::models::{ - self, PullTimelineRequest, PullTimelineResponse, SafekeeperUtilization, TimelineCreateRequest, + self, PullTimelineRequest, PullTimelineResponse, SafekeeperUtilization, + TenantShardPageserverAttachments, TimelineCreateRequest, }; use safekeeper_client::mgmt_api::{Client, Result}; use utils::id::{NodeId, TenantId, TimelineId}; use utils::logging::SecretString; +use utils::shard::TenantShardId; use crate::metrics::PageserverRequestLabelGroup; @@ -164,4 +166,19 @@ impl SafekeeperClient { self.inner.utilization().await ) } + + pub async fn put_tenant_shard_pageserver_attachments( + &self, + tenant_shard_id: TenantShardId, + attachments: TenantShardPageserverAttachments, + ) -> Result<()> { + measured_request!( + "put_tenant_shard_pageserver_attachments", + crate::metrics::Method::Put, + &self.node_id_label, + self.inner + .put_tenant_shard_pageserver_attachments(tenant_shard_id, attachments) + .await + ) + } } diff --git a/storage_controller/src/schema.rs b/storage_controller/src/schema.rs index 2c2b3074fe..c828511271 100644 --- a/storage_controller/src/schema.rs +++ b/storage_controller/src/schema.rs @@ -60,13 +60,26 @@ diesel::table! { } } +diesel::table! { + sk_ps_discovery (tenant_id, shard_number, shard_count, ps_generation, sk_id) { + tenant_id -> Varchar, + shard_number -> Int4, + shard_count -> Int4, + ps_generation -> Int4, + sk_id -> Int8, + ps_id -> Int8, + created_at -> Timestamptz, + retries -> Int4, + last_retry_at -> Nullable, + } +} + diesel::table! { tenant_shards (tenant_id, shard_number, shard_count) { tenant_id -> Varchar, shard_number -> Int4, shard_count -> Int4, shard_stripe_size -> Int4, - // pageserver generation generation -> Nullable, generation_pageserver -> Nullable, placement_policy -> Varchar, @@ -93,7 +106,6 @@ diesel::table! { tenant_id -> Varchar, timeline_id -> Varchar, start_lsn -> PgLsn, - // sk config generation generation -> Int4, sk_set -> Array>, new_sk_set -> Nullable>>, @@ -102,24 +114,8 @@ diesel::table! { } } -// Operational table that contains pending notifications for Safekeepers -// about tenant shard pageserver-side attachments. -// Rows are removed when the notification was acknowledged by the Safekeeper. -diesel::table! { - use diesel::sql_types::*; - sk_ps_discovery(tenant_id, shard_number, shard_count, ps_generation, sk_id) { - tenant_id -> Varchar, - shard_number -> Int4, - shard_count -> Int4, - ps_generation -> Int4, - sk_id -> Int8, - // payload - ps_id -> Int8, - // tracking of reliable delivery - created_at -> Timestamptz, - last_attempt_at -> Nullable, - } -} +diesel::joinable!(sk_ps_discovery -> nodes (ps_id)); +diesel::joinable!(sk_ps_discovery -> safekeepers (sk_id)); diesel::allow_tables_to_appear_in_same_query!( controllers, @@ -127,8 +123,8 @@ diesel::allow_tables_to_appear_in_same_query!( nodes, safekeeper_timeline_pending_ops, safekeepers, + sk_ps_discovery, tenant_shards, timeline_imports, timelines, - sk_ps_discovery, ); diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 3d83328a86..4cb9a04c95 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -489,7 +489,6 @@ pub struct Service { inner: Arc>, config: Config, persistence: Arc, - sk_ps_discovery: sk_ps_discovery::ActorClient, compute_hook: Arc, result_tx: tokio::sync::mpsc::UnboundedSender, @@ -1194,6 +1193,16 @@ impl Service { } } } + + #[instrument(skip_all)] + async fn run_sk_ps_discovery(self: &Arc) { + self.startup_complete.clone().wait().await; + sk_ps_discovery::run( + self.clone(), + self.http_client.clone(), /* TODO this client is configured to openf resh TCP connection each time, very inefficient */ + ).await; + } + /// Heartbeat all storage nodes once in a while. #[instrument(skip_all)] async fn spawn_heartbeat_driver(&self) { @@ -1768,8 +1777,6 @@ impl Service { LeadershipStatus::Leader }; - let sk_ps_discovery = sk_ps_discovery::spawn(persistence.clone()); - let this = Arc::new(Self { inner: Arc::new(std::sync::RwLock::new(ServiceState::new( nodes, @@ -1782,7 +1789,6 @@ impl Service { ))), config: config.clone(), persistence, - sk_ps_discovery, compute_hook: Arc::new(ComputeHook::new(config.clone())?), result_tx, heartbeater_ps, @@ -1802,7 +1808,7 @@ impl Service { reconcilers_gate: Gate::default(), tenant_op_locks: Default::default(), node_op_locks: Default::default(), - http_client, + http_client: http_client.clone(), step_down_barrier: Default::default(), }); @@ -1870,6 +1876,15 @@ impl Service { } }); + tokio::task::spawn({ + let this = this.clone(); + let startup_complete = startup_complete.clone(); + async move { + startup_complete.wait().await; + this.run_sk_ps_discovery().await + } + }); + tokio::task::spawn({ let this = this.clone(); let startup_complete = startup_complete.clone(); diff --git a/storage_controller/src/service/safekeeper_service.rs b/storage_controller/src/service/safekeeper_service.rs index cd5ace449d..4027e08a3c 100644 --- a/storage_controller/src/service/safekeeper_service.rs +++ b/storage_controller/src/service/safekeeper_service.rs @@ -647,6 +647,11 @@ impl Service { sk.describe_response() } + pub(crate) fn get_safekeeper_object(&self, node_id: i64) -> Option { + let locked = self.inner.read().unwrap(); + locked.safekeepers.get(&NodeId(node_id as u64)).cloned() + } + pub(crate) async fn upsert_safekeeper( self: &Arc, record: crate::persistence::SafekeeperUpsert, diff --git a/storage_controller/src/service/sk_ps_discovery.rs b/storage_controller/src/service/sk_ps_discovery.rs index 7782bae2bb..bac8dec07a 100644 --- a/storage_controller/src/service/sk_ps_discovery.rs +++ b/storage_controller/src/service/sk_ps_discovery.rs @@ -1,33 +1,44 @@ -use std::sync::Arc; +use std::{ + collections::{HashMap, hash_map}, + sync::Arc, + time::Duration, +}; use anyhow::Context; -use futures::StreamExt; -use tracing::info; -use utils::id::TenantId; +use futures::{StreamExt, stream::FuturesUnordered}; +use safekeeper_api::models::{TenantShardPageserverAttachment, TenantShardPageserverAttachments}; +use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; +use tracing::{Instrument, Span, error, info, info_span}; +use utils::{ + generation::Generation, + id::{NodeId, TenantId}, + logging::SecretString, + shard::TenantShardId, +}; -use crate::persistence::Persistence; +use crate::{ + heartbeater::SafekeeperState, + persistence::{Persistence, SkPsDiscoveryPersistence, SkPsDiscoveryPersistencePk}, +}; -pub struct ActorClient { - tx: tokio::sync::mpsc::UnboundedSender, -} +use super::Service; struct Actor { + service: Arc, persistence: Arc, - rx: tokio::sync::mpsc::UnboundedReceiver, + http_client: reqwest::Client, } -#[derive(Debug)] -enum Message {} - -pub fn spawn(persistence: Arc) -> ActorClient { - let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - let actor = Actor { persistence, rx }; - tokio::spawn(actor.run()); - ActorClient { tx } +pub async fn run(service: Arc, http_client: reqwest::Client) { + let actor = Actor { + persistence: service.persistence.clone(), + service, + http_client, // XXX: build our own client instead of getting Service's client; we probably want idle conn to each sk + }; + actor.run().await; } -impl ActorClient {} - impl Actor { async fn run(mut self) { loop { @@ -55,8 +66,37 @@ impl Actor { .await .context("listen to sk_ps_discovery")?; + let mut sync_full_ticker = tokio::time::interval(std::time::Duration::from_secs(5)); + + #[derive(PartialEq, Eq, Hash, Debug)] + struct TaskKey { + tenant_shard_id: TenantShardId, + ps_generation: Generation, + sk_id: NodeId, + } + struct Task { + work: SkPsDiscoveryPersistence, + cancel: CancellationToken, + join_handle: Option>, + } + impl<'a> TryFrom<&'a SkPsDiscoveryPersistence> for TaskKey { + type Error = hex::FromHexError; + fn try_from(value: &'a SkPsDiscoveryPersistence) -> Result { + Ok(Self { + tenant_shard_id: value.tenant_shard_id()?, + ps_generation: Generation::new(value.ps_generation as u32), + sk_id: NodeId(value.sk_id as u64), + }) + } + } + let mut tasks = HashMap::new(); + loop { tokio::select! { + biased; // control messages have higher priority, the periodic full tick, then subscriptions. + _ = sync_full_ticker.tick() => { + info!("rebuild"); + } maybe_res = subscription.next() => { match maybe_res { None => { @@ -65,6 +105,7 @@ impl Actor { Some(Ok(tenant_id)) => { let tenant_id: TenantId = tenant_id; info!(?tenant_id, "notify for tenant_id"); + // for now, just also rebuild everything } Some(Err(err)) => { let err: serde_json::Error = err; @@ -72,10 +113,169 @@ impl Actor { } } } - msg = self.rx.recv() => { - todo!("{msg:?}"); + } + + // get list of tasks from database + let mut new_tasks = self + .persistence + .get_all_sk_ps_discovery_work() + .await + .context("get_all_sk_ps_discovery_work")? + .into_iter() + .map(|work: SkPsDiscoveryPersistence| { + anyhow::Ok(( + TaskKey::try_from(&work)?, + Task { + work, + cancel: CancellationToken::new(), + join_handle: None, + }, + )) + }) + .collect::, _>>()?; + + // Carry over ongoing tasks + let mut ongoing_wait = FuturesUnordered::new(); + for ( + task_key, + Task { + work: ongoing_persistence, + cancel, + join_handle, + }, + ) in tasks.drain() + { + match new_tasks.entry(task_key) { + hash_map::Entry::Occupied(mut planned) => { + let Task { + work: planned_persistence, + cancel: planned_cancel, + join_handle: planned_jh, + } = planned.get_mut(); + assert!(planned_jh.is_none()); + if *planned_persistence == ongoing_persistence { + *planned_jh = join_handle; + *planned_cancel = cancel; + } else { + match join_handle { + Some(jh) => { + cancel.cancel(); + ongoing_wait.push(jh); + } + None => (), + } + } + } + hash_map::Entry::Vacant(_) => match join_handle { + Some(jh) => { + cancel.cancel(); + ongoing_wait.push(jh); + } + None => (), + }, + } + } + while let Some(_) = ongoing_wait.next().await {} + tasks = new_tasks; + + // Kick off new tasks + for (key, task) in tasks.iter_mut() { + if task.join_handle.is_none() { + task.join_handle = Some(tokio::spawn( + DeliveryAttempt { + cancel: task.cancel.clone(), + persistence: self.persistence.clone(), + service: self.service.clone(), + http_client: self.http_client.clone(), + work: task.work.clone(), + } + .run() + .instrument({ + let span = info_span!(parent: None, "sk_ps_discovery_delivery", ?key); + span.follows_from(Span::current()); + span + }), + )) } } } } } + +struct DeliveryAttempt { + cancel: CancellationToken, + persistence: Arc, + service: Arc, + http_client: reqwest::Client, + work: SkPsDiscoveryPersistence, +} + +impl DeliveryAttempt { + pub async fn run(self) { + let res = self.run0().await; + if self.cancel.is_cancelled() { + return; + } + if let Err(ref err) = res { + error!(?err, "attempt failed"); + } + let res = self + .persistence + .update_sk_ps_discovery_attempt( + SkPsDiscoveryPersistencePk { + tenant_id: self.work.tenant_id, + shard_number: self.work.shard_number, + shard_count: self.work.shard_count, + ps_generation: self.work.ps_generation, + sk_id: self.work.sk_id, + }, + res.map_err(|_| ()), + ) + .await; + if let Err(ref err) = res { + error!(?err, "persistence of attempt result failed"); + } + } + async fn run0(&self) -> anyhow::Result<()> { + let Some(sk) = self.service.get_safekeeper_object(self.work.sk_id) else { + anyhow::bail!("safekeeper object does not exist"); + }; + + match sk.availability() { + SafekeeperState::Available { .. } => (), + SafekeeperState::Offline => { + anyhow::bail!("safekeeper is offline"); + } + } + let tenant_shard_id = self.work.tenant_shard_id()?; + sk.with_client_retries( + |client| async move { + client + .put_tenant_shard_pageserver_attachments( + tenant_shard_id, + TenantShardPageserverAttachments { + attachments: vec![TenantShardPageserverAttachment { + ps_id: NodeId(self.work.ps_id as u64), + generation: Generation::new(self.work.ps_generation as u32), + }], + }, + ) + .await + }, + &self.http_client, + &self + .service + .config + .safekeeper_jwt_token + .clone() + .map(SecretString::from), + 1, + 3, + Duration::from_secs(1), + &self.cancel, + ) + .await?; + + Ok(()) + } +}