neon_local timeline import: create timelines on safekeepers (#12138)

neon_local's timeline import subcommand creates timelines manually, but
doesn't create them on the safekeepers. If a test then tries to open an
endpoint to read from the timeline, it will error in the new world with
`--timelines-onto-safekeepers`.

Therefore, if that flag is enabled, create the timelines on the
safekeepers.

Note that this import functionality is different from the fast import
feature (https://github.com/neondatabase/neon/issues/10188, #11801).

Part of #11670
As well as part of #11712
This commit is contained in:
Arpad Müller
2025-06-05 20:53:14 +02:00
committed by GitHub
parent f64eb0cbaf
commit 24d7c37e6e
8 changed files with 91 additions and 42 deletions

1
Cargo.lock generated
View File

@@ -1445,6 +1445,7 @@ dependencies = [
"regex", "regex",
"reqwest", "reqwest",
"safekeeper_api", "safekeeper_api",
"safekeeper_client",
"scopeguard", "scopeguard",
"serde", "serde",
"serde_json", "serde_json",

View File

@@ -36,6 +36,7 @@ pageserver_api.workspace = true
pageserver_client.workspace = true pageserver_client.workspace = true
postgres_backend.workspace = true postgres_backend.workspace = true
safekeeper_api.workspace = true safekeeper_api.workspace = true
safekeeper_client.workspace = true
postgres_connection.workspace = true postgres_connection.workspace = true
storage_broker.workspace = true storage_broker.workspace = true
http-utils.workspace = true http-utils.workspace = true

View File

@@ -45,7 +45,7 @@ use pageserver_api::models::{
use pageserver_api::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardStripeSize, TenantShardId}; use pageserver_api::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardStripeSize, TenantShardId};
use postgres_backend::AuthType; use postgres_backend::AuthType;
use postgres_connection::parse_host_port; use postgres_connection::parse_host_port;
use safekeeper_api::membership::SafekeeperGeneration; use safekeeper_api::membership::{SafekeeperGeneration, SafekeeperId};
use safekeeper_api::{ use safekeeper_api::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT, DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT, DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
@@ -1255,6 +1255,45 @@ async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Re
pageserver pageserver
.timeline_import(tenant_id, timeline_id, base, pg_wal, args.pg_version) .timeline_import(tenant_id, timeline_id, base, pg_wal, args.pg_version)
.await?; .await?;
if env.storage_controller.timelines_onto_safekeepers {
    // The pageserver-side import above does not create the timeline on any
    // safekeeper, so it is created here explicitly when this flag is set.
    println!("Creating timeline on safekeeper ...");
    let timeline_info = pageserver
        .timeline_info(
            TenantShardId::unsharded(tenant_id),
            timeline_id,
            pageserver_client::mgmt_api::ForceAwaitLogicalSize::No,
        )
        .await?;
    // Only the first configured safekeeper is used. Report a missing
    // safekeeper as a regular error instead of panicking on `unwrap()`.
    let default_sk_conf = env.safekeepers.first().ok_or_else(|| {
        anyhow::anyhow!("cannot create timeline on safekeeper: no safekeepers are configured")
    })?;
    let default_sk = SafekeeperNode::from_env(env, default_sk_conf);
    let default_host = default_sk
        .conf
        .listen_addr
        .clone()
        .unwrap_or_else(|| "localhost".to_string());
    // Single-member membership configuration at generation 1, with no
    // pending membership change (`new_members: None`).
    let mconf = safekeeper_api::membership::Configuration {
        generation: SafekeeperGeneration::new(1),
        members: safekeeper_api::membership::MemberSet {
            m: vec![SafekeeperId {
                host: default_host,
                id: default_sk.conf.id,
                pg_port: default_sk.conf.pg_port,
            }],
        },
        new_members: None,
    };
    // The request carries the version scaled by 10000 (e.g. 16 -> 160000).
    let pg_version = args.pg_version * 10000;
    let req = safekeeper_api::models::TimelineCreateRequest {
        tenant_id,
        timeline_id,
        mconf,
        pg_version,
        system_id: None,
        wal_seg_size: None,
        // Start the safekeeper timeline at the LSN the pageserver reports
        // as its last record for the imported timeline.
        start_lsn: timeline_info.last_record_lsn,
        commit_lsn: None,
    };
    default_sk.create_timeline(&req).await?;
}
env.register_branch_mapping(branch_name.to_string(), tenant_id, timeline_id)?; env.register_branch_mapping(branch_name.to_string(), tenant_id, timeline_id)?;
println!("Done"); println!("Done");
} }

View File

@@ -635,4 +635,16 @@ impl PageServerNode {
Ok(()) Ok(())
} }
/// Fetches the [`TimelineInfo`] of `timeline_id` on `tenant_shard_id`.
///
/// This is a thin wrapper that forwards all arguments to
/// `self.http_client.timeline_info`.
///
/// # Errors
///
/// Any error returned by the HTTP client is converted into an
/// [`anyhow::Error`] and returned.
pub async fn timeline_info(
    &self,
    tenant_shard_id: TenantShardId,
    timeline_id: TimelineId,
    force_await_logical_size: mgmt_api::ForceAwaitLogicalSize,
) -> anyhow::Result<TimelineInfo> {
    Ok(self
        .http_client
        .timeline_info(tenant_shard_id, timeline_id, force_await_logical_size)
        .await?)
}
} }

