From 24d7c37e6ee7b730f983487351721f40922a9745 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Thu, 5 Jun 2025 20:53:14 +0200 Subject: [PATCH] neon_local timeline import: create timelines on safekeepers (#12138) neon_local's timeline import subcommand creates timelines manually, but doesn't create them on the safekeepers. If a test then tries to open an endpoint to read from the timeline, it will error in the new world with `--timelines-onto-safekeepers`. Therefore, if that flag is enabled, create the timelines on the safekeepers. Note that this import functionality is different from the fast import feature (https://github.com/neondatabase/neon/issues/10188, #11801). Part of #11670 As well as part of #11712 --- Cargo.lock | 1 + control_plane/Cargo.toml | 1 + control_plane/src/bin/neon_local.rs | 41 ++++++++++++++++++- control_plane/src/pageserver.rs | 12 ++++++ control_plane/src/safekeeper.rs | 63 ++++++++++++----------------- libs/safekeeper_api/src/models.rs | 2 +- safekeeper/client/src/mgmt_api.rs | 10 ++++- test_runner/regress/test_import.py | 3 ++ 8 files changed, 91 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 588a63b6a3..5f71af118c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1445,6 +1445,7 @@ dependencies = [ "regex", "reqwest", "safekeeper_api", + "safekeeper_client", "scopeguard", "serde", "serde_json", diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml index 62c039047f..bbaa3f12b9 100644 --- a/control_plane/Cargo.toml +++ b/control_plane/Cargo.toml @@ -36,6 +36,7 @@ pageserver_api.workspace = true pageserver_client.workspace = true postgres_backend.workspace = true safekeeper_api.workspace = true +safekeeper_client.workspace = true postgres_connection.workspace = true storage_broker.workspace = true http-utils.workspace = true diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index ef6985d697..76e33e4bff 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -45,7 +45,7 @@ use pageserver_api::models::{ use pageserver_api::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardStripeSize, TenantShardId}; use postgres_backend::AuthType; use postgres_connection::parse_host_port; -use safekeeper_api::membership::SafekeeperGeneration; +use safekeeper_api::membership::{SafekeeperGeneration, SafekeeperId}; use safekeeper_api::{ DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT, DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT, @@ -1255,6 +1255,45 @@ async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Re pageserver .timeline_import(tenant_id, timeline_id, base, pg_wal, args.pg_version) .await?; + if env.storage_controller.timelines_onto_safekeepers { + println!("Creating timeline on safekeeper ..."); + let timeline_info = pageserver + .timeline_info( + TenantShardId::unsharded(tenant_id), + timeline_id, + pageserver_client::mgmt_api::ForceAwaitLogicalSize::No, + ) + .await?; + let default_sk = SafekeeperNode::from_env(env, env.safekeepers.first().unwrap()); + let default_host = default_sk + .conf + .listen_addr + .clone() + .unwrap_or_else(|| "localhost".to_string()); + let mconf = safekeeper_api::membership::Configuration { + generation: SafekeeperGeneration::new(1), + members: safekeeper_api::membership::MemberSet { + m: vec![SafekeeperId { + host: default_host, + id: default_sk.conf.id, + pg_port: default_sk.conf.pg_port, + }], + }, + new_members: None, + }; + let pg_version = args.pg_version * 10000; + let req = safekeeper_api::models::TimelineCreateRequest { + tenant_id, + timeline_id, + mconf, + pg_version, + system_id: None, + wal_seg_size: None, + start_lsn: timeline_info.last_record_lsn, + commit_lsn: None, + }; + default_sk.create_timeline(&req).await?; + } env.register_branch_mapping(branch_name.to_string(), tenant_id, timeline_id)?; println!("Done"); } diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 0cf7ca184d..3b7c4ec39f 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -635,4 +635,16 @@ impl PageServerNode { Ok(()) } + pub async fn timeline_info( + &self, + tenant_shard_id: TenantShardId, + timeline_id: TimelineId, + force_await_logical_size: mgmt_api::ForceAwaitLogicalSize, + ) -> anyhow::Result { + let timeline_info = self + .http_client + .timeline_info(tenant_shard_id, timeline_id, force_await_logical_size) + .await?; + Ok(timeline_info) + } } diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index eec2c997e6..28d369a315 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -6,7 +6,6 @@ //! .neon/safekeepers/ //! ``` use std::error::Error as _; -use std::future::Future; use std::io::Write; use std::path::PathBuf; use std::time::Duration; @@ -14,9 +13,9 @@ use std::{io, result}; use anyhow::Context; use camino::Utf8PathBuf; -use http_utils::error::HttpErrorBody; use postgres_connection::PgConnectionConfig; -use reqwest::{IntoUrl, Method}; +use safekeeper_api::models::TimelineCreateRequest; +use safekeeper_client::mgmt_api; use thiserror::Error; use utils::auth::{Claims, Scope}; use utils::id::NodeId; @@ -35,25 +34,14 @@ pub enum SafekeeperHttpError { type Result = result::Result; -pub(crate) trait ResponseErrorMessageExt: Sized { - fn error_from_body(self) -> impl Future> + Send; -} - -impl ResponseErrorMessageExt for reqwest::Response { - async fn error_from_body(self) -> Result { - let status = self.status(); - if !(status.is_client_error() || status.is_server_error()) { - return Ok(self); - } - - // reqwest does not export its error construction utility functions, so let's craft the message ourselves - let url = self.url().to_owned(); - Err(SafekeeperHttpError::Response( - match self.json::().await { - Ok(err_body) => format!("Error: {}", err_body.msg), - Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url), - }, - )) +fn err_from_client_err(err: mgmt_api::Error) -> SafekeeperHttpError { + use mgmt_api::Error::*; + match err { + ApiError(_, str) => SafekeeperHttpError::Response(str), + Cancelled => SafekeeperHttpError::Response("Cancelled".to_owned()), + ReceiveBody(err) => SafekeeperHttpError::Transport(err), + ReceiveErrorBody(err) => SafekeeperHttpError::Response(err), + Timeout(str) => SafekeeperHttpError::Response(format!("timeout: {str}")), } } @@ -70,9 +58,8 @@ pub struct SafekeeperNode { pub pg_connection_config: PgConnectionConfig, pub env: LocalEnv, - pub http_client: reqwest::Client, + pub http_client: mgmt_api::Client, pub listen_addr: String, - pub http_base_url: String, } impl SafekeeperNode { @@ -82,13 +69,14 @@ impl SafekeeperNode { } else { "127.0.0.1".to_string() }; + let jwt = None; + let http_base_url = format!("http://{}:{}", listen_addr, conf.http_port); SafekeeperNode { id: conf.id, conf: conf.clone(), pg_connection_config: Self::safekeeper_connection_config(&listen_addr, conf.pg_port), env: env.clone(), - http_client: env.create_http_client(), - http_base_url: format!("http://{}:{}/v1", listen_addr, conf.http_port), + http_client: mgmt_api::Client::new(env.create_http_client(), http_base_url, jwt), listen_addr, } } @@ -278,20 +266,19 @@ impl SafekeeperNode { ) } - fn http_request(&self, method: Method, url: U) -> reqwest::RequestBuilder { - // TODO: authentication - //if self.env.auth_type == AuthType::NeonJWT { - // builder = builder.bearer_auth(&self.env.safekeeper_auth_token) - //} - self.http_client.request(method, url) + pub async fn check_status(&self) -> Result<()> { + self.http_client + .status() + .await + .map_err(err_from_client_err)?; + Ok(()) } - pub async fn check_status(&self) -> Result<()> { - self.http_request(Method::GET, format!("{}/{}", self.http_base_url, "status")) - .send() - .await? - .error_from_body() - .await?; + pub async fn create_timeline(&self, req: &TimelineCreateRequest) -> Result<()> { + self.http_client + .create_timeline(req) + .await + .map_err(err_from_client_err)?; Ok(()) } } diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs index 8658dc4011..fd05f6fda3 100644 --- a/libs/safekeeper_api/src/models.rs +++ b/libs/safekeeper_api/src/models.rs @@ -13,7 +13,7 @@ use utils::pageserver_feedback::PageserverFeedback; use crate::membership::Configuration; use crate::{ServerInfo, Term}; -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct SafekeeperStatus { pub id: NodeId, } diff --git a/safekeeper/client/src/mgmt_api.rs b/safekeeper/client/src/mgmt_api.rs index b364ac8e48..2e46a7b529 100644 --- a/safekeeper/client/src/mgmt_api.rs +++ b/safekeeper/client/src/mgmt_api.rs @@ -8,8 +8,8 @@ use std::error::Error as _; use http_utils::error::HttpErrorBody; use reqwest::{IntoUrl, Method, StatusCode}; use safekeeper_api::models::{ - self, PullTimelineRequest, PullTimelineResponse, SafekeeperUtilization, TimelineCreateRequest, - TimelineStatus, + self, PullTimelineRequest, PullTimelineResponse, SafekeeperStatus, SafekeeperUtilization, + TimelineCreateRequest, TimelineStatus, }; use utils::id::{NodeId, TenantId, TimelineId}; use utils::logging::SecretString; @@ -183,6 +183,12 @@ impl Client { self.get(&uri).await } + pub async fn status(&self) -> Result { + let uri = format!("{}/v1/status", self.mgmt_api_endpoint); + let resp = self.get(&uri).await?; + resp.json().await.map_err(Error::ReceiveBody) + } + pub async fn utilization(&self) -> Result { let uri = format!("{}/v1/utilization", self.mgmt_api_endpoint); let resp = self.get(&uri).await?; diff --git a/test_runner/regress/test_import.py b/test_runner/regress/test_import.py index 55737c35f0..e1070a81e6 100644 --- a/test_runner/regress/test_import.py +++ b/test_runner/regress/test_import.py @@ -87,6 +87,9 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build # Set up pageserver for import neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS) + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": True, + } env = neon_env_builder.init_start() env.pageserver.tenant_create(tenant)