diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 76e33e4bff..01ca28fce0 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -18,7 +18,7 @@ use clap::Parser; use compute_api::requests::ComputeClaimsScope; use compute_api::spec::ComputeMode; use control_plane::broker::StorageBroker; -use control_plane::endpoint::ComputeControlPlane; +use control_plane::endpoint::{ComputeControlPlane, PageserverProtocol}; use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_ADDR, EndpointStorage}; use control_plane::local_env; use control_plane::local_env::{ @@ -605,6 +605,14 @@ struct EndpointCreateCmdArgs { #[clap(long, help = "Postgres version")] pg_version: u32, + /// Use gRPC to communicate with Pageservers, by generating grpc:// connstrings. + /// + /// Specified on creation such that it's retained across reconfiguration and restarts. + /// + /// NB: not yet supported by computes. + #[clap(long)] + grpc: bool, + #[clap( long, help = "If set, the node will be a hot replica on the specified timeline", @@ -1451,6 +1459,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res args.internal_http_port, args.pg_version, mode, + args.grpc, !args.update_catalog, false, )?; @@ -1491,13 +1500,20 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res let (pageservers, stripe_size) = if let Some(pageserver_id) = pageserver_id { let conf = env.get_pageserver_conf(pageserver_id).unwrap(); - let parsed = parse_host_port(&conf.listen_pg_addr).expect("Bad config"); - ( - vec![(parsed.0, parsed.1.unwrap_or(5432))], - // If caller is telling us what pageserver to use, this is not a tenant which is - // full managed by storage controller, therefore not sharded. - DEFAULT_STRIPE_SIZE, - ) + // Use gRPC if requested. + let pageserver = if endpoint.grpc { + let grpc_addr = conf.listen_grpc_addr.as_ref().expect("bad config"); + let (host, port) = parse_host_port(grpc_addr)?; + let port = port.unwrap_or(DEFAULT_PAGESERVER_GRPC_PORT); + (PageserverProtocol::Grpc, host, port) + } else { + let (host, port) = parse_host_port(&conf.listen_pg_addr)?; + let port = port.unwrap_or(5432); + (PageserverProtocol::Libpq, host, port) + }; + // If caller is telling us what pageserver to use, this is not a tenant which is + // fully managed by storage controller, therefore not sharded. + (vec![pageserver], DEFAULT_STRIPE_SIZE) } else { // Look up the currently attached location of the tenant, and its striping metadata, // to pass these on to postgres. @@ -1516,11 +1532,20 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res .await?; } - anyhow::Ok(( - Host::parse(&shard.listen_pg_addr) - .expect("Storage controller reported bad hostname"), - shard.listen_pg_port, - )) + let pageserver = if endpoint.grpc { + ( + PageserverProtocol::Grpc, + Host::parse(&shard.listen_grpc_addr.expect("no gRPC address"))?, + shard.listen_grpc_port.expect("no gRPC port"), + ) + } else { + ( + PageserverProtocol::Libpq, + Host::parse(&shard.listen_pg_addr)?, + shard.listen_pg_port, + ) + }; + anyhow::Ok(pageserver) }), ) .await?; @@ -1575,11 +1600,19 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res .get(endpoint_id.as_str()) .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?; let pageservers = if let Some(ps_id) = args.endpoint_pageserver_id { - let pageserver = PageServerNode::from_env(env, env.get_pageserver_conf(ps_id)?); - vec![( - pageserver.pg_connection_config.host().clone(), - pageserver.pg_connection_config.port(), - )] + let conf = env.get_pageserver_conf(ps_id)?; + // Use gRPC if requested. + let pageserver = if endpoint.grpc { + let grpc_addr = conf.listen_grpc_addr.as_ref().expect("bad config"); + let (host, port) = parse_host_port(grpc_addr)?; + let port = port.unwrap_or(DEFAULT_PAGESERVER_GRPC_PORT); + (PageserverProtocol::Grpc, host, port) + } else { + let (host, port) = parse_host_port(&conf.listen_pg_addr)?; + let port = port.unwrap_or(5432); + (PageserverProtocol::Libpq, host, port) + }; + vec![pageserver] } else { let storage_controller = StorageController::from_env(env); storage_controller @@ -1588,11 +1621,21 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res .shards .into_iter() .map(|shard| { - ( - Host::parse(&shard.listen_pg_addr) - .expect("Storage controller reported malformed host"), - shard.listen_pg_port, - ) + // Use gRPC if requested. + if endpoint.grpc { + ( + PageserverProtocol::Grpc, + Host::parse(&shard.listen_grpc_addr.expect("no gRPC address")) + .expect("bad hostname"), + shard.listen_grpc_port.expect("no gRPC port"), + ) + } else { + ( + PageserverProtocol::Libpq, + Host::parse(&shard.listen_pg_addr).expect("bad hostname"), + shard.listen_pg_port, + ) + } }) .collect::>() }; diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index bc3a3a4e38..03156c1809 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -37,6 +37,7 @@ //! ``` //! use std::collections::BTreeMap; +use std::fmt::Display; use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}; use std::path::PathBuf; use std::process::Command; @@ -76,7 +77,6 @@ use utils::id::{NodeId, TenantId, TimelineId}; use crate::local_env::LocalEnv; use crate::postgresql_conf::PostgresConf; -use crate::storage_controller::StorageController; // contents of a endpoint.json file #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)] @@ -89,6 +89,7 @@ pub struct EndpointConf { external_http_port: u16, internal_http_port: u16, pg_version: u32, + grpc: bool, skip_pg_catalog_updates: bool, reconfigure_concurrency: usize, drop_subscriptions_before_start: bool, @@ -192,6 +193,7 @@ impl ComputeControlPlane { internal_http_port: Option, pg_version: u32, mode: ComputeMode, + grpc: bool, skip_pg_catalog_updates: bool, drop_subscriptions_before_start: bool, ) -> Result> { @@ -226,6 +228,7 @@ impl ComputeControlPlane { // we also skip catalog updates in the cloud. skip_pg_catalog_updates, drop_subscriptions_before_start, + grpc, reconfigure_concurrency: 1, features: vec![], cluster: None, @@ -244,6 +247,7 @@ impl ComputeControlPlane { internal_http_port, pg_port, pg_version, + grpc, skip_pg_catalog_updates, drop_subscriptions_before_start, reconfigure_concurrency: 1, @@ -298,6 +302,8 @@ pub struct Endpoint { pub tenant_id: TenantId, pub timeline_id: TimelineId, pub mode: ComputeMode, + /// If true, the endpoint should use gRPC to communicate with Pageservers. + pub grpc: bool, // port and address of the Postgres server and `compute_ctl`'s HTTP APIs pub pg_address: SocketAddr, @@ -333,7 +339,7 @@ pub enum EndpointStatus { RunningNoPidfile, } -impl std::fmt::Display for EndpointStatus { +impl Display for EndpointStatus { fn fmt(&self, writer: &mut std::fmt::Formatter) -> std::fmt::Result { let s = match self { Self::Running => "running", @@ -345,6 +351,29 @@ impl std::fmt::Display for EndpointStatus { } } +/// Protocol used to connect to a Pageserver. +#[derive(Clone, Copy, Debug)] +pub enum PageserverProtocol { + Libpq, + Grpc, +} + +impl PageserverProtocol { + /// Returns the URL scheme for the protocol, used in connstrings. + pub fn scheme(&self) -> &'static str { + match self { + Self::Libpq => "postgresql", + Self::Grpc => "grpc", + } + } +} + +impl Display for PageserverProtocol { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.scheme()) + } +} + impl Endpoint { fn from_dir_entry(entry: std::fs::DirEntry, env: &LocalEnv) -> Result { if !entry.file_type()?.is_dir() { @@ -380,6 +409,7 @@ impl Endpoint { mode: conf.mode, tenant_id: conf.tenant_id, pg_version: conf.pg_version, + grpc: conf.grpc, skip_pg_catalog_updates: conf.skip_pg_catalog_updates, reconfigure_concurrency: conf.reconfigure_concurrency, drop_subscriptions_before_start: conf.drop_subscriptions_before_start, @@ -608,10 +638,10 @@ impl Endpoint { } } - fn build_pageserver_connstr(pageservers: &[(Host, u16)]) -> String { + fn build_pageserver_connstr(pageservers: &[(PageserverProtocol, Host, u16)]) -> String { pageservers .iter() - .map(|(host, port)| format!("postgresql://no_user@{host}:{port}")) + .map(|(scheme, host, port)| format!("{scheme}://no_user@{host}:{port}")) .collect::>() .join(",") } @@ -656,7 +686,7 @@ impl Endpoint { endpoint_storage_addr: String, safekeepers_generation: Option, safekeepers: Vec, - pageservers: Vec<(Host, u16)>, + pageservers: Vec<(PageserverProtocol, Host, u16)>, remote_ext_base_url: Option<&String>, shard_stripe_size: usize, create_test_user: bool, @@ -941,10 +971,12 @@ impl Endpoint { pub async fn reconfigure( &self, - mut pageservers: Vec<(Host, u16)>, + pageservers: Vec<(PageserverProtocol, Host, u16)>, stripe_size: Option, safekeepers: Option>, ) -> Result<()> { + anyhow::ensure!(!pageservers.is_empty(), "no pageservers provided"); + let (mut spec, compute_ctl_config) = { let config_path = self.endpoint_path().join("config.json"); let file = std::fs::File::open(config_path)?; @@ -956,25 +988,7 @@ impl Endpoint { let postgresql_conf = self.read_postgresql_conf()?; spec.cluster.postgresql_conf = Some(postgresql_conf); - // If we weren't given explicit pageservers, query the storage controller - if pageservers.is_empty() { - let storage_controller = StorageController::from_env(&self.env); - let locate_result = storage_controller.tenant_locate(self.tenant_id).await?; - pageservers = locate_result - .shards - .into_iter() - .map(|shard| { - ( - Host::parse(&shard.listen_pg_addr) - .expect("Storage controller reported bad hostname"), - shard.listen_pg_port, - ) - }) - .collect::>(); - } - let pageserver_connstr = Self::build_pageserver_connstr(&pageservers); - assert!(!pageserver_connstr.is_empty()); spec.pageserver_connstring = Some(pageserver_connstr); if stripe_size.is_some() { spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize); diff --git a/storage_controller/src/compute_hook.rs b/storage_controller/src/compute_hook.rs index 57709302e1..a4482a4dac 100644 --- a/storage_controller/src/compute_hook.rs +++ b/storage_controller/src/compute_hook.rs @@ -5,10 +5,11 @@ use std::sync::Arc; use std::time::Duration; use anyhow::Context; -use control_plane::endpoint::{ComputeControlPlane, EndpointStatus}; +use control_plane::endpoint::{ComputeControlPlane, EndpointStatus, PageserverProtocol}; use control_plane::local_env::LocalEnv; use futures::StreamExt; use hyper::StatusCode; +use pageserver_api::config::DEFAULT_GRPC_LISTEN_PORT; use pageserver_api::controller_api::AvailabilityZone; use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId}; use postgres_connection::parse_host_port; @@ -420,23 +421,31 @@ impl ComputeHook { preferred_az: _preferred_az, } = reconfigure_request; - let compute_pageservers = shards - .iter() - .map(|shard| { - let ps_conf = env - .get_pageserver_conf(shard.node_id) - .expect("Unknown pageserver"); - let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr) - .expect("Unable to parse listen_pg_addr"); - (pg_host, pg_port.unwrap_or(5432)) - }) - .collect::>(); - for (endpoint_name, endpoint) in &cplane.endpoints { if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running { - tracing::info!("Reconfiguring endpoint {}", endpoint_name,); + tracing::info!("Reconfiguring endpoint {endpoint_name}"); + + let pageservers = shards + .iter() + .map(|shard| { + let ps_conf = env + .get_pageserver_conf(shard.node_id) + .expect("Unknown pageserver"); + if endpoint.grpc { + let addr = ps_conf.listen_grpc_addr.as_ref().expect("no gRPC address"); + let (host, port) = parse_host_port(addr).expect("invalid gRPC address"); + let port = port.unwrap_or(DEFAULT_GRPC_LISTEN_PORT); + (PageserverProtocol::Grpc, host, port) + } else { + let (host, port) = parse_host_port(&ps_conf.listen_pg_addr) + .expect("Unable to parse listen_pg_addr"); + (PageserverProtocol::Libpq, host, port.unwrap_or(5432)) + } + }) + .collect::>(); + endpoint - .reconfigure(compute_pageservers.clone(), *stripe_size, None) + .reconfigure(pageservers, *stripe_size, None) .await .map_err(NotifyError::NeonLocal)?; }