View File

@@ -6,7 +6,6 @@
//! .neon/safekeepers/<safekeeper id> //! .neon/safekeepers/<safekeeper id>
//! ``` //! ```
use std::error::Error as _; use std::error::Error as _;
use std::future::Future;
use std::io::Write; use std::io::Write;
use std::path::PathBuf; use std::path::PathBuf;
use std::time::Duration; use std::time::Duration;
@@ -14,9 +13,9 @@ use std::{io, result};
use anyhow::Context; use anyhow::Context;
use camino::Utf8PathBuf; use camino::Utf8PathBuf;
use http_utils::error::HttpErrorBody;
use postgres_connection::PgConnectionConfig; use postgres_connection::PgConnectionConfig;
use reqwest::{IntoUrl, Method}; use safekeeper_api::models::TimelineCreateRequest;
use safekeeper_client::mgmt_api;
use thiserror::Error; use thiserror::Error;
use utils::auth::{Claims, Scope}; use utils::auth::{Claims, Scope};
use utils::id::NodeId; use utils::id::NodeId;
@@ -35,25 +34,14 @@ pub enum SafekeeperHttpError {
type Result<T> = result::Result<T, SafekeeperHttpError>; type Result<T> = result::Result<T, SafekeeperHttpError>;
pub(crate) trait ResponseErrorMessageExt: Sized { fn err_from_client_err(err: mgmt_api::Error) -> SafekeeperHttpError {
fn error_from_body(self) -> impl Future<Output = Result<Self>> + Send; use mgmt_api::Error::*;
} match err {
ApiError(_, str) => SafekeeperHttpError::Response(str),
impl ResponseErrorMessageExt for reqwest::Response { Cancelled => SafekeeperHttpError::Response("Cancelled".to_owned()),
async fn error_from_body(self) -> Result<Self> { ReceiveBody(err) => SafekeeperHttpError::Transport(err),
let status = self.status(); ReceiveErrorBody(err) => SafekeeperHttpError::Response(err),
if !(status.is_client_error() || status.is_server_error()) { Timeout(str) => SafekeeperHttpError::Response(format!("timeout: {str}")),
return Ok(self);
}
// reqwest does not export its error construction utility functions, so let's craft the message ourselves
let url = self.url().to_owned();
Err(SafekeeperHttpError::Response(
match self.json::<HttpErrorBody>().await {
Ok(err_body) => format!("Error: {}", err_body.msg),
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
},
))
} }
} }
@@ -70,9 +58,8 @@ pub struct SafekeeperNode {
pub pg_connection_config: PgConnectionConfig, pub pg_connection_config: PgConnectionConfig,
pub env: LocalEnv, pub env: LocalEnv,
pub http_client: reqwest::Client, pub http_client: mgmt_api::Client,
pub listen_addr: String, pub listen_addr: String,
pub http_base_url: String,
} }
impl SafekeeperNode { impl SafekeeperNode {
@@ -82,13 +69,14 @@ impl SafekeeperNode {
} else { } else {
"127.0.0.1".to_string() "127.0.0.1".to_string()
}; };
let jwt = None;
let http_base_url = format!("http://{}:{}", listen_addr, conf.http_port);
SafekeeperNode { SafekeeperNode {
id: conf.id, id: conf.id,
conf: conf.clone(), conf: conf.clone(),
pg_connection_config: Self::safekeeper_connection_config(&listen_addr, conf.pg_port), pg_connection_config: Self::safekeeper_connection_config(&listen_addr, conf.pg_port),
env: env.clone(), env: env.clone(),
http_client: env.create_http_client(), http_client: mgmt_api::Client::new(env.create_http_client(), http_base_url, jwt),
http_base_url: format!("http://{}:{}/v1", listen_addr, conf.http_port),
listen_addr, listen_addr,
} }
} }
@@ -278,20 +266,19 @@ impl SafekeeperNode {
) )
} }
fn http_request<U: IntoUrl>(&self, method: Method, url: U) -> reqwest::RequestBuilder { pub async fn check_status(&self) -> Result<()> {
// TODO: authentication self.http_client
//if self.env.auth_type == AuthType::NeonJWT { .status()
// builder = builder.bearer_auth(&self.env.safekeeper_auth_token) .await
//} .map_err(err_from_client_err)?;
self.http_client.request(method, url) Ok(())
} }
pub async fn check_status(&self) -> Result<()> { pub async fn create_timeline(&self, req: &TimelineCreateRequest) -> Result<()> {
self.http_request(Method::GET, format!("{}/{}", self.http_base_url, "status")) self.http_client
.send() .create_timeline(req)
.await? .await
.error_from_body() .map_err(err_from_client_err)?;
.await?;
Ok(()) Ok(())
} }
} }

View File

@@ -13,7 +13,7 @@ use utils::pageserver_feedback::PageserverFeedback;
use crate::membership::Configuration; use crate::membership::Configuration;
use crate::{ServerInfo, Term}; use crate::{ServerInfo, Term};
#[derive(Debug, Serialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct SafekeeperStatus { pub struct SafekeeperStatus {
pub id: NodeId, pub id: NodeId,
} }

View File

@@ -8,8 +8,8 @@ use std::error::Error as _;
use http_utils::error::HttpErrorBody; use http_utils::error::HttpErrorBody;
use reqwest::{IntoUrl, Method, StatusCode}; use reqwest::{IntoUrl, Method, StatusCode};
use safekeeper_api::models::{ use safekeeper_api::models::{
self, PullTimelineRequest, PullTimelineResponse, SafekeeperUtilization, TimelineCreateRequest, self, PullTimelineRequest, PullTimelineResponse, SafekeeperStatus, SafekeeperUtilization,
TimelineStatus, TimelineCreateRequest, TimelineStatus,
}; };
use utils::id::{NodeId, TenantId, TimelineId}; use utils::id::{NodeId, TenantId, TimelineId};
use utils::logging::SecretString; use utils::logging::SecretString;
@@ -183,6 +183,12 @@ impl Client {
self.get(&uri).await self.get(&uri).await
} }
pub async fn status(&self) -> Result<SafekeeperStatus> {
let uri = format!("{}/v1/status", self.mgmt_api_endpoint);
let resp = self.get(&uri).await?;
resp.json().await.map_err(Error::ReceiveBody)
}
pub async fn utilization(&self) -> Result<SafekeeperUtilization> { pub async fn utilization(&self) -> Result<SafekeeperUtilization> {
let uri = format!("{}/v1/utilization", self.mgmt_api_endpoint); let uri = format!("{}/v1/utilization", self.mgmt_api_endpoint);
let resp = self.get(&uri).await?; let resp = self.get(&uri).await?;

View File

@@ -87,6 +87,9 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
# Set up pageserver for import # Set up pageserver for import
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS) neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
neon_env_builder.storage_controller_config = {
"timelines_onto_safekeepers": True,
}
env = neon_env_builder.init_start() env = neon_env_builder.init_start()
env.pageserver.tenant_create(tenant) env.pageserver.tenant_create(tenant)