mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-09 14:32:57 +00:00
neon_local: support gRPC connstrings for endpoints (#12271)
## Problem `neon_local` should support endpoints using gRPC, by providing `grpc://` connstrings with the Pageservers' gRPC ports. Requires #12268. Touches #11926. ## Summary of changes * Add `--grpc` switch for `neon_local endpoint create`. * Generate `grpc://` connstrings for endpoints when enabled. Computes don't actually support `grpc://` connstrings yet, but will soon. gRPC is configured when the endpoint is created, not when it's started, such that it continues to use gRPC across restarts and reconfigurations. In particular, this is necessary for the storage controller's local notify hook, which can't easily plumb through gRPC configuration from the start/reconfigure commands but has access to the endpoint's configuration.
This commit is contained in:
@@ -5,10 +5,11 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Context;
|
||||
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
|
||||
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus, PageserverProtocol};
|
||||
use control_plane::local_env::LocalEnv;
|
||||
use futures::StreamExt;
|
||||
use hyper::StatusCode;
|
||||
use pageserver_api::config::DEFAULT_GRPC_LISTEN_PORT;
|
||||
use pageserver_api::controller_api::AvailabilityZone;
|
||||
use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
|
||||
use postgres_connection::parse_host_port;
|
||||
@@ -420,23 +421,31 @@ impl ComputeHook {
|
||||
preferred_az: _preferred_az,
|
||||
} = reconfigure_request;
|
||||
|
||||
let compute_pageservers = shards
|
||||
.iter()
|
||||
.map(|shard| {
|
||||
let ps_conf = env
|
||||
.get_pageserver_conf(shard.node_id)
|
||||
.expect("Unknown pageserver");
|
||||
let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr)
|
||||
.expect("Unable to parse listen_pg_addr");
|
||||
(pg_host, pg_port.unwrap_or(5432))
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for (endpoint_name, endpoint) in &cplane.endpoints {
|
||||
if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
|
||||
tracing::info!("Reconfiguring endpoint {}", endpoint_name,);
|
||||
tracing::info!("Reconfiguring endpoint {endpoint_name}");
|
||||
|
||||
let pageservers = shards
|
||||
.iter()
|
||||
.map(|shard| {
|
||||
let ps_conf = env
|
||||
.get_pageserver_conf(shard.node_id)
|
||||
.expect("Unknown pageserver");
|
||||
if endpoint.grpc {
|
||||
let addr = ps_conf.listen_grpc_addr.as_ref().expect("no gRPC address");
|
||||
let (host, port) = parse_host_port(addr).expect("invalid gRPC address");
|
||||
let port = port.unwrap_or(DEFAULT_GRPC_LISTEN_PORT);
|
||||
(PageserverProtocol::Grpc, host, port)
|
||||
} else {
|
||||
let (host, port) = parse_host_port(&ps_conf.listen_pg_addr)
|
||||
.expect("Unable to parse listen_pg_addr");
|
||||
(PageserverProtocol::Libpq, host, port.unwrap_or(5432))
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
endpoint
|
||||
.reconfigure(compute_pageservers.clone(), *stripe_size, None)
|
||||
.reconfigure(pageservers, *stripe_size, None)
|
||||
.await
|
||||
.map_err(NotifyError::NeonLocal)?;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user