Compare commits

...

1 Commits

Author SHA1 Message Date
Erik Grinaker
472031e0b7 pageserver: enable TLS for gRPC server 2025-05-26 11:38:23 +02:00
4 changed files with 80 additions and 42 deletions

View File

@@ -10,6 +10,7 @@ pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
// TODO: gRPC is disabled by default for now, but the port is used in neon_local.
pub const DEFAULT_GRPC_LISTEN_PORT: u16 = 51051; // storage-broker already uses 50051
pub const DEFAULT_GRPC_LISTEN_TLS: bool = false; // TODO: enable by default?
use std::collections::HashMap;
use std::num::{NonZeroU64, NonZeroUsize};
@@ -107,6 +108,7 @@ pub struct ConfigToml {
pub listen_http_addr: String,
pub listen_https_addr: Option<String>,
pub listen_grpc_addr: Option<String>,
pub listen_grpc_tls: bool,
pub ssl_key_file: Utf8PathBuf,
pub ssl_cert_file: Utf8PathBuf,
#[serde(with = "humantime_serde")]
@@ -593,6 +595,7 @@ impl Default for ConfigToml {
listen_http_addr: (DEFAULT_HTTP_LISTEN_ADDR.to_string()),
listen_https_addr: (None),
listen_grpc_addr: None, // TODO: default to 127.0.0.1:51051
listen_grpc_tls: DEFAULT_GRPC_LISTEN_TLS,
ssl_key_file: Utf8PathBuf::from(DEFAULT_SSL_KEY_FILE),
ssl_cert_file: Utf8PathBuf::from(DEFAULT_SSL_CERT_FILE),
ssl_cert_reload_period: Duration::from_secs(60),

View File

@@ -483,7 +483,9 @@ fn start_pageserver(
grpc_auth = None;
}
let tls_server_config = if conf.listen_https_addr.is_some() || conf.enable_tls_page_service_api
let tls_server_config = if conf.listen_https_addr.is_some()
|| conf.enable_tls_page_service_api
|| conf.listen_grpc_tls
{
let resolver = BACKGROUND_RUNTIME.block_on(ReloadingCertificateResolver::new(
"main",
@@ -791,11 +793,9 @@ fn start_pageserver(
tokio::net::TcpListener::from_std(pageserver_listener)
.context("create tokio listener")?
},
if conf.enable_tls_page_service_api {
tls_server_config
} else {
None
},
conf.enable_tls_page_service_api
.then(|| tls_server_config.clone())
.flatten(),
basebackup_cache.clone(),
);
@@ -813,6 +813,9 @@ fn start_pageserver(
grpc_auth,
otel_guard.as_ref().map(|g| g.dispatch.clone()),
grpc_listener,
conf.listen_grpc_tls
.then(|| tls_server_config.clone())
.flatten(),
basebackup_cache,
)?);
}

View File

@@ -63,6 +63,8 @@ pub struct PageServerConf {
///
/// EXPERIMENTAL: this protocol is unstable and under active development.
pub listen_grpc_addr: Option<String>,
/// If true, enable TLS for the gRPC server, using ssl_key_file and ssl_cert_file.
pub listen_grpc_tls: bool,
/// Path to a file with certificate's private key for https and gRPC API.
/// Default: server.key
@@ -228,7 +230,7 @@ pub struct PageServerConf {
pub tracing: Option<pageserver_api::config::Tracing>,
/// Enable TLS in page service API.
/// Enable TLS in the libpq page service API.
/// Does not force TLS: the client negotiates TLS usage during the handshake.
/// Uses key and certificate from ssl_key_file/ssl_cert_file.
pub enable_tls_page_service_api: bool,
@@ -363,6 +365,7 @@ impl PageServerConf {
listen_http_addr,
listen_https_addr,
listen_grpc_addr,
listen_grpc_tls,
ssl_key_file,
ssl_cert_file,
ssl_cert_reload_period,
@@ -433,6 +436,7 @@ impl PageServerConf {
listen_http_addr,
listen_https_addr,
listen_grpc_addr,
listen_grpc_tls,
ssl_key_file,
ssl_cert_file,
ssl_cert_reload_period,

View File

@@ -13,9 +13,10 @@ use std::{io, str};
use anyhow::{Context, bail};
use async_compression::tokio::write::GzipEncoder;
use bytes::Buf;
use futures::{FutureExt, Stream};
use futures::{FutureExt, Stream, StreamExt as _};
use itertools::Itertools;
use jsonwebtoken::TokenData;
use nix::sys::socket::{setsockopt, sockopt};
use once_cell::sync::OnceCell;
use pageserver_api::config::{
GetVectoredConcurrentIo, PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
@@ -42,6 +43,8 @@ use pq_proto::{BeMessage, FeMessage, FeStartupPacket, RowDescriptor};
use strum_macros::IntoStaticStr;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter};
use tokio::task::JoinHandle;
use tokio_rustls::TlsAcceptor;
use tokio_stream::wrappers::TcpListenerStream;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::auth::{Claims, Scope, SwappableJwtAuth};
@@ -69,7 +72,7 @@ use crate::span::{
debug_assert_current_span_has_tenant_and_timeline_id,
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
};
use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME, TaskKind};
use crate::task_mgr::{COMPUTE_REQUEST_RUNTIME, TaskKind, exit_on_panic_or_error};
use crate::tenant::mgr::{
GetActiveTenantError, GetTenantError, ShardResolveResult, ShardSelector, TenantManager,
};
@@ -87,10 +90,9 @@ const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
/// Threshold at which to log slow GetPage requests.
const LOG_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30);
/// The idle time before sending TCP keepalive probes for gRPC connections. The
/// interval and timeout between each probe is configured via sysctl. This
/// allows detecting dead connections sooner.
const GRPC_TCP_KEEPALIVE_TIME: Duration = Duration::from_secs(60);
/// Whether to enable TCP keepalives for gRPC connections. The interval and
/// timeouts are configured via sysctl. This detects dead connections sooner.
const GRPC_TCP_KEEPALIVE: bool = true;
/// Whether to enable TCP nodelay for gRPC connections. This disables Nagle's
/// algorithm, which can cause latency spikes for small messages.
@@ -140,7 +142,7 @@ pub fn spawn(
// accept connections.)
DownloadBehavior::Error,
);
let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
let task = COMPUTE_REQUEST_RUNTIME.spawn(exit_on_panic_or_error(
"libpq listener",
libpq_listener_main(
conf,
@@ -162,17 +164,18 @@ pub fn spawn(
}
/// Spawns a gRPC server for the page service.
///
/// TODO: this doesn't support TLS. We need TLS reloading via ReloadingCertificateResolver, so we
/// need to reimplement the TCP+TLS accept loop ourselves.
pub fn spawn_grpc(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
perf_trace_dispatch: Option<Dispatch>,
listener: std::net::TcpListener,
tls_config: Option<Arc<rustls::ServerConfig>>,
basebackup_cache: Arc<BasebackupCache>,
) -> anyhow::Result<CancellableTask> {
// Use the compute runtime.
let _runtime = COMPUTE_REQUEST_RUNTIME.enter();
let cancel = CancellationToken::new();
let ctx = RequestContextBuilder::new(TaskKind::PageRequestHandler)
.download_behavior(DownloadBehavior::Download)
@@ -180,18 +183,9 @@ pub fn spawn_grpc(
.detached_child();
let gate = Gate::default();
// Set up the TCP socket. We take a preconfigured TcpListener to bind the
// port early during startup.
let incoming = {
let _runtime = COMPUTE_REQUEST_RUNTIME.enter(); // required by TcpListener::from_std
listener.set_nonblocking(true)?;
tonic::transport::server::TcpIncoming::from(tokio::net::TcpListener::from_std(listener)?)
.with_nodelay(Some(GRPC_TCP_NODELAY))
.with_keepalive(Some(GRPC_TCP_KEEPALIVE_TIME))
};
// Set up the gRPC server.
//
// NB: does not respect TCP settings, since we configure the socket manually.
// TODO: consider tuning window sizes.
// TODO: wire up tracing.
let mut server = tonic::transport::Server::builder()
@@ -219,21 +213,55 @@ pub fn spawn_grpc(
.build_v1()?;
let server = server.add_service(reflection_service);
// Spawn server task.
let task_cancel = cancel.clone();
let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
"grpc listener",
async move {
let result = server
.serve_with_incoming_shutdown(incoming, task_cancel.cancelled())
.await;
if result.is_ok() {
// TODO: revisit shutdown logic once page service is implemented.
gate.close().await;
}
result
},
));
// Set up the TCP socket. We take a preconfigured TcpListener to bind the port early.
listener.set_nonblocking(true)?;
setsockopt(&listener, sockopt::KeepAlive, &GRPC_TCP_KEEPALIVE)?;
let listener = tokio::net::TcpListener::from_std(listener)?;
// Build the serve future.
let cancel_serve = cancel.clone();
let serve = async move {
// Accept TCP connections.
let tcp_conns = TcpListenerStream::new(listener).map(|result| {
let tcp_conn = result.inspect_err(|err| error!("TCP accept failed: {err}"))?;
tcp_conn.set_nodelay(GRPC_TCP_NODELAY).inspect_err(|err| {
error!("TCP nodelay failed: {err}");
})?;
Ok(tcp_conn)
});
if let Some(tls_config) = tls_config {
// If TLS is enabled, decrypt the TCP streams before passing them to the server.
let tls_acceptor = TlsAcceptor::from(tls_config);
let tls_conns = async_stream::stream! {
for await result in tcp_conns {
match result {
Ok(tcp_conn) => yield tls_acceptor
.accept(tcp_conn)
.await
.inspect_err(|err| error!("TLS handshake failed: {err}")),
Err(err) => yield Err(err),
}
}
};
server
.serve_with_incoming_shutdown(tls_conns, cancel_serve.cancelled())
.await?;
} else {
// Otherwise, just pass the plaintext TCP streams.
server
.serve_with_incoming_shutdown(tcp_conns, cancel_serve.cancelled())
.await?;
}
// Clean shutdown, wait for tasks to finish.
// TODO: revisit shutdown logic once page service is implemented.
gate.close().await;
anyhow::Ok(())
};
// Spawn a task to run the serve future.
let task = tokio::spawn(exit_on_panic_or_error("grpc listener", serve));
Ok(CancellableTask { task, cancel })
}