mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-14 03:30:36 +00:00
Compare commits
1 Commits
release-pr
...
erik/pages
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
472031e0b7 |
@@ -10,6 +10,7 @@ pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
|
||||
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
|
||||
// TODO: gRPC is disabled by default for now, but the port is used in neon_local.
|
||||
pub const DEFAULT_GRPC_LISTEN_PORT: u16 = 51051; // storage-broker already uses 50051
|
||||
pub const DEFAULT_GRPC_LISTEN_TLS: bool = false; // TODO: enable by default?
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::num::{NonZeroU64, NonZeroUsize};
|
||||
@@ -107,6 +108,7 @@ pub struct ConfigToml {
|
||||
pub listen_http_addr: String,
|
||||
pub listen_https_addr: Option<String>,
|
||||
pub listen_grpc_addr: Option<String>,
|
||||
pub listen_grpc_tls: bool,
|
||||
pub ssl_key_file: Utf8PathBuf,
|
||||
pub ssl_cert_file: Utf8PathBuf,
|
||||
#[serde(with = "humantime_serde")]
|
||||
@@ -593,6 +595,7 @@ impl Default for ConfigToml {
|
||||
listen_http_addr: (DEFAULT_HTTP_LISTEN_ADDR.to_string()),
|
||||
listen_https_addr: (None),
|
||||
listen_grpc_addr: None, // TODO: default to 127.0.0.1:51051
|
||||
listen_grpc_tls: DEFAULT_GRPC_LISTEN_TLS,
|
||||
ssl_key_file: Utf8PathBuf::from(DEFAULT_SSL_KEY_FILE),
|
||||
ssl_cert_file: Utf8PathBuf::from(DEFAULT_SSL_CERT_FILE),
|
||||
ssl_cert_reload_period: Duration::from_secs(60),
|
||||
|
||||
@@ -483,7 +483,9 @@ fn start_pageserver(
|
||||
grpc_auth = None;
|
||||
}
|
||||
|
||||
let tls_server_config = if conf.listen_https_addr.is_some() || conf.enable_tls_page_service_api
|
||||
let tls_server_config = if conf.listen_https_addr.is_some()
|
||||
|| conf.enable_tls_page_service_api
|
||||
|| conf.listen_grpc_tls
|
||||
{
|
||||
let resolver = BACKGROUND_RUNTIME.block_on(ReloadingCertificateResolver::new(
|
||||
"main",
|
||||
@@ -791,11 +793,9 @@ fn start_pageserver(
|
||||
tokio::net::TcpListener::from_std(pageserver_listener)
|
||||
.context("create tokio listener")?
|
||||
},
|
||||
if conf.enable_tls_page_service_api {
|
||||
tls_server_config
|
||||
} else {
|
||||
None
|
||||
},
|
||||
conf.enable_tls_page_service_api
|
||||
.then(|| tls_server_config.clone())
|
||||
.flatten(),
|
||||
basebackup_cache.clone(),
|
||||
);
|
||||
|
||||
@@ -813,6 +813,9 @@ fn start_pageserver(
|
||||
grpc_auth,
|
||||
otel_guard.as_ref().map(|g| g.dispatch.clone()),
|
||||
grpc_listener,
|
||||
conf.listen_grpc_tls
|
||||
.then(|| tls_server_config.clone())
|
||||
.flatten(),
|
||||
basebackup_cache,
|
||||
)?);
|
||||
}
|
||||
|
||||
@@ -63,6 +63,8 @@ pub struct PageServerConf {
|
||||
///
|
||||
/// EXPERIMENTAL: this protocol is unstable and under active development.
|
||||
pub listen_grpc_addr: Option<String>,
|
||||
/// If true, enable TLS for the gRPC server, using ssl_key_file and ssl_cert_file.
|
||||
pub listen_grpc_tls: bool,
|
||||
|
||||
/// Path to a file with certificate's private key for https and gRPC API.
|
||||
/// Default: server.key
|
||||
@@ -228,7 +230,7 @@ pub struct PageServerConf {
|
||||
|
||||
pub tracing: Option<pageserver_api::config::Tracing>,
|
||||
|
||||
/// Enable TLS in page service API.
|
||||
/// Enable TLS in the libpq page service API.
|
||||
/// Does not force TLS: the client negotiates TLS usage during the handshake.
|
||||
/// Uses key and certificate from ssl_key_file/ssl_cert_file.
|
||||
pub enable_tls_page_service_api: bool,
|
||||
@@ -363,6 +365,7 @@ impl PageServerConf {
|
||||
listen_http_addr,
|
||||
listen_https_addr,
|
||||
listen_grpc_addr,
|
||||
listen_grpc_tls,
|
||||
ssl_key_file,
|
||||
ssl_cert_file,
|
||||
ssl_cert_reload_period,
|
||||
@@ -433,6 +436,7 @@ impl PageServerConf {
|
||||
listen_http_addr,
|
||||
listen_https_addr,
|
||||
listen_grpc_addr,
|
||||
listen_grpc_tls,
|
||||
ssl_key_file,
|
||||
ssl_cert_file,
|
||||
ssl_cert_reload_period,
|
||||
|
||||
@@ -13,9 +13,10 @@ use std::{io, str};
|
||||
use anyhow::{Context, bail};
|
||||
use async_compression::tokio::write::GzipEncoder;
|
||||
use bytes::Buf;
|
||||
use futures::{FutureExt, Stream};
|
||||
use futures::{FutureExt, Stream, StreamExt as _};
|
||||
use itertools::Itertools;
|
||||
use jsonwebtoken::TokenData;
|
||||
use nix::sys::socket::{setsockopt, sockopt};
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::config::{
|
||||
GetVectoredConcurrentIo, PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
|
||||
@@ -42,6 +43,8 @@ use pq_proto::{BeMessage, FeMessage, FeStartupPacket, RowDescriptor};
|
||||
use strum_macros::IntoStaticStr;
|
||||
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter};
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_rustls::TlsAcceptor;
|
||||
use tokio_stream::wrappers::TcpListenerStream;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::auth::{Claims, Scope, SwappableJwtAuth};
|
||||
@@ -69,7 +72,7 @@ use crate::span::{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id,
|
||||
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
|
||||
};
|
||||
use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME, TaskKind};
|
||||
use crate::task_mgr::{COMPUTE_REQUEST_RUNTIME, TaskKind, exit_on_panic_or_error};
|
||||
use crate::tenant::mgr::{
|
||||
GetActiveTenantError, GetTenantError, ShardResolveResult, ShardSelector, TenantManager,
|
||||
};
|
||||
@@ -87,10 +90,9 @@ const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
|
||||
/// Threshold at which to log slow GetPage requests.
|
||||
const LOG_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30);
|
||||
|
||||
/// The idle time before sending TCP keepalive probes for gRPC connections. The
|
||||
/// interval and timeout between each probe is configured via sysctl. This
|
||||
/// allows detecting dead connections sooner.
|
||||
const GRPC_TCP_KEEPALIVE_TIME: Duration = Duration::from_secs(60);
|
||||
/// Whether to enable TCP keepalives for gRPC connections. The interval and
|
||||
/// timeouts are configured via sysctl. This detects dead connections sooner.
|
||||
const GRPC_TCP_KEEPALIVE: bool = true;
|
||||
|
||||
/// Whether to enable TCP nodelay for gRPC connections. This disables Nagle's
|
||||
/// algorithm, which can cause latency spikes for small messages.
|
||||
@@ -140,7 +142,7 @@ pub fn spawn(
|
||||
// accept connections.)
|
||||
DownloadBehavior::Error,
|
||||
);
|
||||
let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
|
||||
let task = COMPUTE_REQUEST_RUNTIME.spawn(exit_on_panic_or_error(
|
||||
"libpq listener",
|
||||
libpq_listener_main(
|
||||
conf,
|
||||
@@ -162,17 +164,18 @@ pub fn spawn(
|
||||
}
|
||||
|
||||
/// Spawns a gRPC server for the page service.
|
||||
///
|
||||
/// TODO: this doesn't support TLS. We need TLS reloading via ReloadingCertificateResolver, so we
|
||||
/// need to reimplement the TCP+TLS accept loop ourselves.
|
||||
pub fn spawn_grpc(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
perf_trace_dispatch: Option<Dispatch>,
|
||||
listener: std::net::TcpListener,
|
||||
tls_config: Option<Arc<rustls::ServerConfig>>,
|
||||
basebackup_cache: Arc<BasebackupCache>,
|
||||
) -> anyhow::Result<CancellableTask> {
|
||||
// Use the compute runtime.
|
||||
let _runtime = COMPUTE_REQUEST_RUNTIME.enter();
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
let ctx = RequestContextBuilder::new(TaskKind::PageRequestHandler)
|
||||
.download_behavior(DownloadBehavior::Download)
|
||||
@@ -180,18 +183,9 @@ pub fn spawn_grpc(
|
||||
.detached_child();
|
||||
let gate = Gate::default();
|
||||
|
||||
// Set up the TCP socket. We take a preconfigured TcpListener to bind the
|
||||
// port early during startup.
|
||||
let incoming = {
|
||||
let _runtime = COMPUTE_REQUEST_RUNTIME.enter(); // required by TcpListener::from_std
|
||||
listener.set_nonblocking(true)?;
|
||||
tonic::transport::server::TcpIncoming::from(tokio::net::TcpListener::from_std(listener)?)
|
||||
.with_nodelay(Some(GRPC_TCP_NODELAY))
|
||||
.with_keepalive(Some(GRPC_TCP_KEEPALIVE_TIME))
|
||||
};
|
||||
|
||||
// Set up the gRPC server.
|
||||
//
|
||||
// NB: does not respect TCP settings, since we configure the socket manually.
|
||||
// TODO: consider tuning window sizes.
|
||||
// TODO: wire up tracing.
|
||||
let mut server = tonic::transport::Server::builder()
|
||||
@@ -219,21 +213,55 @@ pub fn spawn_grpc(
|
||||
.build_v1()?;
|
||||
let server = server.add_service(reflection_service);
|
||||
|
||||
// Spawn server task.
|
||||
let task_cancel = cancel.clone();
|
||||
let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
|
||||
"grpc listener",
|
||||
async move {
|
||||
let result = server
|
||||
.serve_with_incoming_shutdown(incoming, task_cancel.cancelled())
|
||||
.await;
|
||||
if result.is_ok() {
|
||||
// TODO: revisit shutdown logic once page service is implemented.
|
||||
gate.close().await;
|
||||
}
|
||||
result
|
||||
},
|
||||
));
|
||||
// Set up the TCP socket. We take a preconfigured TcpListener to bind the port early.
|
||||
listener.set_nonblocking(true)?;
|
||||
setsockopt(&listener, sockopt::KeepAlive, &GRPC_TCP_KEEPALIVE)?;
|
||||
let listener = tokio::net::TcpListener::from_std(listener)?;
|
||||
|
||||
// Build the serve future.
|
||||
let cancel_serve = cancel.clone();
|
||||
let serve = async move {
|
||||
// Accept TCP connections.
|
||||
let tcp_conns = TcpListenerStream::new(listener).map(|result| {
|
||||
let tcp_conn = result.inspect_err(|err| error!("TCP accept failed: {err}"))?;
|
||||
tcp_conn.set_nodelay(GRPC_TCP_NODELAY).inspect_err(|err| {
|
||||
error!("TCP nodelay failed: {err}");
|
||||
})?;
|
||||
Ok(tcp_conn)
|
||||
});
|
||||
|
||||
if let Some(tls_config) = tls_config {
|
||||
// If TLS is enabled, decrypt the TCP streams before passing them to the server.
|
||||
let tls_acceptor = TlsAcceptor::from(tls_config);
|
||||
let tls_conns = async_stream::stream! {
|
||||
for await result in tcp_conns {
|
||||
match result {
|
||||
Ok(tcp_conn) => yield tls_acceptor
|
||||
.accept(tcp_conn)
|
||||
.await
|
||||
.inspect_err(|err| error!("TLS handshake failed: {err}")),
|
||||
Err(err) => yield Err(err),
|
||||
}
|
||||
}
|
||||
};
|
||||
server
|
||||
.serve_with_incoming_shutdown(tls_conns, cancel_serve.cancelled())
|
||||
.await?;
|
||||
} else {
|
||||
// Otherwise, just pass the plaintext TCP streams.
|
||||
server
|
||||
.serve_with_incoming_shutdown(tcp_conns, cancel_serve.cancelled())
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Clean shutdown, wait for tasks to finish.
|
||||
// TODO: revisit shutdown logic once page service is implemented.
|
||||
gate.close().await;
|
||||
anyhow::Ok(())
|
||||
};
|
||||
|
||||
// Spawn a task to run the serve future.
|
||||
let task = tokio::spawn(exit_on_panic_or_error("grpc listener", serve));
|
||||
|
||||
Ok(CancellableTask { task, cancel })
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user