diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 93233233c6..975abd196a 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -6,7 +6,8 @@ use compute_api::responses::{ LfcPrewarmState, TlsConfig, }; use compute_api::spec::{ - ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PgIdent, + ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, + PageserverConnectionInfo, PageserverShardConnectionInfo, PgIdent, }; use futures::StreamExt; use futures::future::join_all; @@ -217,7 +218,7 @@ pub struct ParsedSpec { pub spec: ComputeSpec, pub tenant_id: TenantId, pub timeline_id: TimelineId, - pub pageserver_connstr: String, + pub pageserver_conninfo: PageserverConnectionInfo, pub safekeeper_connstrings: Vec, pub storage_auth_token: Option, pub endpoint_storage_addr: Option, @@ -264,6 +265,22 @@ impl ParsedSpec { } } +fn extract_pageserver_conninfo_from_guc(pageserver_connstring_guc: &str) -> PageserverConnectionInfo { + + PageserverConnectionInfo { + shards: pageserver_connstring_guc + .split(',') + .into_iter() + .enumerate() + .map(|(i, connstr)| (i as u32, PageserverShardConnectionInfo { + libpq_url: Some(connstr.to_string()), + grpc_url: None, + })) + .collect(), + prefer_grpc: false, + } +} + impl TryFrom for ParsedSpec { type Error = String; fn try_from(spec: ComputeSpec) -> Result { @@ -273,11 +290,17 @@ impl TryFrom for ParsedSpec { // For backwards-compatibility, the top-level fields in the spec file // may be empty. In that case, we need to dig them from the GUCs in the // cluster.settings field. - let pageserver_connstr = spec - .pageserver_connstring - .clone() - .or_else(|| spec.cluster.settings.find("neon.pageserver_connstring")) - .ok_or("pageserver connstr should be provided")?; + let pageserver_conninfo = match &spec.pageserver_connection_info { + Some(x) => x.clone(), + None => { + if let Some(guc) = spec.cluster.settings.find("neon.pageserver_connstring") { + extract_pageserver_conninfo_from_guc(&guc) + } else { + return Err("pageserver connstr should be provided".to_string()); + } + } + }; + let safekeeper_connstrings = if spec.safekeeper_connstrings.is_empty() { if matches!(spec.mode, ComputeMode::Primary) { spec.cluster @@ -330,7 +353,7 @@ impl TryFrom for ParsedSpec { let res = ParsedSpec { spec, - pageserver_connstr, + pageserver_conninfo, safekeeper_connstrings, storage_auth_token, tenant_id, @@ -1001,32 +1024,30 @@ impl ComputeNode { Ok(()) } - // Get basebackup from the libpq connection to pageserver using `connstr` and + // Get basebackup from the libpq connection to pageserver using `connstr` XXX and // unarchive it to `pgdata` directory overriding all its previous content. #[instrument(skip_all, fields(%lsn))] fn try_get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> { let spec = compute_state.pspec.as_ref().expect("spec must be set"); - let shard0_connstr = spec.pageserver_connstr.split(',').next().unwrap(); - match Url::parse(shard0_connstr)?.scheme() { - "postgres" | "postgresql" => self.try_get_basebackup_libpq(spec, lsn), - "grpc" => self.try_get_basebackup_grpc(spec, lsn), - scheme => return Err(anyhow!("unknown URL scheme {scheme}")), + if spec.pageserver_conninfo.prefer_grpc { + self.try_get_basebackup_grpc(spec, lsn)?; + } else { + self.try_get_basebackup_libpq(spec, lsn)?; } + Ok(()) } fn try_get_basebackup_grpc(&self, spec: &ParsedSpec, lsn: Lsn) -> Result<()> { let start_time = Instant::now(); - let shard0_connstr = spec - .pageserver_connstr - .split(',') - .next() - .unwrap() - .to_string(); + let shard0 = spec.pageserver_conninfo.shards.get(&0).expect("shard 0 connection info missing"); + let shard0_url = shard0.grpc_url.clone().expect("no grpc_url for shard 0"); + info!("getting basebackup@{} from pageserver {}", lsn, shard0_url); + let chunks = tokio::runtime::Handle::current().block_on(async move { - let mut client = page_api::proto::PageServiceClient::connect(shard0_connstr).await?; + let mut client = page_api::proto::PageServiceClient::connect(shard0_url.to_string()).await?; let req = page_api::proto::GetBaseBackupRequest { lsn: lsn.0, @@ -1077,8 +1098,11 @@ impl ComputeNode { fn try_get_basebackup_libpq(&self, spec: &ParsedSpec, lsn: Lsn) -> Result<()> { let start_time = Instant::now(); - let shard0_connstr = spec.pageserver_connstr.split(',').next().unwrap(); - let mut config = postgres::Config::from_str(shard0_connstr)?; + let shard0 = spec.pageserver_conninfo.shards.get(&0).expect("shard 0 connection info missing"); + let shard0_connstr = shard0.libpq_url.clone().expect("no libpq_url for shard 0"); + info!("getting basebackup@{} from pageserver {}", lsn, shard0_connstr); + + let mut config = postgres::Config::from_str(&shard0_connstr)?; // Use the storage auth token from the config file, if given. // Note: this overrides any password set in the connection string. @@ -1376,15 +1400,8 @@ impl ComputeNode { } }; - info!( - "getting basebackup@{} from pageserver {}", - lsn, &pspec.pageserver_connstr - ); self.get_basebackup(compute_state, lsn).with_context(|| { - format!( - "failed to get basebackup@{} from pageserver {}", - lsn, &pspec.pageserver_connstr - ) + format!("failed to get basebackup@{}", lsn) })?; // Update pg_hba.conf received with basebackup. @@ -2320,22 +2337,22 @@ LIMIT 100", /// The operation will time out after a specified duration. pub fn wait_timeout_while_pageserver_connstr_unchanged(&self, duration: Duration) { let state = self.state.lock().unwrap(); - let old_pageserver_connstr = state + let old_pageserver_conninfo = state .pspec .as_ref() .expect("spec must be set") - .pageserver_connstr + .pageserver_conninfo .clone(); let mut unchanged = true; let _ = self .state_changed .wait_timeout_while(state, duration, |s| { - let pageserver_connstr = &s + let pageserver_conninfo = &s .pspec .as_ref() .expect("spec must be set") - .pageserver_connstr; - unchanged = pageserver_connstr == &old_pageserver_connstr; + .pageserver_conninfo; + unchanged = pageserver_conninfo == &old_pageserver_conninfo; unchanged }) .unwrap(); diff --git a/compute_tools/src/config.rs b/compute_tools/src/config.rs index 933b30134f..c89febc38c 100644 --- a/compute_tools/src/config.rs +++ b/compute_tools/src/config.rs @@ -56,9 +56,42 @@ pub fn write_postgres_conf( // Add options for connecting to storage writeln!(file, "# Neon storage settings")?; - if let Some(s) = &spec.pageserver_connstring { - writeln!(file, "neon.pageserver_connstring={}", escape_conf_value(s))?; + + if let Some(conninfo) = &spec.pageserver_connection_info { + let mut libpq_urls: Option> = Some(Vec::new()); + let mut grpc_urls: Option> = Some(Vec::new()); + + for shardno in 0..conninfo.shards.len() { + let info = conninfo.shards.get(&(shardno as u32)) + .ok_or_else(|| anyhow::anyhow!("shard {shardno} missing from pageserver_connection_info shard map"))?; + + if let Some(url) = &info.libpq_url { + if let Some(ref mut urls) = libpq_urls { + urls.push(url.clone()); + } + } else { + libpq_urls = None + } + if let Some(url) = &info.grpc_url { + if let Some(ref mut urls) = grpc_urls { + urls.push(url.clone()); + } + } else { + grpc_urls = None + } + } + if let Some(libpq_urls) = libpq_urls { + writeln!(file, "neon.pageserver_connstring={}", escape_conf_value(&libpq_urls.join(",")))?; + } else { + writeln!(file, "# no neon.pageserver_connstring")?; + } + if let Some(grpc_urls) = grpc_urls { + writeln!(file, "neon.pageserver_grpc_urls={}", escape_conf_value(&grpc_urls.join(",")))?; + } else { + writeln!(file, "# no neon.pageserver_grpc_urls")?; + } } + if let Some(stripe_size) = spec.shard_stripe_size { writeln!(file, "neon.stripe_size={stripe_size}")?; } diff --git a/compute_tools/src/lsn_lease.rs b/compute_tools/src/lsn_lease.rs index b4ec675ff4..e9fae18262 100644 --- a/compute_tools/src/lsn_lease.rs +++ b/compute_tools/src/lsn_lease.rs @@ -81,11 +81,14 @@ fn acquire_lsn_lease_with_retry( let spec = state.pspec.as_ref().expect("spec must be set"); - let conn_strings = spec.pageserver_connstr.split(','); + spec.pageserver_conninfo.shards + .iter() + .map(|(_shardno, conninfo)| { + // FIXME: for now, this requires a libpq connection, the grpc API doesn't + // have a "lease" method. + let connstr = conninfo.libpq_url.as_ref().expect("missing libpq URL"); - conn_strings - .map(|connstr| { - let mut config = postgres::Config::from_str(connstr).expect("Invalid connstr"); + let mut config = postgres::Config::from_str(&connstr).expect("Invalid connstr"); if let Some(storage_auth_token) = &spec.storage_auth_token { config.password(storage_auth_token.clone()); } diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 21f55336aa..aef27046b2 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -16,9 +16,9 @@ use std::time::Duration; use anyhow::{Context, Result, anyhow, bail}; use clap::Parser; use compute_api::requests::ComputeClaimsScope; -use compute_api::spec::ComputeMode; +use compute_api::spec::{ComputeMode, PageserverShardConnectionInfo, PageserverConnectionInfo}; use control_plane::broker::StorageBroker; -use control_plane::endpoint::{ComputeControlPlane, EndpointTerminateMode, PageserverProtocol}; +use control_plane::endpoint::{ComputeControlPlane, EndpointTerminateMode}; use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_ADDR, EndpointStorage}; use control_plane::local_env; use control_plane::local_env::{ @@ -1504,28 +1504,34 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res )?; } - let (pageservers, stripe_size) = if let Some(pageserver_id) = pageserver_id { - let conf = env.get_pageserver_conf(pageserver_id).unwrap(); - // Use gRPC if requested. - let pageserver = if endpoint.grpc { - let grpc_addr = conf.listen_grpc_addr.as_ref().expect("bad config"); - let (host, port) = parse_host_port(grpc_addr)?; - let port = port.unwrap_or(DEFAULT_PAGESERVER_GRPC_PORT); - (PageserverProtocol::Grpc, host, port) - } else { + let (shards, stripe_size) = if let Some(ps_id) = pageserver_id { + let conf = env.get_pageserver_conf(ps_id).unwrap(); + let libpq_url = Some({ let (host, port) = parse_host_port(&conf.listen_pg_addr)?; let port = port.unwrap_or(5432); - (PageserverProtocol::Libpq, host, port) + format!("postgres://no_user@{host}:{port}") + }); + let grpc_url = if let Some(grpc_addr) = &conf.listen_grpc_addr { + let (host, port) = parse_host_port(grpc_addr)?; + let port = port.unwrap_or(DEFAULT_PAGESERVER_GRPC_PORT); + Some(format!("grpc://no_user@{host}:{port}")) + } else { + None }; + let pageserver = PageserverShardConnectionInfo { + libpq_url, + grpc_url, + }; + // If caller is telling us what pageserver to use, this is not a tenant which is // fully managed by storage controller, therefore not sharded. - (vec![pageserver], DEFAULT_STRIPE_SIZE) + (vec![(0, pageserver)], DEFAULT_STRIPE_SIZE) } else { // Look up the currently attached location of the tenant, and its striping metadata, // to pass these on to postgres. let storage_controller = StorageController::from_env(env); let locate_result = storage_controller.tenant_locate(endpoint.tenant_id).await?; - let pageservers = futures::future::try_join_all( + let shards = futures::future::try_join_all( locate_result.shards.into_iter().map(|shard| async move { if let ComputeMode::Static(lsn) = endpoint.mode { // Initialize LSN leases for static computes. @@ -1538,28 +1544,33 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res .await?; } - let pageserver = if endpoint.grpc { - ( - PageserverProtocol::Grpc, - Host::parse(&shard.listen_grpc_addr.expect("no gRPC address"))?, - shard.listen_grpc_port.expect("no gRPC port"), - ) + let libpq_host = Host::parse(&shard.listen_pg_addr)?; + let libpq_port = shard.listen_pg_port; + let libpq_url = Some(format!("postgres://no_user@{libpq_host}:{libpq_port}")); + + let grpc_url = if let Some(grpc_host) = shard.listen_grpc_addr { + let grpc_port = shard.listen_grpc_port.expect("no gRPC port"); + Some(format!("grpc://no_user@{grpc_host}:{grpc_port}")) } else { - ( - PageserverProtocol::Libpq, - Host::parse(&shard.listen_pg_addr)?, - shard.listen_pg_port, - ) + None }; - anyhow::Ok(pageserver) + let pageserver = PageserverShardConnectionInfo { + libpq_url, + grpc_url, + }; + anyhow::Ok((shard.shard_id.shard_number.0 as u32, pageserver)) }), ) .await?; let stripe_size = locate_result.shard_params.stripe_size; - (pageservers, stripe_size) + (shards, stripe_size) + }; + assert!(!shards.is_empty()); + let pageserver_conninfo = PageserverConnectionInfo { + shards: shards.into_iter().collect(), + prefer_grpc: endpoint.grpc, }; - assert!(!pageservers.is_empty()); let ps_conf = env.get_pageserver_conf(DEFAULT_PAGESERVER_ID)?; let auth_token = if matches!(ps_conf.pg_auth_type, AuthType::NeonJWT) { @@ -1591,7 +1602,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res endpoint_storage_addr, safekeepers_generation, safekeepers, - pageservers, + pageserver_conninfo, remote_ext_base_url.as_ref(), stripe_size.0 as usize, args.create_test_user, @@ -1606,20 +1617,27 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res .endpoints .get(endpoint_id.as_str()) .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?; - let pageservers = if let Some(ps_id) = args.endpoint_pageserver_id { + let shards = if let Some(ps_id) = args.endpoint_pageserver_id { let conf = env.get_pageserver_conf(ps_id)?; - // Use gRPC if requested. - let pageserver = if endpoint.grpc { - let grpc_addr = conf.listen_grpc_addr.as_ref().expect("bad config"); - let (host, port) = parse_host_port(grpc_addr)?; - let port = port.unwrap_or(DEFAULT_PAGESERVER_GRPC_PORT); - (PageserverProtocol::Grpc, host, port) - } else { + let libpq_url = Some({ let (host, port) = parse_host_port(&conf.listen_pg_addr)?; let port = port.unwrap_or(5432); - (PageserverProtocol::Libpq, host, port) + format!("postgres://no_user@{host}:{port}") + }); + let grpc_url = if let Some(grpc_addr) = &conf.listen_grpc_addr { + let (host, port) = parse_host_port(grpc_addr)?; + let port = port.unwrap_or(DEFAULT_PAGESERVER_GRPC_PORT); + Some(format!("grpc://no_user@{host}:{port}")) + } else { + None }; - vec![pageserver] + let pageserver = PageserverShardConnectionInfo { + libpq_url, + grpc_url, + }; + // If caller is telling us what pageserver to use, this is not a tenant which is + // fully managed by storage controller, therefore not sharded. + vec![(0, pageserver)] } else { let storage_controller = StorageController::from_env(env); storage_controller @@ -1629,27 +1647,31 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res .into_iter() .map(|shard| { // Use gRPC if requested. - if endpoint.grpc { - ( - PageserverProtocol::Grpc, - Host::parse(&shard.listen_grpc_addr.expect("no gRPC address")) - .expect("bad hostname"), - shard.listen_grpc_port.expect("no gRPC port"), - ) + let libpq_host = Host::parse(&shard.listen_pg_addr).expect("bad hostname"); + let libpq_port = shard.listen_pg_port; + let libpq_url = Some(format!("postgres://no_user@{libpq_host}:{libpq_port}")); + + let grpc_url = if let Some(grpc_host) = shard.listen_grpc_addr { + let grpc_port = shard.listen_grpc_port.expect("no gRPC port"); + Some(format!("grpc://no_user@{grpc_host}:{grpc_port}")) } else { - ( - PageserverProtocol::Libpq, - Host::parse(&shard.listen_pg_addr).expect("bad hostname"), - shard.listen_pg_port, - ) - } + None + }; + (shard.shard_id.shard_number.0 as u32, PageserverShardConnectionInfo { + libpq_url, + grpc_url, + }) }) .collect::>() }; + let pageserver_conninfo = PageserverConnectionInfo { + shards: shards.into_iter().collect(), + prefer_grpc: endpoint.grpc, + }; // If --safekeepers argument is given, use only the listed // safekeeper nodes; otherwise all from the env. let safekeepers = parse_safekeepers(&args.safekeepers)?; - endpoint.reconfigure(pageservers, None, safekeepers).await?; + endpoint.reconfigure(pageserver_conninfo, None, safekeepers).await?; } EndpointCmd::Stop(args) => { let endpoint_id = &args.endpoint_id; diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index ae81e7abbe..ac5e3d14bf 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -56,9 +56,13 @@ use compute_api::responses::{ TlsConfig, }; use compute_api::spec::{ - Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, PgIdent, - RemoteExtSpec, Role, + Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, + PgIdent, RemoteExtSpec, Role, }; + +// re-export these, because they're used in the reconfigure() function +pub use compute_api::spec::{PageserverConnectionInfo, PageserverShardConnectionInfo}; + use jsonwebtoken::jwk::{ AlgorithmParameters, CommonParameters, EllipticCurve, Jwk, JwkSet, KeyAlgorithm, KeyOperations, OctetKeyPairParameters, OctetKeyPairType, PublicKeyUse, @@ -73,7 +77,6 @@ use sha2::{Digest, Sha256}; use spki::der::Decode; use spki::{SubjectPublicKeyInfo, SubjectPublicKeyInfoRef}; use tracing::debug; -use url::Host; use utils::id::{NodeId, TenantId, TimelineId}; use crate::local_env::LocalEnv; @@ -659,14 +662,6 @@ impl Endpoint { } } - fn build_pageserver_connstr(pageservers: &[(PageserverProtocol, Host, u16)]) -> String { - pageservers - .iter() - .map(|(scheme, host, port)| format!("{scheme}://no_user@{host}:{port}")) - .collect::>() - .join(",") - } - /// Map safekeepers ids to the actual connection strings. fn build_safekeepers_connstrs(&self, sk_ids: Vec) -> Result> { let mut safekeeper_connstrings = Vec::new(); @@ -707,7 +702,7 @@ impl Endpoint { endpoint_storage_addr: String, safekeepers_generation: Option, safekeepers: Vec, - pageservers: Vec<(PageserverProtocol, Host, u16)>, + pageserver_conninfo: PageserverConnectionInfo, remote_ext_base_url: Option<&String>, shard_stripe_size: usize, create_test_user: bool, @@ -726,9 +721,6 @@ impl Endpoint { std::fs::remove_dir_all(self.pgdata())?; } - let pageserver_connstring = Self::build_pageserver_connstr(&pageservers); - assert!(!pageserver_connstring.is_empty()); - let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?; // check for file remote_extensions_spec.json @@ -787,7 +779,7 @@ impl Endpoint { branch_id: None, endpoint_id: Some(self.endpoint_id.clone()), mode: self.mode, - pageserver_connstring: Some(pageserver_connstring), + pageserver_connection_info: Some(pageserver_conninfo), safekeepers_generation: safekeepers_generation.map(|g| g.into_inner()), safekeeper_connstrings, storage_auth_token: auth_token.clone(), @@ -997,11 +989,11 @@ impl Endpoint { pub async fn reconfigure( &self, - pageservers: Vec<(PageserverProtocol, Host, u16)>, + pageserver_conninfo: PageserverConnectionInfo, stripe_size: Option, safekeepers: Option>, ) -> Result<()> { - anyhow::ensure!(!pageservers.is_empty(), "no pageservers provided"); + anyhow::ensure!(!pageserver_conninfo.shards.is_empty(), "no pageservers provided"); let (mut spec, compute_ctl_config) = { let config_path = self.endpoint_path().join("config.json"); @@ -1014,8 +1006,7 @@ impl Endpoint { let postgresql_conf = self.read_postgresql_conf()?; spec.cluster.postgresql_conf = Some(postgresql_conf); - let pageserver_connstr = Self::build_pageserver_connstr(&pageservers); - spec.pageserver_connstring = Some(pageserver_connstr); + spec.pageserver_connection_info = Some(pageserver_conninfo); if stripe_size.is_some() { spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize); } diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs index c930c365bf..ffefc12250 100644 --- a/libs/compute_api/src/spec.rs +++ b/libs/compute_api/src/spec.rs @@ -103,7 +103,11 @@ pub struct ComputeSpec { // updated to fill these fields, we can make these non optional. pub tenant_id: Option, pub timeline_id: Option, - pub pageserver_connstring: Option, + + // Pageserver information can be passed in two different ways: + // 1. Here + // 2. in cluster.settings. This is legacy, we are switching to method 1. + pub pageserver_connection_info: Option, // More neon ids that we expose to the compute_ctl // and to postgres as neon extension GUCs. @@ -203,6 +207,20 @@ pub enum ComputeFeature { UnknownFeature, } +/// Feature flag to signal `compute_ctl` to enable certain experimental functionality. +#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)] +pub struct PageserverConnectionInfo { + pub shards: HashMap, + + pub prefer_grpc: bool, +} + +#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)] +pub struct PageserverShardConnectionInfo { + pub libpq_url: Option, + pub grpc_url: Option, +} + #[derive(Clone, Debug, Default, Deserialize, Serialize)] pub struct RemoteExtSpec { pub public_extensions: Option>, diff --git a/pageserver/client_grpc/src/request_tracker.rs b/pageserver/client_grpc/src/request_tracker.rs index f7d2558849..8d5b77ed5a 100644 --- a/pageserver/client_grpc/src/request_tracker.rs +++ b/pageserver/client_grpc/src/request_tracker.rs @@ -204,7 +204,7 @@ impl PooledItemFactory for StreamFactory { #[derive(Clone)] pub struct RequestTracker { - cur_id: Arc, + _cur_id: Arc, stream_pool: Arc>, unary_pool: Arc>, auth_interceptor: AuthInterceptor, @@ -220,7 +220,7 @@ impl RequestTracker { let cur_id = Arc::new(AtomicU64::new(0)); RequestTracker { - cur_id: cur_id.clone(), + _cur_id: cur_id.clone(), stream_pool: stream_pool, unary_pool: unary_pool, auth_interceptor: auth_interceptor, diff --git a/pgxn/neon/communicator/src/worker_process/main_loop.rs b/pgxn/neon/communicator/src/worker_process/main_loop.rs index 45b5635682..573e391262 100644 --- a/pgxn/neon/communicator/src/worker_process/main_loop.rs +++ b/pgxn/neon/communicator/src/worker_process/main_loop.rs @@ -70,7 +70,7 @@ pub(super) async fn init( tenant_id: String, timeline_id: String, auth_token: Option, - mut shard_map: HashMap, + shard_map: HashMap, initial_file_cache_size: u64, file_cache_path: Option, ) -> CommunicatorWorkerProcessStruct<'static> { @@ -87,12 +87,6 @@ pub(super) async fn init( ) }; - // TODO: for now, just hack in the gRPC port number. This needs to be plumbed through. - for connstr in shard_map.values_mut() { - *connstr = connstr.replace(":64000", ":51051"); - } - tracing::warn!("mangled connstrings to use gRPC port 51051 shard_map={shard_map:?}"); - // Initialize subsystems let cache = cis .integrated_cache_init_struct diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index 902f471dd9..c68f25fe80 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -69,7 +69,8 @@ char *neon_project_id; char *neon_branch_id; char *neon_endpoint_id; int32 max_cluster_size; -char *page_server_connstring; +char *pageserver_connstring; +char *pageserver_grpc_urls; char *neon_auth_token; int readahead_buffer_size = 128; @@ -177,6 +178,8 @@ static bool pageserver_flush(shardno_t shard_no); static void pageserver_disconnect(shardno_t shard_no); static void pageserver_disconnect_shard(shardno_t shard_no); +static void AssignShardMap(const char *newval); + static bool PagestoreShmemIsValid(void) { @@ -239,6 +242,7 @@ ParseShardMap(const char *connstr, ShardMap *result) return true; } +/* GUC hooks for neon.pageserver_connstring */ static bool CheckPageserverConnstring(char **newval, void **extra, GucSource source) { @@ -249,6 +253,45 @@ CheckPageserverConnstring(char **newval, void **extra, GucSource source) static void AssignPageserverConnstring(const char *newval, void *extra) +{ + /* + * 'neon.pageserver_connstring' is ignored if the new communicator is used. + * In that case, the shard map is loaded from 'neon.pageserver_grpc_urls' + * instead. + */ + if (neon_enable_new_communicator) + return; + + AssignShardMap(newval); +} + + +/* GUC hooks for neon.pageserver_connstring */ +static bool +CheckPageserverGrpcUrls(char **newval, void **extra, GucSource source) +{ + char *p = *newval; + + return ParseShardMap(p, NULL); +} + +static void +AssignPageserverGrpcUrls(const char *newval, void *extra) +{ + /* + * 'neon.pageserver_grpc-urls' is ignored if the new communicator is not + * used. In that case, the shard map is loaded from 'neon.pageserver_connstring' + instead. + */ + if (!neon_enable_new_communicator) + return; + + AssignShardMap(newval); +} + + +static void +AssignShardMap(const char *newval) { ShardMap shard_map; @@ -262,7 +305,7 @@ AssignPageserverConnstring(const char *newval, void *extra) { /* * shouldn't happen, because we already checked the value in - * CheckPageserverConnstring + * CheckPageserverConnstring/CheckPageserverGrpcUrls */ elog(ERROR, "could not parse shard map"); } @@ -1352,7 +1395,8 @@ PagestoreShmemInit(void) pg_atomic_init_u64(&pagestore_shared->begin_update_counter, 0); pg_atomic_init_u64(&pagestore_shared->end_update_counter, 0); memset(&pagestore_shared->shard_map, 0, sizeof(ShardMap)); - AssignPageserverConnstring(page_server_connstring, NULL); + AssignPageserverConnstring(pageserver_connstring, NULL); + AssignPageserverGrpcUrls(pageserver_grpc_urls, NULL); } NeonPerfCountersShmemInit(); @@ -1405,12 +1449,21 @@ pg_init_libpagestore(void) DefineCustomStringVariable("neon.pageserver_connstring", "connection string to the page server", NULL, - &page_server_connstring, + &pageserver_connstring, "", PGC_SIGHUP, 0, /* no flags required */ CheckPageserverConnstring, AssignPageserverConnstring, NULL); + DefineCustomStringVariable("neon.pageserver_grpc_urls", + "list of gRPC URLs for the page servers", + NULL, + &pageserver_grpc_urls, + "", + PGC_SIGHUP, + 0, /* no flags required */ + CheckPageserverGrpcUrls, AssignPageserverGrpcUrls, NULL); + DefineCustomStringVariable("neon.timeline_id", "Neon timeline_id the server is running on", NULL, @@ -1568,7 +1621,7 @@ pg_init_libpagestore(void) if (neon_auth_token) neon_log(LOG, "using storage auth token from NEON_AUTH_TOKEN environment variable"); - if (page_server_connstring && page_server_connstring[0]) + if (pageserver_connstring[0] || pageserver_connstring[0]) { neon_log(PageStoreTrace, "set neon_smgr hook"); smgr_hook = smgr_neon; diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index e6b8486248..ef08e57a3d 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -220,7 +220,8 @@ extern void prefetch_on_ps_disconnect(void); extern page_server_api *page_server; -extern char *page_server_connstring; +extern char *pageserver_connstring; +extern char *pageserver_grpc_urls; extern int flush_every_n_requests; extern int readahead_buffer_size; extern char *neon_timeline; diff --git a/storage_controller/src/compute_hook.rs b/storage_controller/src/compute_hook.rs index a4482a4dac..9095542c5f 100644 --- a/storage_controller/src/compute_hook.rs +++ b/storage_controller/src/compute_hook.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use std::time::Duration; use anyhow::Context; -use control_plane::endpoint::{ComputeControlPlane, EndpointStatus, PageserverProtocol}; +use control_plane::endpoint::{ComputeControlPlane, EndpointStatus, PageserverConnectionInfo, PageserverShardConnectionInfo}; use control_plane::local_env::LocalEnv; use futures::StreamExt; use hyper::StatusCode; @@ -425,27 +425,40 @@ impl ComputeHook { if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running { tracing::info!("Reconfiguring endpoint {endpoint_name}"); - let pageservers = shards - .iter() - .map(|shard| { - let ps_conf = env - .get_pageserver_conf(shard.node_id) - .expect("Unknown pageserver"); - if endpoint.grpc { - let addr = ps_conf.listen_grpc_addr.as_ref().expect("no gRPC address"); - let (host, port) = parse_host_port(addr).expect("invalid gRPC address"); - let port = port.unwrap_or(DEFAULT_GRPC_LISTEN_PORT); - (PageserverProtocol::Grpc, host, port) - } else { - let (host, port) = parse_host_port(&ps_conf.listen_pg_addr) - .expect("Unable to parse listen_pg_addr"); - (PageserverProtocol::Libpq, host, port.unwrap_or(5432)) - } - }) - .collect::>(); + let mut shard_conninfos = HashMap::new(); + for shard in shards.iter() { + let ps_conf = env + .get_pageserver_conf(shard.node_id) + .expect("Unknown pageserver"); + + let libpq_url = Some({ + let (host, port) = parse_host_port(&ps_conf.listen_pg_addr) + .expect("Unable to parse listen_pg_addr"); + let port = port.unwrap_or(5432); + format!("postgres://no_user@{host}:{port}") + }); + let grpc_url = if let Some(grpc_addr) = &ps_conf.listen_grpc_addr { + let (host, port) = parse_host_port(grpc_addr) + .expect("invalid gRPC address"); + let port = port.unwrap_or(DEFAULT_GRPC_LISTEN_PORT); + Some(format!("grpc://no_user@{host}:{port}")) + } else { + None + }; + let pageserver = PageserverShardConnectionInfo { + libpq_url, + grpc_url, + }; + shard_conninfos.insert(shard.shard_number.0 as u32, pageserver); + } + + let pageserver_conninfo = PageserverConnectionInfo { + shards: shard_conninfos, + prefer_grpc: endpoint.grpc, + }; endpoint - .reconfigure(pageservers, *stripe_size, None) + .reconfigure(pageserver_conninfo, *stripe_size, None) .await .map_err(NotifyError::NeonLocal)?; } diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index a1340b2a44..b9f7d95cc7 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -4258,7 +4258,13 @@ class Endpoint(PgProtocol, LogUtils): # If gRPC is enabled, use the new communicator too. # # NB: the communicator is enabled by default, so force it to false otherwise. - config_lines += [f"neon.enable_new_communicator={str(grpc).lower()}"] + # + # XXX: By checking for None, we enable the new communicator for all tests + # by default + if grpc or grpc is None: + config_lines += [f"neon.enable_new_communicator=on"] + else: + config_lines += [f"neon.enable_new_communicator=off"] # Delete file cache if it exists (and we're recreating the endpoint) if USE_LFC: