Plumb through both libpq and grpc connection strings to the compute

Add a new 'pageserver_connection_info' field in the compute spec. It
replaces the old 'pageserver_connstring' field with a more complicated
struct that includes both libpq and grpc URLs, for each shard (or only
one of the the URLs, depending on the configuration). It also includes
a flag suggesting which one to use; compute_ctl now uses it to decide
which protocol to use for the basebackup.

This is compatible with everything that's in production, because the
control plane never used the 'pageserver_connstring' field. That was
added a long time ago with the idea that it would replace the code
that digs the 'neon.pageserver_connstring' GUC from the list of
Postgres settings, but we never got around to do that in the control
plane. Hence, it was only used with neon_local. But the plan now is to
pass the 'pageserver_connection_info' from the control plane, and once
that's fully deployed everywhere, the code to parse
'neon.pageserver_connstring' in compute_ctl can be removed.

The 'grpc' flag on an endpoint in endpoint config is now more of a
suggestion. Compute_ctl gets both URLs, so it can choose to use libpq
or grpc as it wishes. It currently always obeys the 'prefer_grpc' flag
that's part of the connection info though. Postgres however uses grpc
iff the new rust-based communicator is enabled.

TODO/plan for the control plane:

- Start to pass `pageserver_connection_info` in the spec file.
- Also keep the current `neon.pageserver_connstring` setting for now,
  for backwards compatibility with old computes

After that, the `pageserver_connection_info.prefer_grpc` flag in the
spec file can be used to control whether compute_ctl uses grpc or
libpq.  The actual compute's grpc usage will be controlled by the
`neon.enable_new_communicator` GUC. It can be set separately from
'prefer_grpc'.

Later:

- Once all old computes are gone, remove the code to pass
  `neon.pageserver_connstring`
This commit is contained in:
Heikki Linnakangas
2025-06-29 18:16:49 +03:00
parent 8c122a1c98
commit a352d290eb
12 changed files with 303 additions and 152 deletions

View File

