From ec991877f451893d81db5856f18ae65070baa211 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 26 May 2025 10:27:48 +0200 Subject: [PATCH] pageserver: add gRPC server (#11972) ## Problem We want to expose the page service over gRPC, for use with the communicator. Requires #11995. Touches #11728. ## Summary of changes This patch wires up a gRPC server in the Pageserver, using Tonic. It does not yet implement the actual page service. * Adds `listen_grpc_addr` and `grpc_auth_type` config options (disabled by default). * Enables gRPC by default with `neon_local`. * Stub implementation of `page_api.PageService`, returning unimplemented errors. * gRPC reflection service for use with e.g. `grpcurl`. Subsequent PRs will implement the actual page service, including authentication and observability. Notably, TLS support is not yet implemented. Certificate reloading requires us to reimplement the entire Tonic gRPC server. --- Cargo.lock | 21 ++++ Cargo.toml | 3 +- control_plane/safekeepers.conf | 2 + control_plane/simple.conf | 2 + control_plane/src/bin/neon_local.rs | 4 + control_plane/src/local_env.rs | 16 +++ control_plane/src/pageserver.rs | 4 +- libs/pageserver_api/src/config.rs | 6 + pageserver/Cargo.toml | 3 + pageserver/src/bin/pageserver.rs | 69 ++++++++--- pageserver/src/config.rs | 19 ++- pageserver/src/lib.rs | 11 ++ pageserver/src/page_service.rs | 167 +++++++++++++++++++++++++- pageserver/src/task_mgr.rs | 7 +- test_runner/fixtures/neon_fixtures.py | 2 + workspace_hack/Cargo.toml | 6 +- 16 files changed, 312 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 422af2c97e..ddca5bbd3f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4321,6 +4321,7 @@ dependencies = [ "pageserver_api", "pageserver_client", "pageserver_compaction", + "pageserver_page_api", "pem", "pin-project-lite", "postgres-protocol", @@ -4363,6 +4364,8 @@ dependencies = [ "tokio-tar", "tokio-util", "toml_edit", + "tonic 0.13.1", + "tonic-reflection", "tracing", 
"tracing-utils", "twox-hash", @@ -7520,8 +7523,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9" dependencies = [ "async-trait", + "axum", "base64 0.22.1", "bytes", + "h2 0.4.4", "http 1.1.0", "http-body 1.0.0", "http-body-util", @@ -7532,6 +7537,7 @@ dependencies = [ "pin-project", "prost 0.13.5", "rustls-native-certs 0.8.0", + "socket2", "tokio", "tokio-rustls 0.26.2", "tokio-stream", @@ -7555,6 +7561,19 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "tonic-reflection" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9687bd5bfeafebdded2356950f278bba8226f0b32109537c4253406e09aafe1" +dependencies = [ + "prost 0.13.5", + "prost-types 0.13.3", + "tokio", + "tokio-stream", + "tonic 0.13.1", +] + [[package]] name = "tower" version = "0.4.13" @@ -8526,6 +8545,8 @@ dependencies = [ "ahash", "anstream", "anyhow", + "axum", + "axum-core", "base64 0.13.1", "base64 0.21.7", "base64ct", diff --git a/Cargo.toml b/Cargo.toml index c8e2c38c85..d2c8e86bd4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -199,7 +199,8 @@ tokio-tar = "0.3" tokio-util = { version = "0.7.10", features = ["io", "rt"] } toml = "0.8" toml_edit = "0.22" -tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "prost", "tls-ring", "tls-native-roots"] } +tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "prost", "router", "server", "tls-ring", "tls-native-roots"] } +tonic-reflection = { version = "0.13.1", features = ["server"] } tower = { version = "0.5.2", default-features = false } tower-http = { version = "0.6.2", features = ["auth", "request-id", "trace"] } diff --git a/control_plane/safekeepers.conf b/control_plane/safekeepers.conf index 576cc4a3a9..a73e274dfa 100644 --- a/control_plane/safekeepers.conf +++ b/control_plane/safekeepers.conf @@ -2,8 +2,10 @@ [pageserver] 
listen_pg_addr = '127.0.0.1:64000' listen_http_addr = '127.0.0.1:9898' +listen_grpc_addr = '127.0.0.1:51051' pg_auth_type = 'Trust' http_auth_type = 'Trust' +grpc_auth_type = 'Trust' [[safekeepers]] id = 1 diff --git a/control_plane/simple.conf b/control_plane/simple.conf index 0ad90a4618..1eb21f846e 100644 --- a/control_plane/simple.conf +++ b/control_plane/simple.conf @@ -4,8 +4,10 @@ id=1 listen_pg_addr = '127.0.0.1:64000' listen_http_addr = '127.0.0.1:9898' +listen_grpc_addr = '127.0.0.1:51051' pg_auth_type = 'Trust' http_auth_type = 'Trust' +grpc_auth_type = 'Trust' [[safekeepers]] id = 1 diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 98ab6e5657..3bceef8fa7 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -32,6 +32,7 @@ use control_plane::storage_controller::{ }; use nix::fcntl::{Flock, FlockArg}; use pageserver_api::config::{ + DEFAULT_GRPC_LISTEN_PORT as DEFAULT_PAGESERVER_GRPC_PORT, DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT, DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT, }; @@ -1007,13 +1008,16 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result { let pageserver_id = NodeId(DEFAULT_PAGESERVER_ID.0 + i as u64); let pg_port = DEFAULT_PAGESERVER_PG_PORT + i; let http_port = DEFAULT_PAGESERVER_HTTP_PORT + i; + let grpc_port = DEFAULT_PAGESERVER_GRPC_PORT + i; NeonLocalInitPageserverConf { id: pageserver_id, listen_pg_addr: format!("127.0.0.1:{pg_port}"), listen_http_addr: format!("127.0.0.1:{http_port}"), listen_https_addr: None, + listen_grpc_addr: Some(format!("127.0.0.1:{grpc_port}")), pg_auth_type: AuthType::Trust, http_auth_type: AuthType::Trust, + grpc_auth_type: AuthType::Trust, other: Default::default(), // Typical developer machines use disks with slow fsync, and we don't care // about data integrity: disable disk syncs. 
diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 4a8892c6de..47b77f0720 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -278,8 +278,10 @@ pub struct PageServerConf { pub listen_pg_addr: String, pub listen_http_addr: String, pub listen_https_addr: Option, + pub listen_grpc_addr: Option, pub pg_auth_type: AuthType, pub http_auth_type: AuthType, + pub grpc_auth_type: AuthType, pub no_sync: bool, } @@ -290,8 +292,10 @@ impl Default for PageServerConf { listen_pg_addr: String::new(), listen_http_addr: String::new(), listen_https_addr: None, + listen_grpc_addr: None, pg_auth_type: AuthType::Trust, http_auth_type: AuthType::Trust, + grpc_auth_type: AuthType::Trust, no_sync: false, } } @@ -306,8 +310,10 @@ pub struct NeonLocalInitPageserverConf { pub listen_pg_addr: String, pub listen_http_addr: String, pub listen_https_addr: Option, + pub listen_grpc_addr: Option, pub pg_auth_type: AuthType, pub http_auth_type: AuthType, + pub grpc_auth_type: AuthType, #[serde(default, skip_serializing_if = "std::ops::Not::not")] pub no_sync: bool, #[serde(flatten)] @@ -321,8 +327,10 @@ impl From<&NeonLocalInitPageserverConf> for PageServerConf { listen_pg_addr, listen_http_addr, listen_https_addr, + listen_grpc_addr, pg_auth_type, http_auth_type, + grpc_auth_type, no_sync, other: _, } = conf; @@ -331,7 +339,9 @@ impl From<&NeonLocalInitPageserverConf> for PageServerConf { listen_pg_addr: listen_pg_addr.clone(), listen_http_addr: listen_http_addr.clone(), listen_https_addr: listen_https_addr.clone(), + listen_grpc_addr: listen_grpc_addr.clone(), pg_auth_type: *pg_auth_type, + grpc_auth_type: *grpc_auth_type, http_auth_type: *http_auth_type, no_sync: *no_sync, } @@ -707,8 +717,10 @@ impl LocalEnv { listen_pg_addr: String, listen_http_addr: String, listen_https_addr: Option, + listen_grpc_addr: Option, pg_auth_type: AuthType, http_auth_type: AuthType, + grpc_auth_type: AuthType, #[serde(default)] no_sync: bool, } @@ 
-732,8 +744,10 @@ impl LocalEnv { listen_pg_addr, listen_http_addr, listen_https_addr, + listen_grpc_addr, pg_auth_type, http_auth_type, + grpc_auth_type, no_sync, } = config_toml; let IdentityTomlSubset { @@ -750,8 +764,10 @@ impl LocalEnv { listen_pg_addr, listen_http_addr, listen_https_addr, + listen_grpc_addr, pg_auth_type, http_auth_type, + grpc_auth_type, no_sync, }; pageservers.push(conf); diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 756f2b02db..29314dab9e 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -129,7 +129,9 @@ impl PageServerNode { )); } - if conf.http_auth_type != AuthType::Trust || conf.pg_auth_type != AuthType::Trust { + if [conf.http_auth_type, conf.pg_auth_type, conf.grpc_auth_type] + .contains(&AuthType::NeonJWT) + { // Keys are generated in the toplevel repo dir, pageservers' workdirs // are one level below that, so refer to keys with ../ overrides.push("auth_validation_public_key_path='../auth_public_key.pem'".to_owned()); diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index 0fb2ff38ff..daec65ce2d 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -8,6 +8,8 @@ pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000; pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}"); pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898; pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}"); +// TODO: gRPC is disabled by default for now, but the port is used in neon_local. 
+pub const DEFAULT_GRPC_LISTEN_PORT: u16 = 51051; // storage-broker already uses 50051 use std::collections::HashMap; use std::num::{NonZeroU64, NonZeroUsize}; @@ -104,6 +106,7 @@ pub struct ConfigToml { pub listen_pg_addr: String, pub listen_http_addr: String, pub listen_https_addr: Option, + pub listen_grpc_addr: Option, pub ssl_key_file: Utf8PathBuf, pub ssl_cert_file: Utf8PathBuf, #[serde(with = "humantime_serde")] @@ -123,6 +126,7 @@ pub struct ConfigToml { pub http_auth_type: AuthType, #[serde_as(as = "serde_with::DisplayFromStr")] pub pg_auth_type: AuthType, + pub grpc_auth_type: AuthType, pub auth_validation_public_key_path: Option, pub remote_storage: Option, pub tenant_config: TenantConfigToml, @@ -588,6 +592,7 @@ impl Default for ConfigToml { listen_pg_addr: (DEFAULT_PG_LISTEN_ADDR.to_string()), listen_http_addr: (DEFAULT_HTTP_LISTEN_ADDR.to_string()), listen_https_addr: (None), + listen_grpc_addr: None, // TODO: default to 127.0.0.1:51051 ssl_key_file: Utf8PathBuf::from(DEFAULT_SSL_KEY_FILE), ssl_cert_file: Utf8PathBuf::from(DEFAULT_SSL_CERT_FILE), ssl_cert_reload_period: Duration::from_secs(60), @@ -604,6 +609,7 @@ impl Default for ConfigToml { pg_distrib_dir: None, // Utf8PathBuf::from("./pg_install"), // TODO: formely, this was std::env::current_dir() http_auth_type: (AuthType::Trust), pg_auth_type: (AuthType::Trust), + grpc_auth_type: (AuthType::Trust), auth_validation_public_key_path: (None), remote_storage: None, broker_endpoint: (storage_broker::DEFAULT_ENDPOINT diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 6a9a5a292a..1f5cc89b33 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -43,6 +43,7 @@ nix.workspace = true num_cpus.workspace = true num-traits.workspace = true once_cell.workspace = true +pageserver_page_api.workspace = true pin-project-lite.workspace = true postgres_backend.workspace = true postgres-protocol.workspace = true @@ -71,6 +72,8 @@ tokio-rustls.workspace = true tokio-stream.workspace = true 
tokio-util.workspace = true toml_edit = { workspace = true, features = [ "serde" ] } +tonic.workspace = true +tonic-reflection.workspace = true tracing.workspace = true tracing-utils.workspace = true url.workspace = true diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 6001ea0345..8d76d0d678 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -388,23 +388,30 @@ fn start_pageserver( // We need to release the lock file only when the process exits. std::mem::forget(lock_file); - // Bind the HTTP and libpq ports early, so that if they are in use by some other - // process, we error out early. - let http_addr = &conf.listen_http_addr; - info!("Starting pageserver http handler on {http_addr}"); - let http_listener = tcp_listener::bind(http_addr)?; + // Bind the HTTP, libpq, and gRPC ports early, to error out if they are + // already in use. + info!( + "Starting pageserver http handler on {} with auth {:#?}", + conf.listen_http_addr, conf.http_auth_type + ); + let http_listener = tcp_listener::bind(&conf.listen_http_addr)?; let https_listener = match conf.listen_https_addr.as_ref() { Some(https_addr) => { - info!("Starting pageserver https handler on {https_addr}"); + info!( + "Starting pageserver https handler on {https_addr} with auth {:#?}", + conf.http_auth_type + ); Some(tcp_listener::bind(https_addr)?) } None => None, }; - let pg_addr = &conf.listen_pg_addr; - info!("Starting pageserver pg protocol handler on {pg_addr}"); - let pageserver_listener = tcp_listener::bind(pg_addr)?; + info!( + "Starting pageserver pg protocol handler on {} with auth {:#?}", + conf.listen_pg_addr, conf.pg_auth_type, + ); + let pageserver_listener = tcp_listener::bind(&conf.listen_pg_addr)?; // Enable SO_KEEPALIVE on the socket, to detect dead connections faster. // These are configured via net.ipv4.tcp_keepalive_* sysctls. 
@@ -413,6 +420,15 @@ fn start_pageserver( // support enabling keepalives while using the default OS sysctls. setsockopt(&pageserver_listener, sockopt::KeepAlive, &true)?; + let mut grpc_listener = None; + if let Some(grpc_addr) = &conf.listen_grpc_addr { + info!( + "Starting pageserver gRPC handler on {grpc_addr} with auth {:#?}", + conf.grpc_auth_type + ); + grpc_listener = Some(tcp_listener::bind(grpc_addr).map_err(|e| anyhow!("{e}"))?); + } + // Launch broker client // The storage_broker::connect call needs to happen inside a tokio runtime thread. let broker_client = WALRECEIVER_RUNTIME @@ -440,7 +456,8 @@ fn start_pageserver( // Initialize authentication for incoming connections let http_auth; let pg_auth; - if conf.http_auth_type == AuthType::NeonJWT || conf.pg_auth_type == AuthType::NeonJWT { + let grpc_auth; + if [conf.http_auth_type, conf.pg_auth_type, conf.grpc_auth_type].contains(&AuthType::NeonJWT) { // unwrap is ok because check is performed when creating config, so path is set and exists let key_path = conf.auth_validation_public_key_path.as_ref().unwrap(); info!("Loading public key(s) for verifying JWT tokens from {key_path:?}"); @@ -448,20 +465,23 @@ fn start_pageserver( let jwt_auth = JwtAuth::from_key_path(key_path)?; let auth: Arc = Arc::new(SwappableJwtAuth::new(jwt_auth)); - http_auth = match &conf.http_auth_type { + http_auth = match conf.http_auth_type { AuthType::Trust => None, AuthType::NeonJWT => Some(auth.clone()), }; - pg_auth = match &conf.pg_auth_type { + pg_auth = match conf.pg_auth_type { + AuthType::Trust => None, + AuthType::NeonJWT => Some(auth.clone()), + }; + grpc_auth = match conf.grpc_auth_type { AuthType::Trust => None, AuthType::NeonJWT => Some(auth), }; } else { http_auth = None; pg_auth = None; + grpc_auth = None; } - info!("Using auth for http API: {:#?}", conf.http_auth_type); - info!("Using auth for pg connections: {:#?}", conf.pg_auth_type); let tls_server_config = if conf.listen_https_addr.is_some() || 
conf.enable_tls_page_service_api { @@ -776,9 +796,27 @@ fn start_pageserver( } else { None }, - basebackup_cache, + basebackup_cache.clone(), ); + // Spawn a Pageserver gRPC server task. It will spawn separate tasks for + // each stream/request. + // + // TODO: this uses a separate Tokio runtime for the page service. If we want + // other gRPC services, they will need their own port and runtime. Is this + // necessary? + let mut page_service_grpc = None; + if let Some(grpc_listener) = grpc_listener { + page_service_grpc = Some(page_service::spawn_grpc( + conf, + tenant_manager.clone(), + grpc_auth, + otel_guard.as_ref().map(|g| g.dispatch.clone()), + grpc_listener, + basebackup_cache, + )?); + } + // All started up! Now just sit and wait for shutdown signal. BACKGROUND_RUNTIME.block_on(async move { let signal_token = CancellationToken::new(); @@ -797,6 +835,7 @@ fn start_pageserver( http_endpoint_listener, https_endpoint_listener, page_service, + page_service_grpc, consumption_metrics_tasks, disk_usage_eviction_task, &tenant_manager, diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index e8b3b7b3ab..e8af548ec4 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -58,11 +58,16 @@ pub struct PageServerConf { pub listen_http_addr: String, /// Example: 127.0.0.1:9899 pub listen_https_addr: Option, + /// If set, expose a gRPC API on this address. + /// Example: 127.0.0.1:51051 + /// + /// EXPERIMENTAL: this protocol is unstable and under active development. + pub listen_grpc_addr: Option, - /// Path to a file with certificate's private key for https API. + /// Path to a file with certificate's private key for https and gRPC API. /// Default: server.key pub ssl_key_file: Utf8PathBuf, - /// Path to a file with a X509 certificate for https API. + /// Path to a file with a X509 certificate for https and gRPC API. /// Default: server.crt pub ssl_cert_file: Utf8PathBuf, /// Period to reload certificate and private key from files. 
@@ -100,6 +105,8 @@ pub struct PageServerConf { pub http_auth_type: AuthType, /// authentication method for libpq connections from compute pub pg_auth_type: AuthType, + /// authentication method for gRPC connections from compute + pub grpc_auth_type: AuthType, /// Path to a file or directory containing public key(s) for verifying JWT tokens. /// Used for both mgmt and compute auth, if enabled. pub auth_validation_public_key_path: Option, @@ -355,6 +362,7 @@ impl PageServerConf { listen_pg_addr, listen_http_addr, listen_https_addr, + listen_grpc_addr, ssl_key_file, ssl_cert_file, ssl_cert_reload_period, @@ -369,6 +377,7 @@ impl PageServerConf { pg_distrib_dir, http_auth_type, pg_auth_type, + grpc_auth_type, auth_validation_public_key_path, remote_storage, broker_endpoint, @@ -423,6 +432,7 @@ impl PageServerConf { listen_pg_addr, listen_http_addr, listen_https_addr, + listen_grpc_addr, ssl_key_file, ssl_cert_file, ssl_cert_reload_period, @@ -435,6 +445,7 @@ impl PageServerConf { max_file_descriptors, http_auth_type, pg_auth_type, + grpc_auth_type, auth_validation_public_key_path, remote_storage_config: remote_storage, broker_endpoint, @@ -531,7 +542,9 @@ impl PageServerConf { // custom validation code that covers more than one field in isolation // ------------------------------------------------------------ - if conf.http_auth_type == AuthType::NeonJWT || conf.pg_auth_type == AuthType::NeonJWT { + if [conf.http_auth_type, conf.pg_auth_type, conf.grpc_auth_type] + .contains(&AuthType::NeonJWT) + { let auth_validation_public_key_path = conf .auth_validation_public_key_path .get_or_insert_with(|| workdir.join("auth_public_key.pem")); diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 71d9c6603f..25461c23ab 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -84,6 +84,7 @@ pub async fn shutdown_pageserver( http_listener: HttpEndpointListener, https_listener: Option, page_service: page_service::Listener, + grpc_task: Option, 
consumption_metrics_worker: ConsumptionMetricsTasks, disk_usage_eviction_task: Option, tenant_manager: &TenantManager, @@ -177,6 +178,16 @@ pub async fn shutdown_pageserver( ) .await; + // Shut down the gRPC server task, including request handlers. + if let Some(grpc_task) = grpc_task { + timed( + grpc_task.shutdown(), + "shutdown gRPC PageRequestHandler", + Duration::from_secs(3), + ) + .await; + } + // Shut down all the tenants. This flushes everything to disk and kills // the checkpoint and GC tasks. timed( diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 69519dfa87..34dc158694 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -4,6 +4,7 @@ use std::borrow::Cow; use std::num::NonZeroUsize; use std::os::fd::AsRawFd; +use std::pin::Pin; use std::str::FromStr; use std::sync::Arc; use std::time::{Duration, Instant, SystemTime}; @@ -12,7 +13,7 @@ use std::{io, str}; use anyhow::{Context, bail}; use async_compression::tokio::write::GzipEncoder; use bytes::Buf; -use futures::FutureExt; +use futures::{FutureExt, Stream}; use itertools::Itertools; use jsonwebtoken::TokenData; use once_cell::sync::OnceCell; @@ -30,6 +31,7 @@ use pageserver_api::models::{ }; use pageserver_api::reltag::SlruKind; use pageserver_api::shard::TenantShardId; +use pageserver_page_api::proto; use postgres_backend::{ AuthType, PostgresBackend, PostgresBackendReader, QueryError, is_expected_io_error, }; @@ -51,9 +53,8 @@ use utils::simple_rcu::RcuReadGuard; use utils::sync::gate::{Gate, GateGuard}; use utils::sync::spsc_fold; -use crate::PERF_TRACE_TARGET; use crate::auth::check_permission; -use crate::basebackup::BasebackupError; +use crate::basebackup::{self, BasebackupError}; use crate::basebackup_cache::BasebackupCache; use crate::config::PageServerConf; use crate::context::{ @@ -75,7 +76,7 @@ use crate::tenant::mgr::{ use crate::tenant::storage_layer::IoConcurrency; use crate::tenant::timeline::{self, WaitLsnError}; use 
crate::tenant::{GetTimelineError, PageReconstructError, Timeline}; -use crate::{basebackup, timed_after_cancellation}; +use crate::{CancellableTask, PERF_TRACE_TARGET, timed_after_cancellation}; /// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`]` and/or a [`crate::tenant::TenantShard`] which /// is not yet in state [`TenantState::Active`]. @@ -86,6 +87,26 @@ const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000); /// Threshold at which to log slow GetPage requests. const LOG_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30); +/// The idle time before sending TCP keepalive probes for gRPC connections. The +/// interval and timeout between each probe are configured via sysctl. This +/// allows detecting dead connections sooner. +const GRPC_TCP_KEEPALIVE_TIME: Duration = Duration::from_secs(60); + +/// Whether to enable TCP nodelay for gRPC connections. This disables Nagle's +/// algorithm, which can cause latency spikes for small messages. +const GRPC_TCP_NODELAY: bool = true; + +/// The interval between HTTP2 keepalive pings. This allows shutting down server +/// tasks when clients are unresponsive. +const GRPC_HTTP2_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(30); + +/// The timeout for HTTP2 keepalive pings. Should be <= GRPC_HTTP2_KEEPALIVE_INTERVAL. +const GRPC_HTTP2_KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(20); + +/// Number of concurrent gRPC streams per TCP connection. We expect something +/// like 8 GetPage streams per connection, plus any unary requests. +const GRPC_MAX_CONCURRENT_STREAMS: u32 = 256; + /////////////////////////////////////////////////////////////////////////////// pub struct Listener { @@ -140,6 +161,83 @@ pub fn spawn( Listener { cancel, task } } +/// Spawns a gRPC server for the page service. +/// +/// TODO: this doesn't support TLS. We need TLS reloading via ReloadingCertificateResolver, so we +/// need to reimplement the TCP+TLS accept loop ourselves. 
+pub fn spawn_grpc( + conf: &'static PageServerConf, + tenant_manager: Arc, + auth: Option>, + perf_trace_dispatch: Option, + listener: std::net::TcpListener, + basebackup_cache: Arc, +) -> anyhow::Result { + let cancel = CancellationToken::new(); + let ctx = RequestContextBuilder::new(TaskKind::PageRequestHandler) + .download_behavior(DownloadBehavior::Download) + .perf_span_dispatch(perf_trace_dispatch) + .detached_child(); + let gate = Gate::default(); + + // Set up the TCP socket. We take a preconfigured TcpListener to bind the + // port early during startup. + let incoming = { + let _runtime = COMPUTE_REQUEST_RUNTIME.enter(); // required by TcpListener::from_std + listener.set_nonblocking(true)?; + tonic::transport::server::TcpIncoming::from(tokio::net::TcpListener::from_std(listener)?) + .with_nodelay(Some(GRPC_TCP_NODELAY)) + .with_keepalive(Some(GRPC_TCP_KEEPALIVE_TIME)) + }; + + // Set up the gRPC server. + // + // TODO: consider tuning window sizes. + // TODO: wire up tracing. + let mut server = tonic::transport::Server::builder() + .http2_keepalive_interval(Some(GRPC_HTTP2_KEEPALIVE_INTERVAL)) + .http2_keepalive_timeout(Some(GRPC_HTTP2_KEEPALIVE_TIMEOUT)) + .max_concurrent_streams(Some(GRPC_MAX_CONCURRENT_STREAMS)); + + // Main page service. + let page_service = proto::PageServiceServer::new(PageServerHandler::new( + tenant_manager, + auth, + PageServicePipeliningConfig::Serial, // TODO: unused with gRPC + conf.get_vectored_concurrent_io, + ConnectionPerfSpanFields::default(), + basebackup_cache, + ctx, + cancel.clone(), + gate.enter().expect("just created"), + )); + let server = server.add_service(page_service); + + // Reflection service for use with e.g. grpcurl. + let reflection_service = tonic_reflection::server::Builder::configure() + .register_encoded_file_descriptor_set(proto::FILE_DESCRIPTOR_SET) + .build_v1()?; + let server = server.add_service(reflection_service); + + // Spawn server task. 
+ let task_cancel = cancel.clone(); + let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error( + "grpc listener", + async move { + let result = server + .serve_with_incoming_shutdown(incoming, task_cancel.cancelled()) + .await; + if result.is_ok() { + // TODO: revisit shutdown logic once page service is implemented. + gate.close().await; + } + result + }, + )); + + Ok(CancellableTask { task, cancel }) +} + impl Listener { pub async fn stop_accepting(self) -> Connections { self.cancel.cancel(); @@ -259,7 +357,7 @@ type ConnectionHandlerResult = anyhow::Result<()>; /// Perf root spans start at the per-request level, after shard routing. /// This struct carries connection-level information to the root perf span definition. -#[derive(Clone)] +#[derive(Clone, Default)] struct ConnectionPerfSpanFields { peer_addr: String, application_name: Option, @@ -377,6 +475,11 @@ async fn page_service_conn_main( } } +/// Page service connection handler. +/// +/// TODO: for gRPC, this will be shared by all requests from all connections. +/// Decompose it into global state and per-connection/request state, and make +/// libpq-specific options (e.g. pipelining) separate. struct PageServerHandler { auth: Option>, claims: Option, @@ -3117,6 +3220,60 @@ where } } +/// Implements the page service over gRPC. +/// +/// TODO: not yet implemented, all methods return unimplemented. 
+#[tonic::async_trait] +impl proto::PageService for PageServerHandler { + type GetBaseBackupStream = Pin< + Box> + Send>, + >; + type GetPagesStream = + Pin> + Send>>; + + async fn check_rel_exists( + &self, + _: tonic::Request, + ) -> Result, tonic::Status> { + Err(tonic::Status::unimplemented("not implemented")) + } + + async fn get_base_backup( + &self, + _: tonic::Request, + ) -> Result, tonic::Status> { + Err(tonic::Status::unimplemented("not implemented")) + } + + async fn get_db_size( + &self, + _: tonic::Request, + ) -> Result, tonic::Status> { + Err(tonic::Status::unimplemented("not implemented")) + } + + async fn get_pages( + &self, + _: tonic::Request>, + ) -> Result, tonic::Status> { + Err(tonic::Status::unimplemented("not implemented")) + } + + async fn get_rel_size( + &self, + _: tonic::Request, + ) -> Result, tonic::Status> { + Err(tonic::Status::unimplemented("not implemented")) + } + + async fn get_slru_segment( + &self, + _: tonic::Request, + ) -> Result, tonic::Status> { + Err(tonic::Status::unimplemented("not implemented")) + } +} + impl From for QueryError { fn from(e: GetActiveTenantError) -> Self { match e { diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index 55272b2125..29897af642 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -276,9 +276,10 @@ pub enum TaskKind { // HTTP endpoint listener. HttpEndpointListener, - // Task that handles a single connection. A PageRequestHandler task - // starts detached from any particular tenant or timeline, but it can be - // associated with one later, after receiving a command from the client. + /// Task that handles a single page service connection. A PageRequestHandler + /// task starts detached from any particular tenant or timeline, but it can + /// be associated with one later, after receiving a command from the client. + /// Also used for the gRPC page service API, including the main server task. 
PageRequestHandler, /// Manages the WAL receiver connection for one timeline. diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 5c92f2e2d0..dda4d40a11 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1224,6 +1224,7 @@ class NeonEnv: # Create config for pageserver http_auth_type = "NeonJWT" if config.auth_enabled else "Trust" pg_auth_type = "NeonJWT" if config.auth_enabled else "Trust" + grpc_auth_type = "NeonJWT" if config.auth_enabled else "Trust" for ps_id in range( self.BASE_PAGESERVER_ID, self.BASE_PAGESERVER_ID + config.num_pageservers ): @@ -1250,6 +1251,7 @@ class NeonEnv: else None, "pg_auth_type": pg_auth_type, "http_auth_type": http_auth_type, + "grpc_auth_type": grpc_auth_type, "availability_zone": availability_zone, # Disable pageserver disk syncs in tests: when running tests concurrently, this avoids # the pageserver taking a long time to start up due to syncfs flushing other tests' data diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index 9e1123ac0e..726d7c20c9 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -18,6 +18,8 @@ license.workspace = true ahash = { version = "0.8" } anstream = { version = "0.6" } anyhow = { version = "1", features = ["backtrace"] } +axum = { version = "0.8", features = ["ws"] } +axum-core = { version = "0.5", default-features = false, features = ["tracing"] } base64-594e8ee84c453af0 = { package = "base64", version = "0.13", features = ["alloc"] } base64-647d43efb71741da = { package = "base64", version = "0.21" } base64ct = { version = "1", default-features = false, features = ["std"] } @@ -52,7 +54,7 @@ hex = { version = "0.4", features = ["serde"] } hmac = { version = "0.12", default-features = false, features = ["reset"] } hyper-582f2526e08bb6a0 = { package = "hyper", version = "0.14", features = ["client", "http1", "http2", "runtime", "server", "stream"] } hyper-dff4ba8e3ae991db = { 
package = "hyper", version = "1", features = ["full"] } -hyper-util = { version = "0.1", features = ["client-legacy", "http1", "http2", "server", "service"] } +hyper-util = { version = "0.1", features = ["client-legacy", "server-auto", "service"] } indexmap = { version = "2", features = ["serde"] } itertools = { version = "0.12" } lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] } @@ -98,7 +100,7 @@ tikv-jemalloc-sys = { version = "0.6", features = ["profiling", "stats", "unpref time = { version = "0.3", features = ["macros", "serde-well-known"] } tokio = { version = "1", features = ["full", "test-util"] } tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring", "tls12"] } -tokio-stream = { version = "0.1" } +tokio-stream = { version = "0.1", features = ["net"] } tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] } toml_edit = { version = "0.22", features = ["serde"] } tower = { version = "0.5", default-features = false, features = ["balance", "buffer", "limit", "log"] }