From 9ecce60ded25139ac0c2072b83671c3cd2bfbd65 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 9 Jun 2025 11:09:04 -0700 Subject: [PATCH] Plumb gRPC addr through storage-controller --- control_plane/src/bin/neon_local.rs | 47 ++++++++++++------- control_plane/src/pageserver.rs | 10 ++++ control_plane/storcon_cli/src/main.rs | 9 ++++ libs/pageserver_api/src/config.rs | 2 + libs/pageserver_api/src/config/tests.rs | 31 ++++++++++++ libs/pageserver_api/src/controller_api.rs | 8 ++++ pageserver/src/controller_upcall_client.rs | 2 + .../down.sql | 1 + .../up.sql | 1 + storage_controller/src/node.rs | 24 ++++++++-- storage_controller/src/persistence.rs | 2 + storage_controller/src/scheduler.rs | 2 + storage_controller/src/schema.rs | 2 + storage_controller/src/service.rs | 4 ++ 14 files changed, 124 insertions(+), 21 deletions(-) create mode 100644 storage_controller/migrations/2025-06-07-043910_pageserver_grpc_addr/down.sql create mode 100644 storage_controller/migrations/2025-06-07-043910_pageserver_grpc_addr/up.sql diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 8769431c7d..80194b0627 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -1473,9 +1473,6 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res // fully managed by storage controller, therefore not sharded. (vec![(protocol, host, port)], DEFAULT_STRIPE_SIZE) } else { - // TODO: plumb Pageserver gRPC ports through storage-controller. - assert!(!args.grpc, "gRPC not supported with storage-controller yet"); - // Look up the currently attached location of the tenant, and its striping metadata, // to pass these on to postgres. let storage_controller = StorageController::from_env(env); @@ -1493,12 +1490,22 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res .await?; } - anyhow::Ok(( - PageserverProtocol::Libpq, - Host::parse(&shard.listen_pg_addr) - .expect("Storage controller reported bad hostname"), - shard.listen_pg_port, - )) + let pageserver = if args.grpc { + ( + PageserverProtocol::Grpc, + Host::parse(&shard.listen_grpc_addr.expect("no gRPC addr")) + .expect("bad hostname"), + shard.listen_grpc_port.expect("no gRPC port"), + ) + } else { + ( + PageserverProtocol::Libpq, + Host::parse(&shard.listen_pg_addr).expect("bad hostname"), + shard.listen_pg_port, + ) + }; + + anyhow::Ok(pageserver) }), ) .await?; @@ -1565,8 +1572,6 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res }; vec![(protocol, host, port)] } else { - // TODO: plumb gRPC ports through storage-controller. - assert!(!args.grpc, "gRPC not supported with storage-controller yet"); let storage_controller = StorageController::from_env(env); storage_controller .tenant_locate(endpoint.tenant_id) @@ -1574,12 +1579,20 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res .shards .into_iter() .map(|shard| { - ( - PageserverProtocol::Libpq, - Host::parse(&shard.listen_pg_addr) - .expect("Storage controller reported malformed host"), - shard.listen_pg_port, - ) + if args.grpc { + ( + PageserverProtocol::Grpc, + Host::parse(&shard.listen_grpc_addr.expect("no gRPC addr")) + .expect("bad hostname"), + shard.listen_grpc_port.expect("no gRPC port"), + ) + } else { + ( + PageserverProtocol::Libpq, + Host::parse(&shard.listen_pg_addr).expect("bad hostname"), + shard.listen_pg_port, + ) + } }) .collect::>() }; diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 0cf7ca184d..25bcf9fb83 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -265,6 +265,14 @@ impl PageServerNode { None => None, }; + let mut grpc_host = None; + let mut grpc_port = None; + if let Some(grpc_addr) = &self.conf.listen_grpc_addr { + let (_, port) = parse_host_port(grpc_addr).expect("Unable to parse listen_grpc_addr"); + grpc_host = Some("localhost".to_string()); + grpc_port = Some(port.unwrap_or(51051)); + } + // Intentionally hand-craft JSON: this acts as an implicit format compat test // in case the pageserver-side structure is edited, and reflects the real life // situation: the metadata is written by some other script. @@ -273,6 +281,8 @@ impl PageServerNode { serde_json::to_vec(&pageserver_api::config::NodeMetadata { postgres_host: "localhost".to_string(), postgres_port: self.pg_connection_config.port(), + grpc_host, + grpc_port, http_host: "localhost".to_string(), http_port, https_port, diff --git a/control_plane/storcon_cli/src/main.rs b/control_plane/storcon_cli/src/main.rs index 19c686dcfd..473c1bc7d9 100644 --- a/control_plane/storcon_cli/src/main.rs +++ b/control_plane/storcon_cli/src/main.rs @@ -37,6 +37,11 @@ enum Command { #[arg(long)] listen_pg_port: u16, + #[arg(long)] + listen_grpc_addr: Option, + #[arg(long)] + listen_grpc_port: Option, + #[arg(long)] listen_http_addr: String, #[arg(long)] @@ -410,6 +415,8 @@ async fn main() -> anyhow::Result<()> { node_id, listen_pg_addr, listen_pg_port, + listen_grpc_addr, + listen_grpc_port, listen_http_addr, listen_http_port, listen_https_port, @@ -423,6 +430,8 @@ async fn main() -> anyhow::Result<()> { node_id, listen_pg_addr, listen_pg_port, + listen_grpc_addr, + listen_grpc_port, listen_http_addr, listen_http_port, listen_https_port, diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index 30b0612082..4a09a53d92 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -34,6 +34,8 @@ pub struct NodeMetadata { pub postgres_host: String, #[serde(rename = "port")] pub postgres_port: u16, + pub grpc_host: Option, + pub grpc_port: Option, pub http_host: String, pub http_port: u16, pub https_port: Option, diff --git a/libs/pageserver_api/src/config/tests.rs b/libs/pageserver_api/src/config/tests.rs index 9e61873273..7137df969a 100644 --- a/libs/pageserver_api/src/config/tests.rs +++ b/libs/pageserver_api/src/config/tests.rs @@ -14,6 +14,8 @@ fn test_node_metadata_v1_backward_compatibilty() { NodeMetadata { postgres_host: "localhost".to_string(), postgres_port: 23, + grpc_host: None, + grpc_port: None, http_host: "localhost".to_string(), http_port: 42, https_port: None, @@ -37,6 +39,35 @@ fn test_node_metadata_v2_backward_compatibilty() { NodeMetadata { postgres_host: "localhost".to_string(), postgres_port: 23, + grpc_host: None, + grpc_port: None, + http_host: "localhost".to_string(), + http_port: 42, + https_port: Some(123), + other: HashMap::new(), + } + ) +} + +#[test] +fn test_node_metadata_v3_backward_compatibilty() { + let v3 = serde_json::to_vec(&serde_json::json!({ + "host": "localhost", + "port": 23, + "grpc_host": "localhost", + "grpc_port": 51, + "http_host": "localhost", + "http_port": 42, + "https_port": 123, + })); + + assert_eq!( + serde_json::from_slice::(&v3.unwrap()).unwrap(), + NodeMetadata { + postgres_host: "localhost".to_string(), + postgres_port: 23, + grpc_host: Some("localhost".to_string()), + grpc_port: Some(51), http_host: "localhost".to_string(), http_port: 42, https_port: Some(123), diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs index c5b49edba0..f6e75440ce 100644 --- a/libs/pageserver_api/src/controller_api.rs +++ b/libs/pageserver_api/src/controller_api.rs @@ -53,6 +53,9 @@ pub struct NodeRegisterRequest { pub listen_pg_addr: String, pub listen_pg_port: u16, + pub listen_grpc_addr: Option, + pub listen_grpc_port: Option, + pub listen_http_addr: String, pub listen_http_port: u16, pub listen_https_port: Option, @@ -102,6 +105,9 @@ pub struct TenantLocateResponseShard { pub listen_pg_addr: String, pub listen_pg_port: u16, + pub listen_grpc_addr: Option, + pub listen_grpc_port: Option, + pub listen_http_addr: String, pub listen_http_port: u16, pub listen_https_port: Option, @@ -152,6 +158,8 @@ pub struct NodeDescribeResponse { pub listen_pg_addr: String, pub listen_pg_port: u16, + pub listen_grpc_addr: Option, + pub listen_grpc_port: Option, } #[derive(Serialize, Deserialize, Debug)] diff --git a/pageserver/src/controller_upcall_client.rs b/pageserver/src/controller_upcall_client.rs index dc38ea616c..2d99c05481 100644 --- a/pageserver/src/controller_upcall_client.rs +++ b/pageserver/src/controller_upcall_client.rs @@ -195,6 +195,8 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient { node_id: conf.id, listen_pg_addr: m.postgres_host, listen_pg_port: m.postgres_port, + listen_grpc_addr: m.grpc_host, + listen_grpc_port: m.grpc_port, listen_http_addr: m.http_host, listen_http_port: m.http_port, listen_https_port: m.https_port, diff --git a/storage_controller/migrations/2025-06-07-043910_pageserver_grpc_addr/down.sql b/storage_controller/migrations/2025-06-07-043910_pageserver_grpc_addr/down.sql new file mode 100644 index 0000000000..f9f2ebb070 --- /dev/null +++ b/storage_controller/migrations/2025-06-07-043910_pageserver_grpc_addr/down.sql @@ -0,0 +1 @@ +ALTER TABLE nodes DROP listen_grpc_addr, listen_grpc_port; diff --git a/storage_controller/migrations/2025-06-07-043910_pageserver_grpc_addr/up.sql b/storage_controller/migrations/2025-06-07-043910_pageserver_grpc_addr/up.sql new file mode 100644 index 0000000000..8291864b16 --- /dev/null +++ b/storage_controller/migrations/2025-06-07-043910_pageserver_grpc_addr/up.sql @@ -0,0 +1 @@ +ALTER TABLE nodes ADD listen_grpc_addr VARCHAR NULL, ADD listen_grpc_port INTEGER NULL; diff --git a/storage_controller/src/node.rs b/storage_controller/src/node.rs index e180c49b43..5c17e73736 100644 --- a/storage_controller/src/node.rs +++ b/storage_controller/src/node.rs @@ -37,6 +37,9 @@ pub(crate) struct Node { listen_pg_addr: String, listen_pg_port: u16, + listen_grpc_addr: Option, + listen_grpc_port: Option, + availability_zone_id: AvailabilityZone, // Flag from storcon's config to use https for pageserver admin API. @@ -99,8 +102,8 @@ impl Node { self.id == register_req.node_id && self.listen_http_addr == register_req.listen_http_addr && self.listen_http_port == register_req.listen_http_port - // Note: listen_https_port may change. See [`Self::need_update`] for mode details. - // && self.listen_https_port == register_req.listen_https_port + // Note: HTTPS and gRPC ports/addresses may change, to allow for migrations. See + // [`Self::need_update`] for more details. && self.listen_pg_addr == register_req.listen_pg_addr && self.listen_pg_port == register_req.listen_pg_port && self.availability_zone_id == register_req.availability_zone_id @@ -108,9 +111,10 @@ impl Node { // Do we need to update an existing record in DB on this registration request? pub(crate) fn need_update(&self, register_req: &NodeRegisterRequest) -> bool { - // listen_https_port is checked here because it may change during migration to https. - // After migration, this check may be moved to registration_match. + // These are checked here, since they may change before we're fully migrated. self.listen_https_port != register_req.listen_https_port + || self.listen_grpc_addr != register_req.listen_grpc_addr + || self.listen_grpc_port != register_req.listen_grpc_port } /// For a shard located on this node, populate a response object @@ -124,6 +128,8 @@ impl Node { listen_https_port: self.listen_https_port, listen_pg_addr: self.listen_pg_addr.clone(), listen_pg_port: self.listen_pg_port, + listen_grpc_addr: self.listen_grpc_addr.clone(), + listen_grpc_port: self.listen_grpc_port, } } @@ -210,6 +216,8 @@ impl Node { listen_https_port: Option, listen_pg_addr: String, listen_pg_port: u16, + listen_grpc_addr: Option, + listen_grpc_port: Option, availability_zone_id: AvailabilityZone, use_https: bool, ) -> anyhow::Result { @@ -227,6 +235,8 @@ impl Node { listen_https_port, listen_pg_addr, listen_pg_port, + listen_grpc_addr, + listen_grpc_port, scheduling: NodeSchedulingPolicy::Active, availability: NodeAvailability::Offline, availability_zone_id, @@ -244,6 +254,8 @@ impl Node { listen_https_port: self.listen_https_port.map(|x| x as i32), listen_pg_addr: self.listen_pg_addr.clone(), listen_pg_port: self.listen_pg_port as i32, + listen_grpc_addr: self.listen_grpc_addr.clone(), + listen_grpc_port: self.listen_grpc_port.map(|x| x as i32), availability_zone_id: self.availability_zone_id.0.clone(), } } @@ -268,6 +280,8 @@ impl Node { listen_https_port: np.listen_https_port.map(|x| x as u16), listen_pg_addr: np.listen_pg_addr, listen_pg_port: np.listen_pg_port as u16, + listen_grpc_addr: np.listen_grpc_addr, + listen_grpc_port: np.listen_grpc_port.map(|x| x as u16), availability_zone_id: AvailabilityZone(np.availability_zone_id), use_https, cancel: CancellationToken::new(), @@ -357,6 +371,8 @@ impl Node { listen_https_port: self.listen_https_port, listen_pg_addr: self.listen_pg_addr.clone(), listen_pg_port: self.listen_pg_port, + listen_grpc_addr: self.listen_grpc_addr.clone(), + listen_grpc_port: self.listen_grpc_port, } } } diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index 052c0f02eb..905fceaf06 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -2048,6 +2048,8 @@ pub(crate) struct NodePersistence { pub(crate) listen_pg_port: i32, pub(crate) availability_zone_id: String, pub(crate) listen_https_port: Option, + pub(crate) listen_grpc_addr: Option, + pub(crate) listen_grpc_port: Option, } /// Tenant metadata health status that are stored durably. diff --git a/storage_controller/src/scheduler.rs b/storage_controller/src/scheduler.rs index 773373391e..78a52a87c1 100644 --- a/storage_controller/src/scheduler.rs +++ b/storage_controller/src/scheduler.rs @@ -945,6 +945,8 @@ pub(crate) mod test_utils { None, format!("pghost-{i}"), 5432 + i as u16, + None, + None, az_iter .next() .cloned() diff --git a/storage_controller/src/schema.rs b/storage_controller/src/schema.rs index 20be9bb5ca..e162d2dd5d 100644 --- a/storage_controller/src/schema.rs +++ b/storage_controller/src/schema.rs @@ -33,6 +33,8 @@ diesel::table! { listen_pg_port -> Int4, availability_zone_id -> Varchar, listen_https_port -> Nullable, + listen_grpc_addr -> Nullable, + listen_grpc_port -> Nullable, } } diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 790797bae2..f619313a64 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -1681,6 +1681,8 @@ impl Service { None, "".to_string(), 123, + None, + None, AvailabilityZone("test_az".to_string()), false, ) @@ -7215,6 +7217,8 @@ impl Service { register_req.listen_https_port, register_req.listen_pg_addr, register_req.listen_pg_port, + register_req.listen_grpc_addr, + register_req.listen_grpc_port, register_req.availability_zone_id.clone(), self.config.use_https_pageserver_api, );