@@ -6,7 +6,8 @@ use compute_api::responses::{
LfcPrewarmState, TlsConfig,
};
use compute_api::spec::{
ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PgIdent,
ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion,
PageserverConnectionInfo, PageserverShardConnectionInfo, PgIdent,
};
use futures::StreamExt;
use futures::future::join_all;
@@ -217,7 +218,7 @@ pub struct ParsedSpec {
pub spec: ComputeSpec,
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub pageserver_connstr: String,
pub pageserver_conninfo: PageserverConnectionInfo,
pub safekeeper_connstrings: Vec<String>,
pub storage_auth_token: Option<String>,
pub endpoint_storage_addr: Option<SocketAddr>,
@@ -264,6 +265,22 @@ impl ParsedSpec {
}
}
fn extract_pageserver_conninfo_from_guc(pageserver_connstring_guc: &str) -> PageserverConnectionInfo {
PageserverConnectionInfo {
shards: pageserver_connstring_guc
.split(',')
.into_iter()
.enumerate()
.map(|(i, connstr)| (i as u32, PageserverShardConnectionInfo {
libpq_url: Some(connstr.to_string()),
grpc_url: None,
}))
.collect(),
prefer_grpc: false,
}
}
impl TryFrom<ComputeSpec> for ParsedSpec {
type Error = String;
fn try_from(spec: ComputeSpec) -> Result<Self, String> {
@@ -273,11 +290,17 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
// For backwards-compatibility, the top-level fields in the spec file
// may be empty. In that case, we need to dig them from the GUCs in the
// cluster.settings field.
let pageserver_connstr = spec
.pageserver_connstring
.clone()
.or_else(|| spec.cluster.settings.find("neon.pageserver_connstring"))
.ok_or("pageserver connstr should be provided")?;
let pageserver_conninfo = match &spec.pageserver_connection_info {
Some(x) => x.clone(),
None => {
if let Some(guc) = spec.cluster.settings.find("neon.pageserver_connstring") {
extract_pageserver_conninfo_from_guc(&guc)
} else {
return Err("pageserver connstr should be provided".to_string());
}
}
};
let safekeeper_connstrings = if spec.safekeeper_connstrings.is_empty() {
if matches!(spec.mode, ComputeMode::Primary) {
spec.cluster
@@ -330,7 +353,7 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
let res = ParsedSpec {
spec,
pageserver_connstr,
pageserver_conninfo,
safekeeper_connstrings,
storage_auth_token,
tenant_id,
@@ -1001,32 +1024,30 @@ impl ComputeNode {
Ok(())
}
// Get basebackup from the libpq connection to pageserver using `connstr` and
// Get basebackup from the libpq connection to pageserver using `connstr` XXX and
// unarchive it to `pgdata` directory overriding all its previous content.
#[instrument(skip_all, fields(%lsn))]
fn try_get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
let spec = compute_state.pspec.as_ref().expect("spec must be set");
let shard0_connstr = spec.pageserver_connstr.split(',').next().unwrap();
match Url::parse(shard0_connstr)?.scheme() {
"postgres" | "postgresql" => self.try_get_basebackup_libpq(spec, lsn),
"grpc" => self.try_get_basebackup_grpc(spec, lsn),
scheme => return Err(anyhow!("unknown URL scheme {scheme}")),
if spec.pageserver_conninfo.prefer_grpc {
self.try_get_basebackup_grpc(spec, lsn)?;
} else {
self.try_get_basebackup_libpq(spec, lsn)?;
}
Ok(())
}
fn try_get_basebackup_grpc(&self, spec: &ParsedSpec, lsn: Lsn) -> Result<()> {
let start_time = Instant::now();
let shard0_connstr = spec
.pageserver_connstr
.split(',')
.next()
.unwrap()
.to_string();
let shard0 = spec.pageserver_conninfo.shards.get(&0).expect("shard 0 connection info missing");
let shard0_url = shard0.grpc_url.clone().expect("no grpc_url for shard 0");
info!("getting basebackup@{} from pageserver {}", lsn, shard0_url);
let chunks = tokio::runtime::Handle::current().block_on(async move {
let mut client = page_api::proto::PageServiceClient::connect(shard0_connstr).await?;
let mut client = page_api::proto::PageServiceClient::connect(shard0_url.to_string()).await?;
let req = page_api::proto::GetBaseBackupRequest {
lsn: lsn.0,
@@ -1077,8 +1098,11 @@ impl ComputeNode {
fn try_get_basebackup_libpq(&self, spec: &ParsedSpec, lsn: Lsn) -> Result<()> {
let start_time = Instant::now();
let shard0_connstr = spec.pageserver_connstr.split(',').next().unwrap();
let mut config = postgres::Config::from_str(shard0_connstr)?;
let shard0 = spec.pageserver_conninfo.shards.get(&0).expect("shard 0 connection info missing");
let shard0_connstr = shard0.libpq_url.clone().expect("no libpq_url for shard 0");
info!("getting basebackup@{} from pageserver {}", lsn, shard0_connstr);
let mut config = postgres::Config::from_str(&shard0_connstr)?;
// Use the storage auth token from the config file, if given.
// Note: this overrides any password set in the connection string.
@@ -1376,15 +1400,8 @@ impl ComputeNode {
}
};
info!(
"getting basebackup@{} from pageserver {}",
lsn, &pspec.pageserver_connstr
);
self.get_basebackup(compute_state, lsn).with_context(|| {
format!(
"failed to get basebackup@{} from pageserver {}",
lsn, &pspec.pageserver_connstr
)
format!("failed to get basebackup@{}", lsn)
})?;
// Update pg_hba.conf received with basebackup.
@@ -2320,22 +2337,22 @@ LIMIT 100",
/// The operation will time out after a specified duration.
pub fn wait_timeout_while_pageserver_connstr_unchanged(&self, duration: Duration) {
let state = self.state.lock().unwrap();
let old_pageserver_connstr = state
let old_pageserver_conninfo = state
.pspec
.as_ref()
.expect("spec must be set")
.pageserver_connstr
.pageserver_conninfo
.clone();
let mut unchanged = true;
let _ = self
.state_changed
.wait_timeout_while(state, duration, |s| {
let pageserver_connstr = &s
let pageserver_conninfo = &s
.pspec
.as_ref()
.expect("spec must be set")
.pageserver_connstr;
unchanged = pageserver_connstr == &old_pageserver_connstr;
.pageserver_conninfo;
unchanged = pageserver_conninfo == &old_pageserver_conninfo;
unchanged
})
.unwrap();

View File

@@ -56,9 +56,42 @@ pub fn write_postgres_conf(
// Add options for connecting to storage
writeln!(file, "# Neon storage settings")?;
if let Some(s) = &spec.pageserver_connstring {
writeln!(file, "neon.pageserver_connstring={}", escape_conf_value(s))?;
if let Some(conninfo) = &spec.pageserver_connection_info {
let mut libpq_urls: Option<Vec<String>> = Some(Vec::new());
let mut grpc_urls: Option<Vec<String>> = Some(Vec::new());
for shardno in 0..conninfo.shards.len() {
let info = conninfo.shards.get(&(shardno as u32))
.ok_or_else(|| anyhow::anyhow!("shard {shardno} missing from pageserver_connection_info shard map"))?;
if let Some(url) = &info.libpq_url {
if let Some(ref mut urls) = libpq_urls {
urls.push(url.clone());
}
} else {
libpq_urls = None
}
if let Some(url) = &info.grpc_url {
if let Some(ref mut urls) = grpc_urls {
urls.push(url.clone());
}
} else {
grpc_urls = None
}
}
if let Some(libpq_urls) = libpq_urls {
writeln!(file, "neon.pageserver_connstring={}", escape_conf_value(&libpq_urls.join(",")))?;
} else {
writeln!(file, "# no neon.pageserver_connstring")?;
}
if let Some(grpc_urls) = grpc_urls {
writeln!(file, "neon.pageserver_grpc_urls={}", escape_conf_value(&grpc_urls.join(",")))?;
} else {
writeln!(file, "# no neon.pageserver_grpc_urls")?;
}
}
if let Some(stripe_size) = spec.shard_stripe_size {
writeln!(file, "neon.stripe_size={stripe_size}")?;
}

View File

@@ -81,11 +81,14 @@ fn acquire_lsn_lease_with_retry(
let spec = state.pspec.as_ref().expect("spec must be set");
let conn_strings = spec.pageserver_connstr.split(',');
spec.pageserver_conninfo.shards
.iter()
.map(|(_shardno, conninfo)| {
// FIXME: for now, this requires a libpq connection, the grpc API doesn't
// have a "lease" method.
let connstr = conninfo.libpq_url.as_ref().expect("missing libpq URL");
conn_strings
.map(|connstr| {
let mut config = postgres::Config::from_str(connstr).expect("Invalid connstr");
let mut config = postgres::Config::from_str(&connstr).expect("Invalid connstr");
if let Some(storage_auth_token) = &spec.storage_auth_token {
config.password(storage_auth_token.clone());
}

View File

@@ -16,9 +16,9 @@ use std::time::Duration;
use anyhow::{Context, Result, anyhow, bail};
use clap::Parser;
use compute_api::requests::ComputeClaimsScope;
use compute_api::spec::ComputeMode;
use compute_api::spec::{ComputeMode, PageserverShardConnectionInfo, PageserverConnectionInfo};
use control_plane::broker::StorageBroker;
use control_plane::endpoint::{ComputeControlPlane, EndpointTerminateMode, PageserverProtocol};
use control_plane::endpoint::{ComputeControlPlane, EndpointTerminateMode};
use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_ADDR, EndpointStorage};
use control_plane::local_env;
use control_plane::local_env::{
@@ -1504,28 +1504,34 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
)?;
}
let (pageservers, stripe_size) = if let Some(pageserver_id) = pageserver_id {
let conf = env.get_pageserver_conf(pageserver_id).unwrap();
// Use gRPC if requested.
let pageserver = if endpoint.grpc {
let grpc_addr = conf.listen_grpc_addr.as_ref().expect("bad config");
let (host, port) = parse_host_port(grpc_addr)?;
let port = port.unwrap_or(DEFAULT_PAGESERVER_GRPC_PORT);
(PageserverProtocol::Grpc, host, port)
} else {
let (shards, stripe_size) = if let Some(ps_id) = pageserver_id {
let conf = env.get_pageserver_conf(ps_id).unwrap();
let libpq_url = Some({
let (host, port) = parse_host_port(&conf.listen_pg_addr)?;
let port = port.unwrap_or(5432);
(PageserverProtocol::Libpq, host, port)
format!("postgres://no_user@{host}:{port}")
});
let grpc_url = if let Some(grpc_addr) = &conf.listen_grpc_addr {
let (host, port) = parse_host_port(grpc_addr)?;
let port = port.unwrap_or(DEFAULT_PAGESERVER_GRPC_PORT);
Some(format!("grpc://no_user@{host}:{port}"))
} else {
None
};
let pageserver = PageserverShardConnectionInfo {
libpq_url,
grpc_url,
};
// If caller is telling us what pageserver to use, this is not a tenant which is
// fully managed by storage controller, therefore not sharded.
(vec![pageserver], DEFAULT_STRIPE_SIZE)
(vec![(0, pageserver)], DEFAULT_STRIPE_SIZE)
} else {
// Look up the currently attached location of the tenant, and its striping metadata,
// to pass these on to postgres.
let storage_controller = StorageController::from_env(env);
let locate_result = storage_controller.tenant_locate(endpoint.tenant_id).await?;
let pageservers = futures::future::try_join_all(
let shards = futures::future::try_join_all(
locate_result.shards.into_iter().map(|shard| async move {
if let ComputeMode::Static(lsn) = endpoint.mode {
// Initialize LSN leases for static computes.
@@ -1538,28 +1544,33 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
.await?;
}
let pageserver = if endpoint.grpc {
(
PageserverProtocol::Grpc,
Host::parse(&shard.listen_grpc_addr.expect("no gRPC address"))?,
shard.listen_grpc_port.expect("no gRPC port"),
)
let libpq_host = Host::parse(&shard.listen_pg_addr)?;
let libpq_port = shard.listen_pg_port;
let libpq_url = Some(format!("postgres://no_user@{libpq_host}:{libpq_port}"));
let grpc_url = if let Some(grpc_host) = shard.listen_grpc_addr {
let grpc_port = shard.listen_grpc_port.expect("no gRPC port");
Some(format!("grpc://no_user@{grpc_host}:{grpc_port}"))
} else {
(
PageserverProtocol::Libpq,
Host::parse(&shard.listen_pg_addr)?,
shard.listen_pg_port,
)
None
};
anyhow::Ok(pageserver)
let pageserver = PageserverShardConnectionInfo {
libpq_url,
grpc_url,
};
anyhow::Ok((shard.shard_id.shard_number.0 as u32, pageserver))
}),
)
.await?;
let stripe_size = locate_result.shard_params.stripe_size;
(pageservers, stripe_size)
(shards, stripe_size)
};
assert!(!shards.is_empty());
let pageserver_conninfo = PageserverConnectionInfo {
shards: shards.into_iter().collect(),
prefer_grpc: endpoint.grpc,
};
assert!(!pageservers.is_empty());
let ps_conf = env.get_pageserver_conf(DEFAULT_PAGESERVER_ID)?;
let auth_token = if matches!(ps_conf.pg_auth_type, AuthType::NeonJWT) {
@@ -1591,7 +1602,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
endpoint_storage_addr,
safekeepers_generation,
safekeepers,
pageservers,
pageserver_conninfo,
remote_ext_base_url.as_ref(),
stripe_size.0 as usize,
args.create_test_user,
@@ -1606,20 +1617,27 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
.endpoints
.get(endpoint_id.as_str())
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
let pageservers = if let Some(ps_id) = args.endpoint_pageserver_id {
let shards = if let Some(ps_id) = args.endpoint_pageserver_id {
let conf = env.get_pageserver_conf(ps_id)?;
// Use gRPC if requested.
let pageserver = if endpoint.grpc {
let grpc_addr = conf.listen_grpc_addr.as_ref().expect("bad config");
let (host, port) = parse_host_port(grpc_addr)?;
let port = port.unwrap_or(DEFAULT_PAGESERVER_GRPC_PORT);
(PageserverProtocol::Grpc, host, port)
} else {
let libpq_url = Some({
let (host, port) = parse_host_port(&conf.listen_pg_addr)?;
let port = port.unwrap_or(5432);
(PageserverProtocol::Libpq, host, port)
format!("postgres://no_user@{host}:{port}")
});
let grpc_url = if let Some(grpc_addr) = &conf.listen_grpc_addr {
let (host, port) = parse_host_port(grpc_addr)?;
let port = port.unwrap_or(DEFAULT_PAGESERVER_GRPC_PORT);
Some(format!("grpc://no_user@{host}:{port}"))
} else {
None
};
vec![pageserver]
let pageserver = PageserverShardConnectionInfo {
libpq_url,
grpc_url,
};
// If caller is telling us what pageserver to use, this is not a tenant which is
// fully managed by storage controller, therefore not sharded.
vec![(0, pageserver)]
} else {
let storage_controller = StorageController::from_env(env);
storage_controller
@@ -1629,27 +1647,31 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
.into_iter()
.map(|shard| {
// Use gRPC if requested.
if endpoint.grpc {
(
PageserverProtocol::Grpc,
Host::parse(&shard.listen_grpc_addr.expect("no gRPC address"))
.expect("bad hostname"),
shard.listen_grpc_port.expect("no gRPC port"),
)
let libpq_host = Host::parse(&shard.listen_pg_addr).expect("bad hostname");
let libpq_port = shard.listen_pg_port;
let libpq_url = Some(format!("postgres://no_user@{libpq_host}:{libpq_port}"));
let grpc_url = if let Some(grpc_host) = shard.listen_grpc_addr {
let grpc_port = shard.listen_grpc_port.expect("no gRPC port");
Some(format!("grpc://no_user@{grpc_host}:{grpc_port}"))
} else {
(
PageserverProtocol::Libpq,
Host::parse(&shard.listen_pg_addr).expect("bad hostname"),
shard.listen_pg_port,
)
}
None
};
(shard.shard_id.shard_number.0 as u32, PageserverShardConnectionInfo {
libpq_url,
grpc_url,
})
})
.collect::<Vec<_>>()
};
let pageserver_conninfo = PageserverConnectionInfo {
shards: shards.into_iter().collect(),
prefer_grpc: endpoint.grpc,
};
// If --safekeepers argument is given, use only the listed
// safekeeper nodes; otherwise all from the env.
let safekeepers = parse_safekeepers(&args.safekeepers)?;
endpoint.reconfigure(pageservers, None, safekeepers).await?;
endpoint.reconfigure(pageserver_conninfo, None, safekeepers).await?;
}
EndpointCmd::Stop(args) => {
let endpoint_id = &args.endpoint_id;

View File

@@ -56,9 +56,13 @@ use compute_api::responses::{
TlsConfig,
};
use compute_api::spec::{
Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, PgIdent,
RemoteExtSpec, Role,
Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database,
PgIdent, RemoteExtSpec, Role,
};
// re-export these, because they're used in the reconfigure() function
pub use compute_api::spec::{PageserverConnectionInfo, PageserverShardConnectionInfo};
use jsonwebtoken::jwk::{
AlgorithmParameters, CommonParameters, EllipticCurve, Jwk, JwkSet, KeyAlgorithm, KeyOperations,
OctetKeyPairParameters, OctetKeyPairType, PublicKeyUse,
@@ -73,7 +77,6 @@ use sha2::{Digest, Sha256};
use spki::der::Decode;
use spki::{SubjectPublicKeyInfo, SubjectPublicKeyInfoRef};
use tracing::debug;
use url::Host;
use utils::id::{NodeId, TenantId, TimelineId};
use crate::local_env::LocalEnv;
@@ -659,14 +662,6 @@ impl Endpoint {
}
}
fn build_pageserver_connstr(pageservers: &[(PageserverProtocol, Host, u16)]) -> String {
pageservers
.iter()
.map(|(scheme, host, port)| format!("{scheme}://no_user@{host}:{port}"))
.collect::<Vec<_>>()
.join(",")
}
/// Map safekeepers ids to the actual connection strings.
fn build_safekeepers_connstrs(&self, sk_ids: Vec<NodeId>) -> Result<Vec<String>> {
let mut safekeeper_connstrings = Vec::new();
@@ -707,7 +702,7 @@ impl Endpoint {
endpoint_storage_addr: String,
safekeepers_generation: Option<SafekeeperGeneration>,
safekeepers: Vec<NodeId>,
pageservers: Vec<(PageserverProtocol, Host, u16)>,
pageserver_conninfo: PageserverConnectionInfo,
remote_ext_base_url: Option<&String>,
shard_stripe_size: usize,
create_test_user: bool,
@@ -726,9 +721,6 @@ impl Endpoint {
std::fs::remove_dir_all(self.pgdata())?;
}
let pageserver_connstring = Self::build_pageserver_connstr(&pageservers);
assert!(!pageserver_connstring.is_empty());
let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?;
// check for file remote_extensions_spec.json
@@ -787,7 +779,7 @@ impl Endpoint {
branch_id: None,
endpoint_id: Some(self.endpoint_id.clone()),
mode: self.mode,
pageserver_connstring: Some(pageserver_connstring),
pageserver_connection_info: Some(pageserver_conninfo),
safekeepers_generation: safekeepers_generation.map(|g| g.into_inner()),
safekeeper_connstrings,
storage_auth_token: auth_token.clone(),
@@ -997,11 +989,11 @@ impl Endpoint {
pub async fn reconfigure(
&self,
pageservers: Vec<(PageserverProtocol, Host, u16)>,
pageserver_conninfo: PageserverConnectionInfo,
stripe_size: Option<ShardStripeSize>,
safekeepers: Option<Vec<NodeId>>,
) -> Result<()> {
anyhow::ensure!(!pageservers.is_empty(), "no pageservers provided");
anyhow::ensure!(!pageserver_conninfo.shards.is_empty(), "no pageservers provided");
let (mut spec, compute_ctl_config) = {
let config_path = self.endpoint_path().join("config.json");
@@ -1014,8 +1006,7 @@ impl Endpoint {
let postgresql_conf = self.read_postgresql_conf()?;
spec.cluster.postgresql_conf = Some(postgresql_conf);
let pageserver_connstr = Self::build_pageserver_connstr(&pageservers);
spec.pageserver_connstring = Some(pageserver_connstr);
spec.pageserver_connection_info = Some(pageserver_conninfo);
if stripe_size.is_some() {
spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);
}

View File

@@ -103,7 +103,11 @@ pub struct ComputeSpec {
// updated to fill these fields, we can make these non optional.
pub tenant_id: Option<TenantId>,
pub timeline_id: Option<TimelineId>,
pub pageserver_connstring: Option<String>,
// Pageserver information can be passed in two different ways:
// 1. Here
// 2. in cluster.settings. This is legacy, we are switching to method 1.
pub pageserver_connection_info: Option<PageserverConnectionInfo>,
// More neon ids that we expose to the compute_ctl
// and to postgres as neon extension GUCs.
@@ -203,6 +207,20 @@ pub enum ComputeFeature {
UnknownFeature,
}
/// Feature flag to signal `compute_ctl` to enable certain experimental functionality.
#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
pub struct PageserverConnectionInfo {
pub shards: HashMap<u32, PageserverShardConnectionInfo>,
pub prefer_grpc: bool,
}
#[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)]
pub struct PageserverShardConnectionInfo {
pub libpq_url: Option<String>,
pub grpc_url: Option<String>,
}
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct RemoteExtSpec {
pub public_extensions: Option<Vec<String>>,

View File

@@ -204,7 +204,7 @@ impl PooledItemFactory<StreamReturner> for StreamFactory {
#[derive(Clone)]
pub struct RequestTracker {
cur_id: Arc<AtomicU64>,
_cur_id: Arc<AtomicU64>,
stream_pool: Arc<ConnectionPool<StreamReturner>>,
unary_pool: Arc<ConnectionPool<Channel>>,
auth_interceptor: AuthInterceptor,
@@ -220,7 +220,7 @@ impl RequestTracker {
let cur_id = Arc::new(AtomicU64::new(0));
RequestTracker {
cur_id: cur_id.clone(),
_cur_id: cur_id.clone(),
stream_pool: stream_pool,
unary_pool: unary_pool,
auth_interceptor: auth_interceptor,

View File

@@ -70,7 +70,7 @@ pub(super) async fn init(
tenant_id: String,
timeline_id: String,
auth_token: Option<String>,
mut shard_map: HashMap<utils::shard::ShardIndex, String>,
shard_map: HashMap<utils::shard::ShardIndex, String>,
initial_file_cache_size: u64,
file_cache_path: Option<PathBuf>,
) -> CommunicatorWorkerProcessStruct<'static> {
@@ -87,12 +87,6 @@ pub(super) async fn init(
)
};
// TODO: for now, just hack in the gRPC port number. This needs to be plumbed through.
for connstr in shard_map.values_mut() {
*connstr = connstr.replace(":64000", ":51051");
}
tracing::warn!("mangled connstrings to use gRPC port 51051 shard_map={shard_map:?}");
// Initialize subsystems
let cache = cis
.integrated_cache_init_struct

View File

@@ -69,7 +69,8 @@ char *neon_project_id;
char *neon_branch_id;
char *neon_endpoint_id;
int32 max_cluster_size;
char *page_server_connstring;
char *pageserver_connstring;
char *pageserver_grpc_urls;
char *neon_auth_token;
int readahead_buffer_size = 128;
@@ -177,6 +178,8 @@ static bool pageserver_flush(shardno_t shard_no);
static void pageserver_disconnect(shardno_t shard_no);
static void pageserver_disconnect_shard(shardno_t shard_no);
static void AssignShardMap(const char *newval);
static bool
PagestoreShmemIsValid(void)
{
@@ -239,6 +242,7 @@ ParseShardMap(const char *connstr, ShardMap *result)
return true;
}
/* GUC hooks for neon.pageserver_connstring */
static bool
CheckPageserverConnstring(char **newval, void **extra, GucSource source)
{
@@ -249,6 +253,45 @@ CheckPageserverConnstring(char **newval, void **extra, GucSource source)
static void
AssignPageserverConnstring(const char *newval, void *extra)
{
/*
* 'neon.pageserver_connstring' is ignored if the new communicator is used.
* In that case, the shard map is loaded from 'neon.pageserver_grpc_urls'
* instead.
*/
if (neon_enable_new_communicator)
return;
AssignShardMap(newval);
}
/* GUC hooks for neon.pageserver_connstring */
static bool
CheckPageserverGrpcUrls(char **newval, void **extra, GucSource source)
{
char *p = *newval;
return ParseShardMap(p, NULL);
}
static void
AssignPageserverGrpcUrls(const char *newval, void *extra)
{
/*
* 'neon.pageserver_grpc-urls' is ignored if the new communicator is not
* used. In that case, the shard map is loaded from 'neon.pageserver_connstring'
instead.
*/
if (!neon_enable_new_communicator)
return;
AssignShardMap(newval);
}
static void
AssignShardMap(const char *newval)
{
ShardMap shard_map;
@@ -262,7 +305,7 @@ AssignPageserverConnstring(const char *newval, void *extra)
{
/*
* shouldn't happen, because we already checked the value in
* CheckPageserverConnstring
* CheckPageserverConnstring/CheckPageserverGrpcUrls
*/
elog(ERROR, "could not parse shard map");
}
@@ -1352,7 +1395,8 @@ PagestoreShmemInit(void)
pg_atomic_init_u64(&pagestore_shared->begin_update_counter, 0);
pg_atomic_init_u64(&pagestore_shared->end_update_counter, 0);
memset(&pagestore_shared->shard_map, 0, sizeof(ShardMap));
AssignPageserverConnstring(page_server_connstring, NULL);
AssignPageserverConnstring(pageserver_connstring, NULL);
AssignPageserverGrpcUrls(pageserver_grpc_urls, NULL);
}
NeonPerfCountersShmemInit();
@@ -1405,12 +1449,21 @@ pg_init_libpagestore(void)
DefineCustomStringVariable("neon.pageserver_connstring",
"connection string to the page server",
NULL,
&page_server_connstring,
&pageserver_connstring,
"",
PGC_SIGHUP,
0, /* no flags required */
CheckPageserverConnstring, AssignPageserverConnstring, NULL);
DefineCustomStringVariable("neon.pageserver_grpc_urls",
"list of gRPC URLs for the page servers",
NULL,
&pageserver_grpc_urls,
"",
PGC_SIGHUP,
0, /* no flags required */
CheckPageserverGrpcUrls, AssignPageserverGrpcUrls, NULL);
DefineCustomStringVariable("neon.timeline_id",
"Neon timeline_id the server is running on",
NULL,
@@ -1568,7 +1621,7 @@ pg_init_libpagestore(void)
if (neon_auth_token)
neon_log(LOG, "using storage auth token from NEON_AUTH_TOKEN environment variable");
if (page_server_connstring && page_server_connstring[0])
if (pageserver_connstring[0] || pageserver_connstring[0])
{
neon_log(PageStoreTrace, "set neon_smgr hook");
smgr_hook = smgr_neon;

View File

@@ -220,7 +220,8 @@ extern void prefetch_on_ps_disconnect(void);
extern page_server_api *page_server;
extern char *page_server_connstring;
extern char *pageserver_connstring;
extern char *pageserver_grpc_urls;
extern int flush_every_n_requests;
extern int readahead_buffer_size;
extern char *neon_timeline;

View File

@@ -5,7 +5,7 @@ use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus, PageserverProtocol};
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus, PageserverConnectionInfo, PageserverShardConnectionInfo};
use control_plane::local_env::LocalEnv;
use futures::StreamExt;
use hyper::StatusCode;
@@ -425,27 +425,40 @@ impl ComputeHook {
if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
tracing::info!("Reconfiguring endpoint {endpoint_name}");
let pageservers = shards
.iter()
.map(|shard| {
let ps_conf = env
.get_pageserver_conf(shard.node_id)
.expect("Unknown pageserver");
if endpoint.grpc {
let addr = ps_conf.listen_grpc_addr.as_ref().expect("no gRPC address");
let (host, port) = parse_host_port(addr).expect("invalid gRPC address");
let port = port.unwrap_or(DEFAULT_GRPC_LISTEN_PORT);
(PageserverProtocol::Grpc, host, port)
} else {
let (host, port) = parse_host_port(&ps_conf.listen_pg_addr)
.expect("Unable to parse listen_pg_addr");
(PageserverProtocol::Libpq, host, port.unwrap_or(5432))
}
})
.collect::<Vec<_>>();
let mut shard_conninfos = HashMap::new();
for shard in shards.iter() {
let ps_conf = env
.get_pageserver_conf(shard.node_id)
.expect("Unknown pageserver");
let libpq_url = Some({
let (host, port) = parse_host_port(&ps_conf.listen_pg_addr)
.expect("Unable to parse listen_pg_addr");
let port = port.unwrap_or(5432);
format!("postgres://no_user@{host}:{port}")
});
let grpc_url = if let Some(grpc_addr) = &ps_conf.listen_grpc_addr {
let (host, port) = parse_host_port(grpc_addr)
.expect("invalid gRPC address");
let port = port.unwrap_or(DEFAULT_GRPC_LISTEN_PORT);
Some(format!("grpc://no_user@{host}:{port}"))
} else {
None
};
let pageserver = PageserverShardConnectionInfo {
libpq_url,
grpc_url,
};
shard_conninfos.insert(shard.shard_number.0 as u32, pageserver);
}
let pageserver_conninfo = PageserverConnectionInfo {
shards: shard_conninfos,
prefer_grpc: endpoint.grpc,
};
endpoint
.reconfigure(pageservers, *stripe_size, None)
.reconfigure(pageserver_conninfo, *stripe_size, None)
.await
.map_err(NotifyError::NeonLocal)?;
}

View File

@@ -4258,7 +4258,13 @@ class Endpoint(PgProtocol, LogUtils):
# If gRPC is enabled, use the new communicator too.
#
# NB: the communicator is enabled by default, so force it to false otherwise.
config_lines += [f"neon.enable_new_communicator={str(grpc).lower()}"]
#
# XXX: By checking for None, we enable the new communicator for all tests
# by default
if grpc or grpc is None:
config_lines += [f"neon.enable_new_communicator=on"]
else:
config_lines += [f"neon.enable_new_communicator=off"]
# Delete file cache if it exists (and we're recreating the endpoint)
if USE_LFC: