mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 14:02:55 +00:00
storage controller: register nodes in re-attach request (#7040)
## Problem Currently we manually register nodes with the storage controller, and use a script during deploy to register with the cloud control plane. Rather than extend that script further, nodes should just register on startup. ## Summary of changes - Extend the re-attach request to include an optional NodeRegisterRequest - If the `register` field is set, handle it like a normal node registration before executing the normal re-attach work. - Update tests/neon_local that used to rely on doing an explicit register step that could be enabled/disabled. --------- Co-authored-by: Christian Schwarz <christian@neon.tech>
This commit is contained in:
@@ -922,6 +922,10 @@ impl Service {
|
||||
&self,
|
||||
reattach_req: ReAttachRequest,
|
||||
) -> Result<ReAttachResponse, ApiError> {
|
||||
if let Some(register_req) = reattach_req.register {
|
||||
self.node_register(register_req).await?;
|
||||
}
|
||||
|
||||
// Take a re-attach as indication that the node is available: this is a precursor to proper
|
||||
// heartbeating in https://github.com/neondatabase/neon/issues/6844
|
||||
self.node_configure(NodeConfigureRequest {
|
||||
|
||||
@@ -1100,9 +1100,8 @@ fn get_pageserver(env: &local_env::LocalEnv, args: &ArgMatches) -> Result<PageSe
|
||||
async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
match sub_match.subcommand() {
|
||||
Some(("start", subcommand_args)) => {
|
||||
let register = subcommand_args.get_one::<bool>("register").unwrap_or(&true);
|
||||
if let Err(e) = get_pageserver(env, subcommand_args)?
|
||||
.start(&pageserver_config_overrides(subcommand_args), *register)
|
||||
.start(&pageserver_config_overrides(subcommand_args))
|
||||
.await
|
||||
{
|
||||
eprintln!("pageserver start failed: {e}");
|
||||
@@ -1131,7 +1130,7 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) ->
|
||||
}
|
||||
|
||||
if let Err(e) = pageserver
|
||||
.start(&pageserver_config_overrides(subcommand_args), false)
|
||||
.start(&pageserver_config_overrides(subcommand_args))
|
||||
.await
|
||||
{
|
||||
eprintln!("pageserver start failed: {e}");
|
||||
@@ -1293,7 +1292,7 @@ async fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) ->
|
||||
for ps_conf in &env.pageservers {
|
||||
let pageserver = PageServerNode::from_env(env, ps_conf);
|
||||
if let Err(e) = pageserver
|
||||
.start(&pageserver_config_overrides(sub_match), true)
|
||||
.start(&pageserver_config_overrides(sub_match))
|
||||
.await
|
||||
{
|
||||
eprintln!("pageserver {} start failed: {:#}", ps_conf.id, e);
|
||||
@@ -1596,11 +1595,7 @@ fn cli() -> Command {
|
||||
.subcommand(Command::new("status"))
|
||||
.subcommand(Command::new("start")
|
||||
.about("Start local pageserver")
|
||||
.arg(pageserver_config_args.clone()).arg(Arg::new("register")
|
||||
.long("register")
|
||||
.default_value("true").required(false)
|
||||
.value_parser(value_parser!(bool))
|
||||
.value_name("register"))
|
||||
.arg(pageserver_config_args.clone())
|
||||
)
|
||||
.subcommand(Command::new("stop")
|
||||
.about("Stop local pageserver")
|
||||
|
||||
@@ -774,7 +774,10 @@ impl Endpoint {
|
||||
spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
|
||||
}
|
||||
|
||||
let client = reqwest::Client::new();
|
||||
let client = reqwest::Client::builder()
|
||||
.timeout(Duration::from_secs(30))
|
||||
.build()
|
||||
.unwrap();
|
||||
let response = client
|
||||
.post(format!(
|
||||
"http://{}:{}/configure",
|
||||
|
||||
@@ -17,7 +17,6 @@ use std::time::Duration;
|
||||
use anyhow::{bail, Context};
|
||||
use camino::Utf8PathBuf;
|
||||
use futures::SinkExt;
|
||||
use pageserver_api::controller_api::NodeRegisterRequest;
|
||||
use pageserver_api::models::{
|
||||
self, LocationConfig, ShardParameters, TenantHistorySize, TenantInfo, TimelineInfo,
|
||||
};
|
||||
@@ -32,7 +31,6 @@ use utils::{
|
||||
};
|
||||
|
||||
use crate::local_env::PageServerConf;
|
||||
use crate::storage_controller::StorageController;
|
||||
use crate::{background_process, local_env::LocalEnv};
|
||||
|
||||
/// Directory within .neon which will be used by default for LocalFs remote storage.
|
||||
@@ -163,8 +161,8 @@ impl PageServerNode {
|
||||
.expect("non-Unicode path")
|
||||
}
|
||||
|
||||
pub async fn start(&self, config_overrides: &[&str], register: bool) -> anyhow::Result<()> {
|
||||
self.start_node(config_overrides, false, register).await
|
||||
pub async fn start(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
|
||||
self.start_node(config_overrides, false).await
|
||||
}
|
||||
|
||||
fn pageserver_init(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
|
||||
@@ -202,6 +200,28 @@ impl PageServerNode {
|
||||
String::from_utf8_lossy(&init_output.stderr),
|
||||
);
|
||||
|
||||
// Write metadata file, used by pageserver on startup to register itself with
|
||||
// the storage controller
|
||||
let metadata_path = datadir.join("metadata.json");
|
||||
|
||||
let (_http_host, http_port) =
|
||||
parse_host_port(&self.conf.listen_http_addr).expect("Unable to parse listen_http_addr");
|
||||
let http_port = http_port.unwrap_or(9898);
|
||||
// Intentionally hand-craft JSON: this acts as an implicit format compat test
|
||||
// in case the pageserver-side structure is edited, and reflects the real life
|
||||
// situation: the metadata is written by some other script.
|
||||
std::fs::write(
|
||||
metadata_path,
|
||||
serde_json::to_vec(&serde_json::json!({
|
||||
"host": "localhost",
|
||||
"port": self.pg_connection_config.port(),
|
||||
"http_host": "localhost",
|
||||
"http_port": http_port,
|
||||
}))
|
||||
.unwrap(),
|
||||
)
|
||||
.expect("Failed to write metadata file");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -209,27 +229,7 @@ impl PageServerNode {
|
||||
&self,
|
||||
config_overrides: &[&str],
|
||||
update_config: bool,
|
||||
register: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
// Register the node with the storage controller before starting pageserver: pageserver must be registered to
|
||||
// successfully call /re-attach and finish starting up.
|
||||
if register {
|
||||
let storage_controller = StorageController::from_env(&self.env);
|
||||
let (pg_host, pg_port) =
|
||||
parse_host_port(&self.conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
|
||||
let (http_host, http_port) = parse_host_port(&self.conf.listen_http_addr)
|
||||
.expect("Unable to parse listen_http_addr");
|
||||
storage_controller
|
||||
.node_register(NodeRegisterRequest {
|
||||
node_id: self.conf.id,
|
||||
listen_pg_addr: pg_host.to_string(),
|
||||
listen_pg_port: pg_port.unwrap_or(5432),
|
||||
listen_http_addr: http_host.to_string(),
|
||||
listen_http_port: http_port.unwrap_or(80),
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
|
||||
// TODO: using a thread here because start_process() is not async but we need to call check_status()
|
||||
let datadir = self.repo_path();
|
||||
print!(
|
||||
|
||||
@@ -6,11 +6,18 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::id::NodeId;
|
||||
|
||||
use crate::shard::TenantShardId;
|
||||
use crate::{controller_api::NodeRegisterRequest, shard::TenantShardId};
|
||||
|
||||
/// Upcall message sent by the pageserver to the configured `control_plane_api` on
|
||||
/// startup.
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct ReAttachRequest {
|
||||
pub node_id: NodeId,
|
||||
|
||||
/// Optional inline self-registration: this is useful with the storage controller,
|
||||
/// if the node already has a node_id set.
|
||||
#[serde(skip_serializing_if = "Option::is_none", default)]
|
||||
pub register: Option<NodeRegisterRequest>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
|
||||
@@ -7,8 +7,9 @@
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use remote_storage::{RemotePath, RemoteStorageConfig};
|
||||
use serde;
|
||||
use serde::de::IntoDeserializer;
|
||||
use std::env;
|
||||
use std::{collections::HashMap, env};
|
||||
use storage_broker::Uri;
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
use utils::id::ConnectionId;
|
||||
@@ -304,6 +305,26 @@ impl<T> BuilderValue<T> {
|
||||
}
|
||||
}
|
||||
|
||||
// Certain metadata (e.g. externally-addressable name, AZ) is delivered
|
||||
// as a separate structure. This information is not neeed by the pageserver
|
||||
// itself, it is only used for registering the pageserver with the control
|
||||
// plane and/or storage controller.
|
||||
//
|
||||
#[derive(serde::Deserialize)]
|
||||
pub(crate) struct NodeMetadata {
|
||||
#[serde(rename = "host")]
|
||||
pub(crate) postgres_host: String,
|
||||
#[serde(rename = "port")]
|
||||
pub(crate) postgres_port: u16,
|
||||
pub(crate) http_host: String,
|
||||
pub(crate) http_port: u16,
|
||||
|
||||
// Deployment tools may write fields to the metadata file beyond what we
|
||||
// use in this type: this type intentionally only names fields that require.
|
||||
#[serde(flatten)]
|
||||
pub(crate) other: HashMap<String, serde_json::Value>,
|
||||
}
|
||||
|
||||
// needed to simplify config construction
|
||||
struct PageServerConfigBuilder {
|
||||
listen_pg_addr: BuilderValue<String>,
|
||||
@@ -761,6 +782,10 @@ impl PageServerConf {
|
||||
self.workdir.join("deletion")
|
||||
}
|
||||
|
||||
pub fn metadata_path(&self) -> Utf8PathBuf {
|
||||
self.workdir.join("metadata.json")
|
||||
}
|
||||
|
||||
pub fn deletion_list_path(&self, sequence: u64) -> Utf8PathBuf {
|
||||
// Encode a version in the filename, so that if we ever switch away from JSON we can
|
||||
// increment this.
|
||||
|
||||
@@ -2,6 +2,7 @@ use std::collections::HashMap;
|
||||
|
||||
use futures::Future;
|
||||
use pageserver_api::{
|
||||
controller_api::NodeRegisterRequest,
|
||||
shard::TenantShardId,
|
||||
upcall_api::{
|
||||
ReAttachRequest, ReAttachResponse, ValidateRequest, ValidateRequestTenant, ValidateResponse,
|
||||
@@ -12,7 +13,10 @@ use tokio_util::sync::CancellationToken;
|
||||
use url::Url;
|
||||
use utils::{backoff, generation::Generation, id::NodeId};
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::{
|
||||
config::{NodeMetadata, PageServerConf},
|
||||
virtual_file::on_fatal_io_error,
|
||||
};
|
||||
|
||||
/// The Pageserver's client for using the control plane API: this is a small subset
|
||||
/// of the overall control plane API, for dealing with generations (see docs/rfcs/025-generation-numbers.md)
|
||||
@@ -32,6 +36,7 @@ pub enum RetryForeverError {
|
||||
pub trait ControlPlaneGenerationsApi {
|
||||
fn re_attach(
|
||||
&self,
|
||||
conf: &PageServerConf,
|
||||
) -> impl Future<Output = Result<HashMap<TenantShardId, Generation>, RetryForeverError>> + Send;
|
||||
fn validate(
|
||||
&self,
|
||||
@@ -110,13 +115,59 @@ impl ControlPlaneClient {
|
||||
|
||||
impl ControlPlaneGenerationsApi for ControlPlaneClient {
|
||||
/// Block until we get a successful response, or error out if we are shut down
|
||||
async fn re_attach(&self) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
|
||||
async fn re_attach(
|
||||
&self,
|
||||
conf: &PageServerConf,
|
||||
) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
|
||||
let re_attach_path = self
|
||||
.base_url
|
||||
.join("re-attach")
|
||||
.expect("Failed to build re-attach path");
|
||||
|
||||
// Include registration content in the re-attach request if a metadata file is readable
|
||||
let metadata_path = conf.metadata_path();
|
||||
let register = match tokio::fs::read_to_string(&metadata_path).await {
|
||||
Ok(metadata_str) => match serde_json::from_str::<NodeMetadata>(&metadata_str) {
|
||||
Ok(m) => {
|
||||
// Since we run one time at startup, be generous in our logging and
|
||||
// dump all metadata.
|
||||
tracing::info!(
|
||||
"Loaded node metadata: postgres {}:{}, http {}:{}, other fields: {:?}",
|
||||
m.postgres_host,
|
||||
m.postgres_port,
|
||||
m.http_host,
|
||||
m.http_port,
|
||||
m.other
|
||||
);
|
||||
|
||||
Some(NodeRegisterRequest {
|
||||
node_id: conf.id,
|
||||
listen_pg_addr: m.postgres_host,
|
||||
listen_pg_port: m.postgres_port,
|
||||
listen_http_addr: m.http_host,
|
||||
listen_http_port: m.http_port,
|
||||
})
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Unreadable metadata in {metadata_path}: {e}");
|
||||
None
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
if e.kind() == std::io::ErrorKind::NotFound {
|
||||
// This is legal: we may have been deployed with some external script
|
||||
// doing registration for us.
|
||||
tracing::info!("Metadata file not found at {metadata_path}");
|
||||
} else {
|
||||
on_fatal_io_error(&e, &format!("Loading metadata at {metadata_path}"))
|
||||
}
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
let request = ReAttachRequest {
|
||||
node_id: self.node_id,
|
||||
register,
|
||||
};
|
||||
|
||||
fail::fail_point!("control-plane-client-re-attach");
|
||||
|
||||
@@ -831,7 +831,10 @@ mod test {
|
||||
}
|
||||
|
||||
impl ControlPlaneGenerationsApi for MockControlPlane {
|
||||
async fn re_attach(&self) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
|
||||
async fn re_attach(
|
||||
&self,
|
||||
_conf: &PageServerConf,
|
||||
) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
|
||||
unimplemented!()
|
||||
}
|
||||
async fn validate(
|
||||
|
||||
@@ -295,7 +295,7 @@ async fn init_load_generations(
|
||||
} else if let Some(client) = ControlPlaneClient::new(conf, cancel) {
|
||||
info!("Calling control plane API to re-attach tenants");
|
||||
// If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
|
||||
match client.re_attach().await {
|
||||
match client.re_attach(conf).await {
|
||||
Ok(tenants) => tenants,
|
||||
Err(RetryForeverError::ShuttingDown) => {
|
||||
anyhow::bail!("Shut down while waiting for control plane re-attach response")
|
||||
|
||||
@@ -519,9 +519,9 @@ class NeonEnvBuilder:
|
||||
self.env = NeonEnv(self)
|
||||
return self.env
|
||||
|
||||
def start(self):
|
||||
def start(self, register_pageservers=False):
|
||||
assert self.env is not None, "environment is not already initialized, call init() first"
|
||||
self.env.start()
|
||||
self.env.start(register_pageservers=register_pageservers)
|
||||
|
||||
def init_start(
|
||||
self,
|
||||
@@ -1112,7 +1112,7 @@ class NeonEnv:
|
||||
log.info(f"Config: {cfg}")
|
||||
self.neon_cli.init(cfg, force=config.config_init_force)
|
||||
|
||||
def start(self):
|
||||
def start(self, register_pageservers=False):
|
||||
# storage controller starts first, so that pageserver /re-attach calls don't
|
||||
# bounce through retries on startup
|
||||
self.storage_controller.start()
|
||||
@@ -1124,6 +1124,11 @@ class NeonEnv:
|
||||
# reconcile.
|
||||
wait_until(30, 1, storage_controller_ready)
|
||||
|
||||
if register_pageservers:
|
||||
# Special case for forward compat tests, this can be removed later.
|
||||
for pageserver in self.pageservers:
|
||||
self.storage_controller.node_register(pageserver)
|
||||
|
||||
# Start up broker, pageserver and all safekeepers
|
||||
futs = []
|
||||
with concurrent.futures.ThreadPoolExecutor(
|
||||
@@ -1712,10 +1717,8 @@ class NeonCli(AbstractNeonCli):
|
||||
id: int,
|
||||
overrides: Tuple[str, ...] = (),
|
||||
extra_env_vars: Optional[Dict[str, str]] = None,
|
||||
register: bool = True,
|
||||
) -> "subprocess.CompletedProcess[str]":
|
||||
register_str = "true" if register else "false"
|
||||
start_args = ["pageserver", "start", f"--id={id}", *overrides, f"--register={register_str}"]
|
||||
start_args = ["pageserver", "start", f"--id={id}", *overrides]
|
||||
storage = self.env.pageserver_remote_storage
|
||||
append_pageserver_param_overrides(
|
||||
params_to_update=start_args,
|
||||
@@ -2066,6 +2069,8 @@ class NeonStorageController(MetricsGetter):
|
||||
"node_id": int(node.id),
|
||||
"listen_http_addr": "localhost",
|
||||
"listen_http_port": node.service_port.http,
|
||||
"listen_pg_addr": "localhost",
|
||||
"listen_pg_port": node.service_port.pg,
|
||||
}
|
||||
log.info(f"node_register({body})")
|
||||
self.request(
|
||||
@@ -2233,7 +2238,6 @@ class NeonPageserver(PgProtocol):
|
||||
self,
|
||||
overrides: Tuple[str, ...] = (),
|
||||
extra_env_vars: Optional[Dict[str, str]] = None,
|
||||
register: bool = True,
|
||||
) -> "NeonPageserver":
|
||||
"""
|
||||
Start the page server.
|
||||
@@ -2243,7 +2247,7 @@ class NeonPageserver(PgProtocol):
|
||||
assert self.running is False
|
||||
|
||||
self.env.neon_cli.pageserver_start(
|
||||
self.id, overrides=overrides, extra_env_vars=extra_env_vars, register=register
|
||||
self.id, overrides=overrides, extra_env_vars=extra_env_vars
|
||||
)
|
||||
self.running = True
|
||||
return self
|
||||
|
||||
@@ -242,7 +242,7 @@ def test_forward_compatibility(
|
||||
# everything else: our test code is written for latest CLI args.
|
||||
env.neon_local_binpath = neon_local_binpath
|
||||
|
||||
neon_env_builder.start()
|
||||
neon_env_builder.start(register_pageservers=True)
|
||||
|
||||
check_neon_works(
|
||||
env,
|
||||
|
||||
@@ -205,6 +205,9 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
|
||||
sk.start()
|
||||
env.storage_controller.start()
|
||||
|
||||
# We will start a pageserver with no control_plane_api set, so it won't be able to self-register
|
||||
env.storage_controller.node_register(env.pageserver)
|
||||
|
||||
env.pageserver.start(overrides=('--pageserver-config-override=control_plane_api=""',))
|
||||
|
||||
env.neon_cli.create_tenant(
|
||||
@@ -511,7 +514,6 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
env.pageserver.stop() # Non-immediate: implicitly checking that shutdown doesn't hang waiting for CP
|
||||
env.pageserver.start(
|
||||
overrides=("--pageserver-config-override=control_plane_emergency_mode=true",),
|
||||
register=False,
|
||||
)
|
||||
|
||||
# The pageserver should provide service to clients
|
||||
|
||||
@@ -278,13 +278,12 @@ def test_sharding_service_onboarding(neon_env_builder: NeonEnvBuilder, warm_up:
|
||||
env.pageservers[0].allowed_errors.append(".*Emergency mode!.*")
|
||||
env.pageservers[0].start(
|
||||
overrides=("--pageserver-config-override=control_plane_emergency_mode=true",),
|
||||
register=False,
|
||||
)
|
||||
origin_ps = env.pageservers[0]
|
||||
|
||||
# This is the pageserver managed by the sharding service, where the tenant
|
||||
# will be attached after onboarding
|
||||
env.pageservers[1].start(register=True)
|
||||
env.pageservers[1].start()
|
||||
dest_ps = env.pageservers[1]
|
||||
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user