From c34d36d8a270b9a4910d4d26210e7c608288f079 Mon Sep 17 00:00:00 2001 From: Dmitrii Kovalkov <34828390+DimasKovas@users.noreply.github.com> Date: Fri, 11 Jul 2025 14:49:37 +0400 Subject: [PATCH] storcon_cli: timeline-safekeeper-migrate and timeline-locate subcommands (#12548) ## Problem We have a `safekeeper_migrate` handler, but no subcommand in `storcon_cli`. Same for `/:timeline_id/locate` for identifying current set of safekeepers. - Closes: https://github.com/neondatabase/neon/issues/12395 ## Summary of changes - Add `timeline-safekeeper-migrate` and `timeline-locate` subcommands to `storcon_cli` --- Cargo.lock | 1 + control_plane/src/broker.rs | 2 +- control_plane/src/pageserver.rs | 2 +- control_plane/src/safekeeper.rs | 2 +- control_plane/src/storage_controller.rs | 2 +- control_plane/storcon_cli/Cargo.toml | 1 + control_plane/storcon_cli/src/main.rs | 57 ++++++++++++++++++- libs/safekeeper_api/src/models.rs | 11 +++- .../src/service/safekeeper_service.rs | 10 +--- 9 files changed, 73 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c528354053..025f4e4116 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6991,6 +6991,7 @@ dependencies = [ "pageserver_api", "pageserver_client", "reqwest", + "safekeeper_api", "serde_json", "storage_controller_client", "tokio", diff --git a/control_plane/src/broker.rs b/control_plane/src/broker.rs index f43f459636..988b08e875 100644 --- a/control_plane/src/broker.rs +++ b/control_plane/src/broker.rs @@ -36,7 +36,7 @@ impl StorageBroker { pub async fn start(&self, retry_timeout: &Duration) -> anyhow::Result<()> { let broker = &self.env.broker; - print!("Starting neon broker at {}", broker.client_url()); + println!("Starting neon broker at {}", broker.client_url()); let mut args = Vec::new(); diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 3673d1f4f2..843ead807d 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -303,7 +303,7 @@ impl 
PageServerNode { async fn start_node(&self, retry_timeout: &Duration) -> anyhow::Result<()> { // TODO: using a thread here because start_process() is not async but we need to call check_status() let datadir = self.repo_path(); - print!( + println!( "Starting pageserver node {} at '{}' in {:?}, retrying for {:?}", self.conf.id, self.pg_connection_config.raw_address(), diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index da9dafd8e9..2ba2f3ebe4 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -127,7 +127,7 @@ impl SafekeeperNode { extra_opts: &[String], retry_timeout: &Duration, ) -> anyhow::Result<()> { - print!( + println!( "Starting safekeeper at '{}' in '{}', retrying for {:?}", self.pg_connection_config.raw_address(), self.datadir_path().display(), diff --git a/control_plane/src/storage_controller.rs b/control_plane/src/storage_controller.rs index bb83a6319c..dc6c82f504 100644 --- a/control_plane/src/storage_controller.rs +++ b/control_plane/src/storage_controller.rs @@ -660,7 +660,7 @@ impl StorageController { )); } - println!("Starting storage controller"); + println!("Starting storage controller at {scheme}://{host}:{listen_port}"); background_process::start_process( COMMAND, diff --git a/control_plane/storcon_cli/Cargo.toml b/control_plane/storcon_cli/Cargo.toml index ce89116691..61d48b2469 100644 --- a/control_plane/storcon_cli/Cargo.toml +++ b/control_plane/storcon_cli/Cargo.toml @@ -14,6 +14,7 @@ humantime.workspace = true pageserver_api.workspace = true pageserver_client.workspace = true reqwest.workspace = true +safekeeper_api.workspace=true serde_json = { workspace = true, features = ["raw_value"] } storage_controller_client.workspace = true tokio.workspace = true diff --git a/control_plane/storcon_cli/src/main.rs b/control_plane/storcon_cli/src/main.rs index 701c4b3b2e..24fd34a87a 100644 --- a/control_plane/storcon_cli/src/main.rs +++ b/control_plane/storcon_cli/src/main.rs @@ -11,7 
+11,7 @@ use pageserver_api::controller_api::{ PlacementPolicy, SafekeeperDescribeResponse, SafekeeperSchedulingPolicyRequest, ShardSchedulingPolicy, ShardsPreferredAzsRequest, ShardsPreferredAzsResponse, SkSchedulingPolicy, TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest, - TenantShardMigrateRequest, TenantShardMigrateResponse, + TenantShardMigrateRequest, TenantShardMigrateResponse, TimelineSafekeeperMigrateRequest, }; use pageserver_api::models::{ EvictionPolicy, EvictionPolicyLayerAccessThreshold, ShardParameters, TenantConfig, @@ -21,6 +21,7 @@ use pageserver_api::models::{ use pageserver_api::shard::{ShardStripeSize, TenantShardId}; use pageserver_client::mgmt_api::{self}; use reqwest::{Certificate, Method, StatusCode, Url}; +use safekeeper_api::models::TimelineLocateResponse; use storage_controller_client::control_api::Client; use utils::id::{NodeId, TenantId, TimelineId}; @@ -279,6 +280,23 @@ enum Command { #[arg(long)] concurrency: Option, }, + /// Locate safekeepers for a timeline from the storcon DB. 
+ TimelineLocate { + #[arg(long)] + tenant_id: TenantId, + #[arg(long)] + timeline_id: TimelineId, + }, + /// Migrate a timeline to a new set of safekeepers + TimelineSafekeeperMigrate { + #[arg(long)] + tenant_id: TenantId, + #[arg(long)] + timeline_id: TimelineId, + /// Example: --new-sk-set 1,2,3 + #[arg(long, required = true, value_delimiter = ',')] + new_sk_set: Vec<NodeId>, + }, } #[derive(Parser)] @@ -1324,7 +1342,7 @@ async fn main() -> anyhow::Result<()> { concurrency, } => { let mut path = format!( - "/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/download_heatmap_layers", + "v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/download_heatmap_layers", ); if let Some(c) = concurrency { @@ -1335,6 +1353,41 @@ async fn main() -> anyhow::Result<()> { .dispatch::<(), ()>(Method::POST, path, None) .await?; } + Command::TimelineLocate { + tenant_id, + timeline_id, + } => { + let path = format!("debug/v1/tenant/{tenant_id}/timeline/{timeline_id}/locate"); + + let resp = storcon_client + .dispatch::<(), TimelineLocateResponse>(Method::GET, path, None) + .await?; + + let sk_set = resp.sk_set.iter().map(|id| id.0 as i64).collect::<Vec<_>>(); + let new_sk_set = resp + .new_sk_set + .as_ref() + .map(|ids| ids.iter().map(|id| id.0 as i64).collect::<Vec<_>>()); + + println!("generation = {}", resp.generation); + println!("sk_set = {sk_set:?}"); + println!("new_sk_set = {new_sk_set:?}"); + } + Command::TimelineSafekeeperMigrate { + tenant_id, + timeline_id, + new_sk_set, + } => { + let path = format!("v1/tenant/{tenant_id}/timeline/{timeline_id}/safekeeper_migrate"); + + storcon_client + .dispatch::<_, ()>( + Method::POST, + path, + Some(TimelineSafekeeperMigrateRequest { new_sk_set }), + ) + .await?; + } } Ok(()) diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs index e87232474b..59e112654b 100644 --- a/libs/safekeeper_api/src/models.rs +++ b/libs/safekeeper_api/src/models.rs @@ -11,7 +11,7 @@ use utils::id::{NodeId, TenantId, TenantTimelineId, 
TimelineId}; use utils::lsn::Lsn; use utils::pageserver_feedback::PageserverFeedback; -use crate::membership::Configuration; +use crate::membership::{Configuration, SafekeeperGeneration}; use crate::{ServerInfo, Term}; #[derive(Debug, Serialize, Deserialize)] @@ -311,3 +311,12 @@ pub struct PullTimelineResponse { pub safekeeper_host: Option, // TODO: add more fields? } + +/// Response to a timeline locate request. +/// Storcon-only API. +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct TimelineLocateResponse { + pub generation: SafekeeperGeneration, + pub sk_set: Vec<NodeId>, + pub new_sk_set: Option<Vec<NodeId>>, +} diff --git a/storage_controller/src/service/safekeeper_service.rs b/storage_controller/src/service/safekeeper_service.rs index d7179372b2..42ddf81e3e 100644 --- a/storage_controller/src/service/safekeeper_service.rs +++ b/storage_controller/src/service/safekeeper_service.rs @@ -25,7 +25,8 @@ use pageserver_api::models::{SafekeeperInfo, SafekeepersInfo, TimelineInfo}; use safekeeper_api::PgVersionId; use safekeeper_api::membership::{self, MemberSet, SafekeeperGeneration}; use safekeeper_api::models::{ - PullTimelineRequest, TimelineMembershipSwitchRequest, TimelineMembershipSwitchResponse, + PullTimelineRequest, TimelineLocateResponse, TimelineMembershipSwitchRequest, + TimelineMembershipSwitchResponse, }; use safekeeper_api::{INITIAL_TERM, Term}; use safekeeper_client::mgmt_api; @@ -37,13 +38,6 @@ use utils::lsn::Lsn; use super::Service; -#[derive(serde::Serialize, serde::Deserialize, Clone)] -pub struct TimelineLocateResponse { - pub generation: SafekeeperGeneration, - pub sk_set: Vec<NodeId>, - pub new_sk_set: Option<Vec<NodeId>>, -} - impl Service { fn make_member_set(safekeepers: &[Safekeeper]) -> Result { let members = safekeepers