storcon: register Pageserver gRPC address (#12268)

## Problem

Pageservers now expose a gRPC API on a separate address and port. This
must be registered with the storage controller such that it can be
plumbed through to the compute via cplane.

Touches #11926.

## Summary of changes

This patch registers the gRPC address and port with the storage
controller:

* Add gRPC address to `nodes` database table and `NodePersistence`, with
a Diesel migration.
* Add gRPC address in `NodeMetadata`, `NodeRegisterRequest`,
`NodeDescribeResponse`, and `TenantLocateResponseShard`.
* Add gRPC address flags to `storcon_cli node-register`.

These changes are backwards-compatible, since all structs will ignore
unknown fields during deserialization.
This commit is contained in:
Erik Grinaker
2025-06-17 06:27:10 -07:00
committed by GitHub
parent d81353b2d1
commit 48052477b4
13 changed files with 131 additions and 17 deletions

View File

@@ -16,6 +16,7 @@ use std::time::Duration;
use anyhow::{Context, bail};
use camino::Utf8PathBuf;
use pageserver_api::config::{DEFAULT_GRPC_LISTEN_PORT, DEFAULT_HTTP_LISTEN_PORT};
use pageserver_api::models::{self, TenantInfo, TimelineInfo};
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
@@ -252,9 +253,10 @@ impl PageServerNode {
// the storage controller
let metadata_path = datadir.join("metadata.json");
let (_http_host, http_port) =
let http_host = "localhost".to_string();
let (_, http_port) =
parse_host_port(&self.conf.listen_http_addr).expect("Unable to parse listen_http_addr");
let http_port = http_port.unwrap_or(9898);
let http_port = http_port.unwrap_or(DEFAULT_HTTP_LISTEN_PORT);
let https_port = match self.conf.listen_https_addr.as_ref() {
Some(https_addr) => {
@@ -265,6 +267,13 @@ impl PageServerNode {
None => None,
};
let (mut grpc_host, mut grpc_port) = (None, None);
if let Some(grpc_addr) = &self.conf.listen_grpc_addr {
let (_, port) = parse_host_port(grpc_addr).expect("Unable to parse listen_grpc_addr");
grpc_host = Some("localhost".to_string());
grpc_port = Some(port.unwrap_or(DEFAULT_GRPC_LISTEN_PORT));
}
// Intentionally hand-craft JSON: this acts as an implicit format compat test
// in case the pageserver-side structure is edited, and reflects the real life
// situation: the metadata is written by some other script.
@@ -273,7 +282,9 @@ impl PageServerNode {
serde_json::to_vec(&pageserver_api::config::NodeMetadata {
postgres_host: "localhost".to_string(),
postgres_port: self.pg_connection_config.port(),
http_host: "localhost".to_string(),
grpc_host,
grpc_port,
http_host,
http_port,
https_port,
other: HashMap::from([(

View File

@@ -36,6 +36,10 @@ enum Command {
listen_pg_addr: String,
#[arg(long)]
listen_pg_port: u16,
#[arg(long)]
listen_grpc_addr: Option<String>,
#[arg(long)]
listen_grpc_port: Option<u16>,
#[arg(long)]
listen_http_addr: String,
@@ -418,6 +422,8 @@ async fn main() -> anyhow::Result<()> {
node_id,
listen_pg_addr,
listen_pg_port,
listen_grpc_addr,
listen_grpc_port,
listen_http_addr,
listen_http_port,
listen_https_port,
@@ -431,6 +437,8 @@ async fn main() -> anyhow::Result<()> {
node_id,
listen_pg_addr,
listen_pg_port,
listen_grpc_addr,
listen_grpc_port,
listen_http_addr,
listen_http_port,
listen_https_port,

View File

@@ -12,6 +12,7 @@ pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LI
pub const DEFAULT_GRPC_LISTEN_PORT: u16 = 51051; // storage-broker already uses 50051
use std::collections::HashMap;
use std::fmt::Display;
use std::num::{NonZeroU64, NonZeroUsize};
use std::str::FromStr;
use std::time::Duration;
@@ -24,16 +25,17 @@ use utils::logging::LogFormat;
use crate::models::{ImageCompressionAlgorithm, LsnLease};
// Certain metadata (e.g. externally-addressable name, AZ) is delivered
// as a separate structure. This information is not neeed by the pageserver
// as a separate structure. This information is not needed by the pageserver
// itself, it is only used for registering the pageserver with the control
// plane and/or storage controller.
//
#[derive(PartialEq, Eq, Debug, serde::Serialize, serde::Deserialize)]
pub struct NodeMetadata {
#[serde(rename = "host")]
pub postgres_host: String,
#[serde(rename = "port")]
pub postgres_port: u16,
pub grpc_host: Option<String>,
pub grpc_port: Option<u16>,
pub http_host: String,
pub http_port: u16,
pub https_port: Option<u16>,
@@ -44,6 +46,23 @@ pub struct NodeMetadata {
pub other: HashMap<String, serde_json::Value>,
}
impl Display for NodeMetadata {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"postgresql://{}:{} ",
self.postgres_host, self.postgres_port
)?;
if let Some(grpc_host) = &self.grpc_host {
let grpc_port = self.grpc_port.unwrap_or_default();
write!(f, "grpc://{grpc_host}:{grpc_port} ")?;
}
write!(f, "http://{}:{} ", self.http_host, self.http_port)?;
write!(f, "other:{:?}", self.other)?;
Ok(())
}
}
/// PostHog integration config.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PostHogConfig {

View File

@@ -14,6 +14,8 @@ fn test_node_metadata_v1_backward_compatibilty() {
NodeMetadata {
postgres_host: "localhost".to_string(),
postgres_port: 23,
grpc_host: None,
grpc_port: None,
http_host: "localhost".to_string(),
http_port: 42,
https_port: None,
@@ -37,6 +39,35 @@ fn test_node_metadata_v2_backward_compatibilty() {
NodeMetadata {
postgres_host: "localhost".to_string(),
postgres_port: 23,
grpc_host: None,
grpc_port: None,
http_host: "localhost".to_string(),
http_port: 42,
https_port: Some(123),
other: HashMap::new(),
}
)
}
#[test]
fn test_node_metadata_v3_backward_compatibilty() {
let v3 = serde_json::to_vec(&serde_json::json!({
"host": "localhost",
"port": 23,
"grpc_host": "localhost",
"grpc_port": 51,
"http_host": "localhost",
"http_port": 42,
"https_port": 123,
}));
assert_eq!(
serde_json::from_slice::<NodeMetadata>(&v3.unwrap()).unwrap(),
NodeMetadata {
postgres_host: "localhost".to_string(),
postgres_port: 23,
grpc_host: Some("localhost".to_string()),
grpc_port: Some(51),
http_host: "localhost".to_string(),
http_port: 42,
https_port: Some(123),

View File

@@ -52,6 +52,8 @@ pub struct NodeRegisterRequest {
pub listen_pg_addr: String,
pub listen_pg_port: u16,
pub listen_grpc_addr: Option<String>,
pub listen_grpc_port: Option<u16>,
pub listen_http_addr: String,
pub listen_http_port: u16,
@@ -101,6 +103,8 @@ pub struct TenantLocateResponseShard {
pub listen_pg_addr: String,
pub listen_pg_port: u16,
pub listen_grpc_addr: Option<String>,
pub listen_grpc_port: Option<u16>,
pub listen_http_addr: String,
pub listen_http_port: u16,
@@ -152,6 +156,8 @@ pub struct NodeDescribeResponse {
pub listen_pg_addr: String,
pub listen_pg_port: u16,
pub listen_grpc_addr: Option<String>,
pub listen_grpc_port: Option<u16>,
}
#[derive(Serialize, Deserialize, Debug)]

View File

@@ -159,14 +159,7 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
Ok(m) => {
// Since we run one time at startup, be generous in our logging and
// dump all metadata.
tracing::info!(
"Loaded node metadata: postgres {}:{}, http {}:{}, other fields: {:?}",
m.postgres_host,
m.postgres_port,
m.http_host,
m.http_port,
m.other
);
tracing::info!("Loaded node metadata: {m}");
let az_id = {
let az_id_from_metadata = m
@@ -195,6 +188,8 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
node_id: conf.id,
listen_pg_addr: m.postgres_host,
listen_pg_port: m.postgres_port,
listen_grpc_addr: m.grpc_host,
listen_grpc_port: m.grpc_port,
listen_http_addr: m.http_host,
listen_http_port: m.http_port,
listen_https_port: m.https_port,

View File

@@ -0,0 +1 @@
ALTER TABLE nodes DROP listen_grpc_addr, listen_grpc_port;

View File

@@ -0,0 +1 @@
ALTER TABLE nodes ADD listen_grpc_addr VARCHAR NULL, ADD listen_grpc_port INTEGER NULL;

View File

@@ -37,6 +37,8 @@ pub(crate) struct Node {
listen_pg_addr: String,
listen_pg_port: u16,
listen_grpc_addr: Option<String>,
listen_grpc_port: Option<u16>,
availability_zone_id: AvailabilityZone,
@@ -100,8 +102,8 @@ impl Node {
self.id == register_req.node_id
&& self.listen_http_addr == register_req.listen_http_addr
&& self.listen_http_port == register_req.listen_http_port
// Note: listen_https_port may change. See [`Self::need_update`] for mode details.
// && self.listen_https_port == register_req.listen_https_port
// Note: HTTPS and gRPC addresses may change, to allow for migrations. See
// [`Self::need_update`] for more details.
&& self.listen_pg_addr == register_req.listen_pg_addr
&& self.listen_pg_port == register_req.listen_pg_port
&& self.availability_zone_id == register_req.availability_zone_id
@@ -109,9 +111,10 @@ impl Node {
// Do we need to update an existing record in DB on this registration request?
pub(crate) fn need_update(&self, register_req: &NodeRegisterRequest) -> bool {
// listen_https_port is checked here because it may change during migration to https.
// After migration, this check may be moved to registration_match.
// These are checked here, since they may change before we're fully migrated.
self.listen_https_port != register_req.listen_https_port
|| self.listen_grpc_addr != register_req.listen_grpc_addr
|| self.listen_grpc_port != register_req.listen_grpc_port
}
/// For a shard located on this node, populate a response object
@@ -125,6 +128,8 @@ impl Node {
listen_https_port: self.listen_https_port,
listen_pg_addr: self.listen_pg_addr.clone(),
listen_pg_port: self.listen_pg_port,
listen_grpc_addr: self.listen_grpc_addr.clone(),
listen_grpc_port: self.listen_grpc_port,
}
}
@@ -211,6 +216,8 @@ impl Node {
listen_https_port: Option<u16>,
listen_pg_addr: String,
listen_pg_port: u16,
listen_grpc_addr: Option<String>,
listen_grpc_port: Option<u16>,
availability_zone_id: AvailabilityZone,
use_https: bool,
) -> anyhow::Result<Self> {
@@ -221,6 +228,10 @@ impl Node {
);
}
if listen_grpc_addr.is_some() != listen_grpc_port.is_some() {
anyhow::bail!("cannot create node {id}: must specify both gRPC address and port");
}
Ok(Self {
id,
listen_http_addr,
@@ -228,6 +239,8 @@ impl Node {
listen_https_port,
listen_pg_addr,
listen_pg_port,
listen_grpc_addr,
listen_grpc_port,
scheduling: NodeSchedulingPolicy::Active,
lifecycle: NodeLifecycle::Active,
availability: NodeAvailability::Offline,
@@ -247,6 +260,8 @@ impl Node {
listen_https_port: self.listen_https_port.map(|x| x as i32),
listen_pg_addr: self.listen_pg_addr.clone(),
listen_pg_port: self.listen_pg_port as i32,
listen_grpc_addr: self.listen_grpc_addr.clone(),
listen_grpc_port: self.listen_grpc_port.map(|port| port as i32),
availability_zone_id: self.availability_zone_id.0.clone(),
}
}
@@ -260,6 +275,13 @@ impl Node {
);
}
if np.listen_grpc_addr.is_some() != np.listen_grpc_port.is_some() {
anyhow::bail!(
"can't load node {}: must specify both gRPC address and port",
np.node_id
);
}
Ok(Self {
id: NodeId(np.node_id as u64),
// At startup we consider a node offline until proven otherwise.
@@ -272,6 +294,8 @@ impl Node {
listen_https_port: np.listen_https_port.map(|x| x as u16),
listen_pg_addr: np.listen_pg_addr,
listen_pg_port: np.listen_pg_port as u16,
listen_grpc_addr: np.listen_grpc_addr,
listen_grpc_port: np.listen_grpc_port.map(|port| port as u16),
availability_zone_id: AvailabilityZone(np.availability_zone_id),
use_https,
cancel: CancellationToken::new(),
@@ -361,6 +385,8 @@ impl Node {
listen_https_port: self.listen_https_port,
listen_pg_addr: self.listen_pg_addr.clone(),
listen_pg_port: self.listen_pg_port,
listen_grpc_addr: self.listen_grpc_addr.clone(),
listen_grpc_port: self.listen_grpc_port,
}
}
}

View File

@@ -2125,6 +2125,8 @@ pub(crate) struct NodePersistence {
pub(crate) availability_zone_id: String,
pub(crate) listen_https_port: Option<i32>,
pub(crate) lifecycle: String,
pub(crate) listen_grpc_addr: Option<String>,
pub(crate) listen_grpc_port: Option<i32>,
}
/// Tenant metadata health status that are stored durably.

View File

@@ -945,6 +945,8 @@ pub(crate) mod test_utils {
None,
format!("pghost-{i}"),
5432 + i as u16,
Some(format!("grpchost-{i}")),
Some(51051 + i as u16),
az_iter
.next()
.cloned()

View File

@@ -34,6 +34,8 @@ diesel::table! {
availability_zone_id -> Varchar,
listen_https_port -> Nullable<Int4>,
lifecycle -> Varchar,
listen_grpc_addr -> Nullable<Varchar>,
listen_grpc_port -> Nullable<Int4>,
}
}

View File

@@ -1683,6 +1683,8 @@ impl Service {
None,
"".to_string(),
123,
None,
None,
AvailabilityZone("test_az".to_string()),
false,
)
@@ -7254,6 +7256,12 @@ impl Service {
));
}
if register_req.listen_grpc_addr.is_some() != register_req.listen_grpc_port.is_some() {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"must specify both gRPC address and port"
)));
}
// Ordering: we must persist the new node _before_ adding it to in-memory state.
// This ensures that before we use it for anything or expose it via any external
// API, it is guaranteed to be available after a restart.
@@ -7264,6 +7272,8 @@ impl Service {
register_req.listen_https_port,
register_req.listen_pg_addr,
register_req.listen_pg_port,
register_req.listen_grpc_addr,
register_req.listen_grpc_port,
register_req.availability_zone_id.clone(),
self.config.use_https_pageserver_api,
);