pageserver: add gRPC server (#11972)

## Problem

We want to expose the page service over gRPC, for use with the
communicator.

Requires #11995.
Touches #11728.

## Summary of changes

This patch wires up a gRPC server in the Pageserver, using Tonic. It
does not yet implement the actual page service.

* Adds `listen_grpc_addr` and `grpc_auth_type` config options (disabled
by default).
* Enables gRPC by default with `neon_local`.
* Stub implementation of `page_api.PageService`, returning unimplemented
errors.
* gRPC reflection service for use with e.g. `grpcurl`.

Subsequent PRs will implement the actual page service, including
authentication and observability.

Notably, TLS support is not yet implemented. Certificate reloading
requires us to reimplement the entire Tonic gRPC server.
This commit is contained in:
Erik Grinaker
2025-05-26 10:27:48 +02:00
committed by GitHub
parent abc6c84262
commit ec991877f4
16 changed files with 312 additions and 30 deletions

21
Cargo.lock generated
View File

@@ -4321,6 +4321,7 @@ dependencies = [
"pageserver_api", "pageserver_api",
"pageserver_client", "pageserver_client",
"pageserver_compaction", "pageserver_compaction",
"pageserver_page_api",
"pem", "pem",
"pin-project-lite", "pin-project-lite",
"postgres-protocol", "postgres-protocol",
@@ -4363,6 +4364,8 @@ dependencies = [
"tokio-tar", "tokio-tar",
"tokio-util", "tokio-util",
"toml_edit", "toml_edit",
"tonic 0.13.1",
"tonic-reflection",
"tracing", "tracing",
"tracing-utils", "tracing-utils",
"twox-hash", "twox-hash",
@@ -7520,8 +7523,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9" checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"axum",
"base64 0.22.1", "base64 0.22.1",
"bytes", "bytes",
"h2 0.4.4",
"http 1.1.0", "http 1.1.0",
"http-body 1.0.0", "http-body 1.0.0",
"http-body-util", "http-body-util",
@@ -7532,6 +7537,7 @@ dependencies = [
"pin-project", "pin-project",
"prost 0.13.5", "prost 0.13.5",
"rustls-native-certs 0.8.0", "rustls-native-certs 0.8.0",
"socket2",
"tokio", "tokio",
"tokio-rustls 0.26.2", "tokio-rustls 0.26.2",
"tokio-stream", "tokio-stream",
@@ -7555,6 +7561,19 @@ dependencies = [
"syn 2.0.100", "syn 2.0.100",
] ]
[[package]]
name = "tonic-reflection"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9687bd5bfeafebdded2356950f278bba8226f0b32109537c4253406e09aafe1"
dependencies = [
"prost 0.13.5",
"prost-types 0.13.3",
"tokio",
"tokio-stream",
"tonic 0.13.1",
]
[[package]] [[package]]
name = "tower" name = "tower"
version = "0.4.13" version = "0.4.13"
@@ -8526,6 +8545,8 @@ dependencies = [
"ahash", "ahash",
"anstream", "anstream",
"anyhow", "anyhow",
"axum",
"axum-core",
"base64 0.13.1", "base64 0.13.1",
"base64 0.21.7", "base64 0.21.7",
"base64ct", "base64ct",

View File

@@ -199,7 +199,8 @@ tokio-tar = "0.3"
tokio-util = { version = "0.7.10", features = ["io", "rt"] } tokio-util = { version = "0.7.10", features = ["io", "rt"] }
toml = "0.8" toml = "0.8"
toml_edit = "0.22" toml_edit = "0.22"
tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "prost", "tls-ring", "tls-native-roots"] } tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "prost", "router", "server", "tls-ring", "tls-native-roots"] }
tonic-reflection = { version = "0.13.1", features = ["server"] }
tower = { version = "0.5.2", default-features = false } tower = { version = "0.5.2", default-features = false }
tower-http = { version = "0.6.2", features = ["auth", "request-id", "trace"] } tower-http = { version = "0.6.2", features = ["auth", "request-id", "trace"] }

View File

@@ -2,8 +2,10 @@
[pageserver] [pageserver]
listen_pg_addr = '127.0.0.1:64000' listen_pg_addr = '127.0.0.1:64000'
listen_http_addr = '127.0.0.1:9898' listen_http_addr = '127.0.0.1:9898'
listen_grpc_addr = '127.0.0.1:51051'
pg_auth_type = 'Trust' pg_auth_type = 'Trust'
http_auth_type = 'Trust' http_auth_type = 'Trust'
grpc_auth_type = 'Trust'
[[safekeepers]] [[safekeepers]]
id = 1 id = 1

View File

@@ -4,8 +4,10 @@
id=1 id=1
listen_pg_addr = '127.0.0.1:64000' listen_pg_addr = '127.0.0.1:64000'
listen_http_addr = '127.0.0.1:9898' listen_http_addr = '127.0.0.1:9898'
listen_grpc_addr = '127.0.0.1:51051'
pg_auth_type = 'Trust' pg_auth_type = 'Trust'
http_auth_type = 'Trust' http_auth_type = 'Trust'
grpc_auth_type = 'Trust'
[[safekeepers]] [[safekeepers]]
id = 1 id = 1

View File

@@ -32,6 +32,7 @@ use control_plane::storage_controller::{
}; };
use nix::fcntl::{Flock, FlockArg}; use nix::fcntl::{Flock, FlockArg};
use pageserver_api::config::{ use pageserver_api::config::{
DEFAULT_GRPC_LISTEN_PORT as DEFAULT_PAGESERVER_GRPC_PORT,
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT, DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT, DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
}; };
@@ -1007,13 +1008,16 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
let pageserver_id = NodeId(DEFAULT_PAGESERVER_ID.0 + i as u64); let pageserver_id = NodeId(DEFAULT_PAGESERVER_ID.0 + i as u64);
let pg_port = DEFAULT_PAGESERVER_PG_PORT + i; let pg_port = DEFAULT_PAGESERVER_PG_PORT + i;
let http_port = DEFAULT_PAGESERVER_HTTP_PORT + i; let http_port = DEFAULT_PAGESERVER_HTTP_PORT + i;
let grpc_port = DEFAULT_PAGESERVER_GRPC_PORT + i;
NeonLocalInitPageserverConf { NeonLocalInitPageserverConf {
id: pageserver_id, id: pageserver_id,
listen_pg_addr: format!("127.0.0.1:{pg_port}"), listen_pg_addr: format!("127.0.0.1:{pg_port}"),
listen_http_addr: format!("127.0.0.1:{http_port}"), listen_http_addr: format!("127.0.0.1:{http_port}"),
listen_https_addr: None, listen_https_addr: None,
listen_grpc_addr: Some(format!("127.0.0.1:{grpc_port}")),
pg_auth_type: AuthType::Trust, pg_auth_type: AuthType::Trust,
http_auth_type: AuthType::Trust, http_auth_type: AuthType::Trust,
grpc_auth_type: AuthType::Trust,
other: Default::default(), other: Default::default(),
// Typical developer machines use disks with slow fsync, and we don't care // Typical developer machines use disks with slow fsync, and we don't care
// about data integrity: disable disk syncs. // about data integrity: disable disk syncs.

View File

@@ -278,8 +278,10 @@ pub struct PageServerConf {
pub listen_pg_addr: String, pub listen_pg_addr: String,
pub listen_http_addr: String, pub listen_http_addr: String,
pub listen_https_addr: Option<String>, pub listen_https_addr: Option<String>,
pub listen_grpc_addr: Option<String>,
pub pg_auth_type: AuthType, pub pg_auth_type: AuthType,
pub http_auth_type: AuthType, pub http_auth_type: AuthType,
pub grpc_auth_type: AuthType,
pub no_sync: bool, pub no_sync: bool,
} }
@@ -290,8 +292,10 @@ impl Default for PageServerConf {
listen_pg_addr: String::new(), listen_pg_addr: String::new(),
listen_http_addr: String::new(), listen_http_addr: String::new(),
listen_https_addr: None, listen_https_addr: None,
listen_grpc_addr: None,
pg_auth_type: AuthType::Trust, pg_auth_type: AuthType::Trust,
http_auth_type: AuthType::Trust, http_auth_type: AuthType::Trust,
grpc_auth_type: AuthType::Trust,
no_sync: false, no_sync: false,
} }
} }
@@ -306,8 +310,10 @@ pub struct NeonLocalInitPageserverConf {
pub listen_pg_addr: String, pub listen_pg_addr: String,
pub listen_http_addr: String, pub listen_http_addr: String,
pub listen_https_addr: Option<String>, pub listen_https_addr: Option<String>,
pub listen_grpc_addr: Option<String>,
pub pg_auth_type: AuthType, pub pg_auth_type: AuthType,
pub http_auth_type: AuthType, pub http_auth_type: AuthType,
pub grpc_auth_type: AuthType,
#[serde(default, skip_serializing_if = "std::ops::Not::not")] #[serde(default, skip_serializing_if = "std::ops::Not::not")]
pub no_sync: bool, pub no_sync: bool,
#[serde(flatten)] #[serde(flatten)]
@@ -321,8 +327,10 @@ impl From<&NeonLocalInitPageserverConf> for PageServerConf {
listen_pg_addr, listen_pg_addr,
listen_http_addr, listen_http_addr,
listen_https_addr, listen_https_addr,
listen_grpc_addr,
pg_auth_type, pg_auth_type,
http_auth_type, http_auth_type,
grpc_auth_type,
no_sync, no_sync,
other: _, other: _,
} = conf; } = conf;
@@ -331,7 +339,9 @@ impl From<&NeonLocalInitPageserverConf> for PageServerConf {
listen_pg_addr: listen_pg_addr.clone(), listen_pg_addr: listen_pg_addr.clone(),
listen_http_addr: listen_http_addr.clone(), listen_http_addr: listen_http_addr.clone(),
listen_https_addr: listen_https_addr.clone(), listen_https_addr: listen_https_addr.clone(),
listen_grpc_addr: listen_grpc_addr.clone(),
pg_auth_type: *pg_auth_type, pg_auth_type: *pg_auth_type,
grpc_auth_type: *grpc_auth_type,
http_auth_type: *http_auth_type, http_auth_type: *http_auth_type,
no_sync: *no_sync, no_sync: *no_sync,
} }
@@ -707,8 +717,10 @@ impl LocalEnv {
listen_pg_addr: String, listen_pg_addr: String,
listen_http_addr: String, listen_http_addr: String,
listen_https_addr: Option<String>, listen_https_addr: Option<String>,
listen_grpc_addr: Option<String>,
pg_auth_type: AuthType, pg_auth_type: AuthType,
http_auth_type: AuthType, http_auth_type: AuthType,
grpc_auth_type: AuthType,
#[serde(default)] #[serde(default)]
no_sync: bool, no_sync: bool,
} }
@@ -732,8 +744,10 @@ impl LocalEnv {
listen_pg_addr, listen_pg_addr,
listen_http_addr, listen_http_addr,
listen_https_addr, listen_https_addr,
listen_grpc_addr,
pg_auth_type, pg_auth_type,
http_auth_type, http_auth_type,
grpc_auth_type,
no_sync, no_sync,
} = config_toml; } = config_toml;
let IdentityTomlSubset { let IdentityTomlSubset {
@@ -750,8 +764,10 @@ impl LocalEnv {
listen_pg_addr, listen_pg_addr,
listen_http_addr, listen_http_addr,
listen_https_addr, listen_https_addr,
listen_grpc_addr,
pg_auth_type, pg_auth_type,
http_auth_type, http_auth_type,
grpc_auth_type,
no_sync, no_sync,
}; };
pageservers.push(conf); pageservers.push(conf);

View File

@@ -129,7 +129,9 @@ impl PageServerNode {
)); ));
} }
if conf.http_auth_type != AuthType::Trust || conf.pg_auth_type != AuthType::Trust { if [conf.http_auth_type, conf.pg_auth_type, conf.grpc_auth_type]
.contains(&AuthType::NeonJWT)
{
// Keys are generated in the toplevel repo dir, pageservers' workdirs // Keys are generated in the toplevel repo dir, pageservers' workdirs
// are one level below that, so refer to keys with ../ // are one level below that, so refer to keys with ../
overrides.push("auth_validation_public_key_path='../auth_public_key.pem'".to_owned()); overrides.push("auth_validation_public_key_path='../auth_public_key.pem'".to_owned());

View File

@@ -8,6 +8,8 @@ pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}"); pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898; pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}"); pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
// TODO: gRPC is disabled by default for now, but the port is used in neon_local.
pub const DEFAULT_GRPC_LISTEN_PORT: u16 = 51051; // storage-broker already uses 50051
use std::collections::HashMap; use std::collections::HashMap;
use std::num::{NonZeroU64, NonZeroUsize}; use std::num::{NonZeroU64, NonZeroUsize};
@@ -104,6 +106,7 @@ pub struct ConfigToml {
pub listen_pg_addr: String, pub listen_pg_addr: String,
pub listen_http_addr: String, pub listen_http_addr: String,
pub listen_https_addr: Option<String>, pub listen_https_addr: Option<String>,
pub listen_grpc_addr: Option<String>,
pub ssl_key_file: Utf8PathBuf, pub ssl_key_file: Utf8PathBuf,
pub ssl_cert_file: Utf8PathBuf, pub ssl_cert_file: Utf8PathBuf,
#[serde(with = "humantime_serde")] #[serde(with = "humantime_serde")]
@@ -123,6 +126,7 @@ pub struct ConfigToml {
pub http_auth_type: AuthType, pub http_auth_type: AuthType,
#[serde_as(as = "serde_with::DisplayFromStr")] #[serde_as(as = "serde_with::DisplayFromStr")]
pub pg_auth_type: AuthType, pub pg_auth_type: AuthType,
pub grpc_auth_type: AuthType,
pub auth_validation_public_key_path: Option<Utf8PathBuf>, pub auth_validation_public_key_path: Option<Utf8PathBuf>,
pub remote_storage: Option<RemoteStorageConfig>, pub remote_storage: Option<RemoteStorageConfig>,
pub tenant_config: TenantConfigToml, pub tenant_config: TenantConfigToml,
@@ -588,6 +592,7 @@ impl Default for ConfigToml {
listen_pg_addr: (DEFAULT_PG_LISTEN_ADDR.to_string()), listen_pg_addr: (DEFAULT_PG_LISTEN_ADDR.to_string()),
listen_http_addr: (DEFAULT_HTTP_LISTEN_ADDR.to_string()), listen_http_addr: (DEFAULT_HTTP_LISTEN_ADDR.to_string()),
listen_https_addr: (None), listen_https_addr: (None),
listen_grpc_addr: None, // TODO: default to 127.0.0.1:51051
ssl_key_file: Utf8PathBuf::from(DEFAULT_SSL_KEY_FILE), ssl_key_file: Utf8PathBuf::from(DEFAULT_SSL_KEY_FILE),
ssl_cert_file: Utf8PathBuf::from(DEFAULT_SSL_CERT_FILE), ssl_cert_file: Utf8PathBuf::from(DEFAULT_SSL_CERT_FILE),
ssl_cert_reload_period: Duration::from_secs(60), ssl_cert_reload_period: Duration::from_secs(60),
@@ -604,6 +609,7 @@ impl Default for ConfigToml {
pg_distrib_dir: None, // Utf8PathBuf::from("./pg_install"), // TODO: formely, this was std::env::current_dir() pg_distrib_dir: None, // Utf8PathBuf::from("./pg_install"), // TODO: formely, this was std::env::current_dir()
http_auth_type: (AuthType::Trust), http_auth_type: (AuthType::Trust),
pg_auth_type: (AuthType::Trust), pg_auth_type: (AuthType::Trust),
grpc_auth_type: (AuthType::Trust),
auth_validation_public_key_path: (None), auth_validation_public_key_path: (None),
remote_storage: None, remote_storage: None,
broker_endpoint: (storage_broker::DEFAULT_ENDPOINT broker_endpoint: (storage_broker::DEFAULT_ENDPOINT

View File

@@ -43,6 +43,7 @@ nix.workspace = true
num_cpus.workspace = true num_cpus.workspace = true
num-traits.workspace = true num-traits.workspace = true
once_cell.workspace = true once_cell.workspace = true
pageserver_page_api.workspace = true
pin-project-lite.workspace = true pin-project-lite.workspace = true
postgres_backend.workspace = true postgres_backend.workspace = true
postgres-protocol.workspace = true postgres-protocol.workspace = true
@@ -71,6 +72,8 @@ tokio-rustls.workspace = true
tokio-stream.workspace = true tokio-stream.workspace = true
tokio-util.workspace = true tokio-util.workspace = true
toml_edit = { workspace = true, features = [ "serde" ] } toml_edit = { workspace = true, features = [ "serde" ] }
tonic.workspace = true
tonic-reflection.workspace = true
tracing.workspace = true tracing.workspace = true
tracing-utils.workspace = true tracing-utils.workspace = true
url.workspace = true url.workspace = true

View File

@@ -388,23 +388,30 @@ fn start_pageserver(
// We need to release the lock file only when the process exits. // We need to release the lock file only when the process exits.
std::mem::forget(lock_file); std::mem::forget(lock_file);
// Bind the HTTP and libpq ports early, so that if they are in use by some other // Bind the HTTP, libpq, and gRPC ports early, to error out if they are
// process, we error out early. // already in use.
let http_addr = &conf.listen_http_addr; info!(
info!("Starting pageserver http handler on {http_addr}"); "Starting pageserver http handler on {} with auth {:#?}",
let http_listener = tcp_listener::bind(http_addr)?; conf.listen_http_addr, conf.http_auth_type
);
let http_listener = tcp_listener::bind(&conf.listen_http_addr)?;
let https_listener = match conf.listen_https_addr.as_ref() { let https_listener = match conf.listen_https_addr.as_ref() {
Some(https_addr) => { Some(https_addr) => {
info!("Starting pageserver https handler on {https_addr}"); info!(
"Starting pageserver https handler on {https_addr} with auth {:#?}",
conf.http_auth_type
);
Some(tcp_listener::bind(https_addr)?) Some(tcp_listener::bind(https_addr)?)
} }
None => None, None => None,
}; };
let pg_addr = &conf.listen_pg_addr; info!(
info!("Starting pageserver pg protocol handler on {pg_addr}"); "Starting pageserver pg protocol handler on {} with auth {:#?}",
let pageserver_listener = tcp_listener::bind(pg_addr)?; conf.listen_pg_addr, conf.pg_auth_type,
);
let pageserver_listener = tcp_listener::bind(&conf.listen_pg_addr)?;
// Enable SO_KEEPALIVE on the socket, to detect dead connections faster. // Enable SO_KEEPALIVE on the socket, to detect dead connections faster.
// These are configured via net.ipv4.tcp_keepalive_* sysctls. // These are configured via net.ipv4.tcp_keepalive_* sysctls.
@@ -413,6 +420,15 @@ fn start_pageserver(
// support enabling keepalives while using the default OS sysctls. // support enabling keepalives while using the default OS sysctls.
setsockopt(&pageserver_listener, sockopt::KeepAlive, &true)?; setsockopt(&pageserver_listener, sockopt::KeepAlive, &true)?;
let mut grpc_listener = None;
if let Some(grpc_addr) = &conf.listen_grpc_addr {
info!(
"Starting pageserver gRPC handler on {grpc_addr} with auth {:#?}",
conf.grpc_auth_type
);
grpc_listener = Some(tcp_listener::bind(grpc_addr).map_err(|e| anyhow!("{e}"))?);
}
// Launch broker client // Launch broker client
// The storage_broker::connect call needs to happen inside a tokio runtime thread. // The storage_broker::connect call needs to happen inside a tokio runtime thread.
let broker_client = WALRECEIVER_RUNTIME let broker_client = WALRECEIVER_RUNTIME
@@ -440,7 +456,8 @@ fn start_pageserver(
// Initialize authentication for incoming connections // Initialize authentication for incoming connections
let http_auth; let http_auth;
let pg_auth; let pg_auth;
if conf.http_auth_type == AuthType::NeonJWT || conf.pg_auth_type == AuthType::NeonJWT { let grpc_auth;
if [conf.http_auth_type, conf.pg_auth_type, conf.grpc_auth_type].contains(&AuthType::NeonJWT) {
// unwrap is ok because check is performed when creating config, so path is set and exists // unwrap is ok because check is performed when creating config, so path is set and exists
let key_path = conf.auth_validation_public_key_path.as_ref().unwrap(); let key_path = conf.auth_validation_public_key_path.as_ref().unwrap();
info!("Loading public key(s) for verifying JWT tokens from {key_path:?}"); info!("Loading public key(s) for verifying JWT tokens from {key_path:?}");
@@ -448,20 +465,23 @@ fn start_pageserver(
let jwt_auth = JwtAuth::from_key_path(key_path)?; let jwt_auth = JwtAuth::from_key_path(key_path)?;
let auth: Arc<SwappableJwtAuth> = Arc::new(SwappableJwtAuth::new(jwt_auth)); let auth: Arc<SwappableJwtAuth> = Arc::new(SwappableJwtAuth::new(jwt_auth));
http_auth = match &conf.http_auth_type { http_auth = match conf.http_auth_type {
AuthType::Trust => None, AuthType::Trust => None,
AuthType::NeonJWT => Some(auth.clone()), AuthType::NeonJWT => Some(auth.clone()),
}; };
pg_auth = match &conf.pg_auth_type { pg_auth = match conf.pg_auth_type {
AuthType::Trust => None,
AuthType::NeonJWT => Some(auth.clone()),
};
grpc_auth = match conf.grpc_auth_type {
AuthType::Trust => None, AuthType::Trust => None,
AuthType::NeonJWT => Some(auth), AuthType::NeonJWT => Some(auth),
}; };
} else { } else {
http_auth = None; http_auth = None;
pg_auth = None; pg_auth = None;
grpc_auth = None;
} }
info!("Using auth for http API: {:#?}", conf.http_auth_type);
info!("Using auth for pg connections: {:#?}", conf.pg_auth_type);
let tls_server_config = if conf.listen_https_addr.is_some() || conf.enable_tls_page_service_api let tls_server_config = if conf.listen_https_addr.is_some() || conf.enable_tls_page_service_api
{ {
@@ -776,9 +796,27 @@ fn start_pageserver(
} else { } else {
None None
}, },
basebackup_cache, basebackup_cache.clone(),
); );
// Spawn a Pageserver gRPC server task. It will spawn separate tasks for
// each stream/request.
//
// TODO: this uses a separate Tokio runtime for the page service. If we want
// other gRPC services, they will need their own port and runtime. Is this
// necessary?
let mut page_service_grpc = None;
if let Some(grpc_listener) = grpc_listener {
page_service_grpc = Some(page_service::spawn_grpc(
conf,
tenant_manager.clone(),
grpc_auth,
otel_guard.as_ref().map(|g| g.dispatch.clone()),
grpc_listener,
basebackup_cache,
)?);
}
// All started up! Now just sit and wait for shutdown signal. // All started up! Now just sit and wait for shutdown signal.
BACKGROUND_RUNTIME.block_on(async move { BACKGROUND_RUNTIME.block_on(async move {
let signal_token = CancellationToken::new(); let signal_token = CancellationToken::new();
@@ -797,6 +835,7 @@ fn start_pageserver(
http_endpoint_listener, http_endpoint_listener,
https_endpoint_listener, https_endpoint_listener,
page_service, page_service,
page_service_grpc,
consumption_metrics_tasks, consumption_metrics_tasks,
disk_usage_eviction_task, disk_usage_eviction_task,
&tenant_manager, &tenant_manager,

View File

@@ -58,11 +58,16 @@ pub struct PageServerConf {
pub listen_http_addr: String, pub listen_http_addr: String,
/// Example: 127.0.0.1:9899 /// Example: 127.0.0.1:9899
pub listen_https_addr: Option<String>, pub listen_https_addr: Option<String>,
/// If set, expose a gRPC API on this address.
/// Example: 127.0.0.1:51051
///
/// EXPERIMENTAL: this protocol is unstable and under active development.
pub listen_grpc_addr: Option<String>,
/// Path to a file with certificate's private key for https API. /// Path to a file with certificate's private key for https and gRPC API.
/// Default: server.key /// Default: server.key
pub ssl_key_file: Utf8PathBuf, pub ssl_key_file: Utf8PathBuf,
/// Path to a file with a X509 certificate for https API. /// Path to a file with a X509 certificate for https and gRPC API.
/// Default: server.crt /// Default: server.crt
pub ssl_cert_file: Utf8PathBuf, pub ssl_cert_file: Utf8PathBuf,
/// Period to reload certificate and private key from files. /// Period to reload certificate and private key from files.
@@ -100,6 +105,8 @@ pub struct PageServerConf {
pub http_auth_type: AuthType, pub http_auth_type: AuthType,
/// authentication method for libpq connections from compute /// authentication method for libpq connections from compute
pub pg_auth_type: AuthType, pub pg_auth_type: AuthType,
/// authentication method for gRPC connections from compute
pub grpc_auth_type: AuthType,
/// Path to a file or directory containing public key(s) for verifying JWT tokens. /// Path to a file or directory containing public key(s) for verifying JWT tokens.
/// Used for both mgmt and compute auth, if enabled. /// Used for both mgmt and compute auth, if enabled.
pub auth_validation_public_key_path: Option<Utf8PathBuf>, pub auth_validation_public_key_path: Option<Utf8PathBuf>,
@@ -355,6 +362,7 @@ impl PageServerConf {
listen_pg_addr, listen_pg_addr,
listen_http_addr, listen_http_addr,
listen_https_addr, listen_https_addr,
listen_grpc_addr,
ssl_key_file, ssl_key_file,
ssl_cert_file, ssl_cert_file,
ssl_cert_reload_period, ssl_cert_reload_period,
@@ -369,6 +377,7 @@ impl PageServerConf {
pg_distrib_dir, pg_distrib_dir,
http_auth_type, http_auth_type,
pg_auth_type, pg_auth_type,
grpc_auth_type,
auth_validation_public_key_path, auth_validation_public_key_path,
remote_storage, remote_storage,
broker_endpoint, broker_endpoint,
@@ -423,6 +432,7 @@ impl PageServerConf {
listen_pg_addr, listen_pg_addr,
listen_http_addr, listen_http_addr,
listen_https_addr, listen_https_addr,
listen_grpc_addr,
ssl_key_file, ssl_key_file,
ssl_cert_file, ssl_cert_file,
ssl_cert_reload_period, ssl_cert_reload_period,
@@ -435,6 +445,7 @@ impl PageServerConf {
max_file_descriptors, max_file_descriptors,
http_auth_type, http_auth_type,
pg_auth_type, pg_auth_type,
grpc_auth_type,
auth_validation_public_key_path, auth_validation_public_key_path,
remote_storage_config: remote_storage, remote_storage_config: remote_storage,
broker_endpoint, broker_endpoint,
@@ -531,7 +542,9 @@ impl PageServerConf {
// custom validation code that covers more than one field in isolation // custom validation code that covers more than one field in isolation
// ------------------------------------------------------------ // ------------------------------------------------------------
if conf.http_auth_type == AuthType::NeonJWT || conf.pg_auth_type == AuthType::NeonJWT { if [conf.http_auth_type, conf.pg_auth_type, conf.grpc_auth_type]
.contains(&AuthType::NeonJWT)
{
let auth_validation_public_key_path = conf let auth_validation_public_key_path = conf
.auth_validation_public_key_path .auth_validation_public_key_path
.get_or_insert_with(|| workdir.join("auth_public_key.pem")); .get_or_insert_with(|| workdir.join("auth_public_key.pem"));

View File

@@ -84,6 +84,7 @@ pub async fn shutdown_pageserver(
http_listener: HttpEndpointListener, http_listener: HttpEndpointListener,
https_listener: Option<HttpsEndpointListener>, https_listener: Option<HttpsEndpointListener>,
page_service: page_service::Listener, page_service: page_service::Listener,
grpc_task: Option<CancellableTask>,
consumption_metrics_worker: ConsumptionMetricsTasks, consumption_metrics_worker: ConsumptionMetricsTasks,
disk_usage_eviction_task: Option<DiskUsageEvictionTask>, disk_usage_eviction_task: Option<DiskUsageEvictionTask>,
tenant_manager: &TenantManager, tenant_manager: &TenantManager,
@@ -177,6 +178,16 @@ pub async fn shutdown_pageserver(
) )
.await; .await;
// Shut down the gRPC server task, including request handlers.
if let Some(grpc_task) = grpc_task {
timed(
grpc_task.shutdown(),
"shutdown gRPC PageRequestHandler",
Duration::from_secs(3),
)
.await;
}
// Shut down all the tenants. This flushes everything to disk and kills // Shut down all the tenants. This flushes everything to disk and kills
// the checkpoint and GC tasks. // the checkpoint and GC tasks.
timed( timed(

View File

@@ -4,6 +4,7 @@
use std::borrow::Cow; use std::borrow::Cow;
use std::num::NonZeroUsize; use std::num::NonZeroUsize;
use std::os::fd::AsRawFd; use std::os::fd::AsRawFd;
use std::pin::Pin;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime}; use std::time::{Duration, Instant, SystemTime};
@@ -12,7 +13,7 @@ use std::{io, str};
use anyhow::{Context, bail}; use anyhow::{Context, bail};
use async_compression::tokio::write::GzipEncoder; use async_compression::tokio::write::GzipEncoder;
use bytes::Buf; use bytes::Buf;
use futures::FutureExt; use futures::{FutureExt, Stream};
use itertools::Itertools; use itertools::Itertools;
use jsonwebtoken::TokenData; use jsonwebtoken::TokenData;
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
@@ -30,6 +31,7 @@ use pageserver_api::models::{
}; };
use pageserver_api::reltag::SlruKind; use pageserver_api::reltag::SlruKind;
use pageserver_api::shard::TenantShardId; use pageserver_api::shard::TenantShardId;
use pageserver_page_api::proto;
use postgres_backend::{ use postgres_backend::{
AuthType, PostgresBackend, PostgresBackendReader, QueryError, is_expected_io_error, AuthType, PostgresBackend, PostgresBackendReader, QueryError, is_expected_io_error,
}; };
@@ -51,9 +53,8 @@ use utils::simple_rcu::RcuReadGuard;
use utils::sync::gate::{Gate, GateGuard}; use utils::sync::gate::{Gate, GateGuard};
use utils::sync::spsc_fold; use utils::sync::spsc_fold;
use crate::PERF_TRACE_TARGET;
use crate::auth::check_permission; use crate::auth::check_permission;
use crate::basebackup::BasebackupError; use crate::basebackup::{self, BasebackupError};
use crate::basebackup_cache::BasebackupCache; use crate::basebackup_cache::BasebackupCache;
use crate::config::PageServerConf; use crate::config::PageServerConf;
use crate::context::{ use crate::context::{
@@ -75,7 +76,7 @@ use crate::tenant::mgr::{
use crate::tenant::storage_layer::IoConcurrency; use crate::tenant::storage_layer::IoConcurrency;
use crate::tenant::timeline::{self, WaitLsnError}; use crate::tenant::timeline::{self, WaitLsnError};
use crate::tenant::{GetTimelineError, PageReconstructError, Timeline}; use crate::tenant::{GetTimelineError, PageReconstructError, Timeline};
use crate::{basebackup, timed_after_cancellation}; use crate::{CancellableTask, PERF_TRACE_TARGET, timed_after_cancellation};
/// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`]` and/or a [`crate::tenant::TenantShard`] which /// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`]` and/or a [`crate::tenant::TenantShard`] which
/// is not yet in state [`TenantState::Active`]. /// is not yet in state [`TenantState::Active`].
@@ -86,6 +87,26 @@ const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
/// Threshold at which to log slow GetPage requests. /// Threshold at which to log slow GetPage requests.
const LOG_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30); const LOG_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30);
/// The idle time before sending TCP keepalive probes for gRPC connections. The
/// interval and timeout between each probe is configured via sysctl. This
/// allows detecting dead connections sooner.
const GRPC_TCP_KEEPALIVE_TIME: Duration = Duration::from_secs(60);
/// Whether to enable TCP nodelay for gRPC connections. This disables Nagle's
/// algorithm, which can cause latency spikes for small messages.
const GRPC_TCP_NODELAY: bool = true;
/// The interval between HTTP2 keepalive pings. This allows shutting down server
/// tasks when clients are unresponsive.
const GRPC_HTTP2_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(30);
/// The timeout for HTTP2 keepalive pings. Should be <= GRPC_KEEPALIVE_INTERVAL.
const GRPC_HTTP2_KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(20);
/// Number of concurrent gRPC streams per TCP connection. We expect something
/// like 8 GetPage streams per connections, plus any unary requests.
const GRPC_MAX_CONCURRENT_STREAMS: u32 = 256;
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
pub struct Listener { pub struct Listener {
@@ -140,6 +161,83 @@ pub fn spawn(
Listener { cancel, task } Listener { cancel, task }
} }
/// Spawns a gRPC server for the page service.
///
/// TODO: this doesn't support TLS. We need TLS reloading via ReloadingCertificateResolver, so we
/// need to reimplement the TCP+TLS accept loop ourselves.
pub fn spawn_grpc(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
perf_trace_dispatch: Option<Dispatch>,
listener: std::net::TcpListener,
basebackup_cache: Arc<BasebackupCache>,
) -> anyhow::Result<CancellableTask> {
let cancel = CancellationToken::new();
let ctx = RequestContextBuilder::new(TaskKind::PageRequestHandler)
.download_behavior(DownloadBehavior::Download)
.perf_span_dispatch(perf_trace_dispatch)
.detached_child();
let gate = Gate::default();
// Set up the TCP socket. We take a preconfigured TcpListener to bind the
// port early during startup.
let incoming = {
let _runtime = COMPUTE_REQUEST_RUNTIME.enter(); // required by TcpListener::from_std
listener.set_nonblocking(true)?;
tonic::transport::server::TcpIncoming::from(tokio::net::TcpListener::from_std(listener)?)
.with_nodelay(Some(GRPC_TCP_NODELAY))
.with_keepalive(Some(GRPC_TCP_KEEPALIVE_TIME))
};
// Set up the gRPC server.
//
// TODO: consider tuning window sizes.
// TODO: wire up tracing.
let mut server = tonic::transport::Server::builder()
.http2_keepalive_interval(Some(GRPC_HTTP2_KEEPALIVE_INTERVAL))
.http2_keepalive_timeout(Some(GRPC_HTTP2_KEEPALIVE_TIMEOUT))
.max_concurrent_streams(Some(GRPC_MAX_CONCURRENT_STREAMS));
// Main page service.
let page_service = proto::PageServiceServer::new(PageServerHandler::new(
tenant_manager,
auth,
PageServicePipeliningConfig::Serial, // TODO: unused with gRPC
conf.get_vectored_concurrent_io,
ConnectionPerfSpanFields::default(),
basebackup_cache,
ctx,
cancel.clone(),
gate.enter().expect("just created"),
));
let server = server.add_service(page_service);
// Reflection service for use with e.g. grpcurl.
let reflection_service = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(proto::FILE_DESCRIPTOR_SET)
.build_v1()?;
let server = server.add_service(reflection_service);
// Spawn server task.
let task_cancel = cancel.clone();
let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
"grpc listener",
async move {
let result = server
.serve_with_incoming_shutdown(incoming, task_cancel.cancelled())
.await;
if result.is_ok() {
// TODO: revisit shutdown logic once page service is implemented.
gate.close().await;
}
result
},
));
Ok(CancellableTask { task, cancel })
}
impl Listener { impl Listener {
pub async fn stop_accepting(self) -> Connections { pub async fn stop_accepting(self) -> Connections {
self.cancel.cancel(); self.cancel.cancel();
@@ -259,7 +357,7 @@ type ConnectionHandlerResult = anyhow::Result<()>;
/// Perf root spans start at the per-request level, after shard routing. /// Perf root spans start at the per-request level, after shard routing.
/// This struct carries connection-level information to the root perf span definition. /// This struct carries connection-level information to the root perf span definition.
#[derive(Clone)] #[derive(Clone, Default)]
struct ConnectionPerfSpanFields { struct ConnectionPerfSpanFields {
peer_addr: String, peer_addr: String,
application_name: Option<String>, application_name: Option<String>,
@@ -377,6 +475,11 @@ async fn page_service_conn_main(
} }
} }
/// Page service connection handler.
///
/// TODO: for gRPC, this will be shared by all requests from all connections.
/// Decompose it into global state and per-connection/request state, and make
/// libpq-specific options (e.g. pipelining) separate.
struct PageServerHandler { struct PageServerHandler {
auth: Option<Arc<SwappableJwtAuth>>, auth: Option<Arc<SwappableJwtAuth>>,
claims: Option<Claims>, claims: Option<Claims>,
@@ -3117,6 +3220,60 @@ where
} }
} }
/// Implements the page service over gRPC.
///
/// TODO: not yet implemented, all methods return unimplemented.
#[tonic::async_trait]
impl proto::PageService for PageServerHandler {
type GetBaseBackupStream = Pin<
Box<dyn Stream<Item = Result<proto::GetBaseBackupResponseChunk, tonic::Status>> + Send>,
>;
type GetPagesStream =
Pin<Box<dyn Stream<Item = Result<proto::GetPageResponse, tonic::Status>> + Send>>;
async fn check_rel_exists(
&self,
_: tonic::Request<proto::CheckRelExistsRequest>,
) -> Result<tonic::Response<proto::CheckRelExistsResponse>, tonic::Status> {
Err(tonic::Status::unimplemented("not implemented"))
}
async fn get_base_backup(
&self,
_: tonic::Request<proto::GetBaseBackupRequest>,
) -> Result<tonic::Response<Self::GetBaseBackupStream>, tonic::Status> {
Err(tonic::Status::unimplemented("not implemented"))
}
async fn get_db_size(
&self,
_: tonic::Request<proto::GetDbSizeRequest>,
) -> Result<tonic::Response<proto::GetDbSizeResponse>, tonic::Status> {
Err(tonic::Status::unimplemented("not implemented"))
}
async fn get_pages(
&self,
_: tonic::Request<tonic::Streaming<proto::GetPageRequest>>,
) -> Result<tonic::Response<Self::GetPagesStream>, tonic::Status> {
Err(tonic::Status::unimplemented("not implemented"))
}
async fn get_rel_size(
&self,
_: tonic::Request<proto::GetRelSizeRequest>,
) -> Result<tonic::Response<proto::GetRelSizeResponse>, tonic::Status> {
Err(tonic::Status::unimplemented("not implemented"))
}
async fn get_slru_segment(
&self,
_: tonic::Request<proto::GetSlruSegmentRequest>,
) -> Result<tonic::Response<proto::GetSlruSegmentResponse>, tonic::Status> {
Err(tonic::Status::unimplemented("not implemented"))
}
}
impl From<GetActiveTenantError> for QueryError { impl From<GetActiveTenantError> for QueryError {
fn from(e: GetActiveTenantError) -> Self { fn from(e: GetActiveTenantError) -> Self {
match e { match e {

View File

@@ -276,9 +276,10 @@ pub enum TaskKind {
// HTTP endpoint listener. // HTTP endpoint listener.
HttpEndpointListener, HttpEndpointListener,
// Task that handles a single connection. A PageRequestHandler task /// Task that handles a single page service connection. A PageRequestHandler
// starts detached from any particular tenant or timeline, but it can be /// task starts detached from any particular tenant or timeline, but it can
// associated with one later, after receiving a command from the client. /// be associated with one later, after receiving a command from the client.
/// Also used for the gRPC page service API, including the main server task.
PageRequestHandler, PageRequestHandler,
/// Manages the WAL receiver connection for one timeline. /// Manages the WAL receiver connection for one timeline.

View File

@@ -1224,6 +1224,7 @@ class NeonEnv:
# Create config for pageserver # Create config for pageserver
http_auth_type = "NeonJWT" if config.auth_enabled else "Trust" http_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
pg_auth_type = "NeonJWT" if config.auth_enabled else "Trust" pg_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
grpc_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
for ps_id in range( for ps_id in range(
self.BASE_PAGESERVER_ID, self.BASE_PAGESERVER_ID + config.num_pageservers self.BASE_PAGESERVER_ID, self.BASE_PAGESERVER_ID + config.num_pageservers
): ):
@@ -1250,6 +1251,7 @@ class NeonEnv:
else None, else None,
"pg_auth_type": pg_auth_type, "pg_auth_type": pg_auth_type,
"http_auth_type": http_auth_type, "http_auth_type": http_auth_type,
"grpc_auth_type": grpc_auth_type,
"availability_zone": availability_zone, "availability_zone": availability_zone,
# Disable pageserver disk syncs in tests: when running tests concurrently, this avoids # Disable pageserver disk syncs in tests: when running tests concurrently, this avoids
# the pageserver taking a long time to start up due to syncfs flushing other tests' data # the pageserver taking a long time to start up due to syncfs flushing other tests' data

View File

@@ -18,6 +18,8 @@ license.workspace = true
ahash = { version = "0.8" } ahash = { version = "0.8" }
anstream = { version = "0.6" } anstream = { version = "0.6" }
anyhow = { version = "1", features = ["backtrace"] } anyhow = { version = "1", features = ["backtrace"] }
axum = { version = "0.8", features = ["ws"] }
axum-core = { version = "0.5", default-features = false, features = ["tracing"] }
base64-594e8ee84c453af0 = { package = "base64", version = "0.13", features = ["alloc"] } base64-594e8ee84c453af0 = { package = "base64", version = "0.13", features = ["alloc"] }
base64-647d43efb71741da = { package = "base64", version = "0.21" } base64-647d43efb71741da = { package = "base64", version = "0.21" }
base64ct = { version = "1", default-features = false, features = ["std"] } base64ct = { version = "1", default-features = false, features = ["std"] }
@@ -52,7 +54,7 @@ hex = { version = "0.4", features = ["serde"] }
hmac = { version = "0.12", default-features = false, features = ["reset"] } hmac = { version = "0.12", default-features = false, features = ["reset"] }
hyper-582f2526e08bb6a0 = { package = "hyper", version = "0.14", features = ["client", "http1", "http2", "runtime", "server", "stream"] } hyper-582f2526e08bb6a0 = { package = "hyper", version = "0.14", features = ["client", "http1", "http2", "runtime", "server", "stream"] }
hyper-dff4ba8e3ae991db = { package = "hyper", version = "1", features = ["full"] } hyper-dff4ba8e3ae991db = { package = "hyper", version = "1", features = ["full"] }
hyper-util = { version = "0.1", features = ["client-legacy", "http1", "http2", "server", "service"] } hyper-util = { version = "0.1", features = ["client-legacy", "server-auto", "service"] }
indexmap = { version = "2", features = ["serde"] } indexmap = { version = "2", features = ["serde"] }
itertools = { version = "0.12" } itertools = { version = "0.12" }
lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] } lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] }
@@ -98,7 +100,7 @@ tikv-jemalloc-sys = { version = "0.6", features = ["profiling", "stats", "unpref
time = { version = "0.3", features = ["macros", "serde-well-known"] } time = { version = "0.3", features = ["macros", "serde-well-known"] }
tokio = { version = "1", features = ["full", "test-util"] } tokio = { version = "1", features = ["full", "test-util"] }
tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring", "tls12"] } tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring", "tls12"] }
tokio-stream = { version = "0.1" } tokio-stream = { version = "0.1", features = ["net"] }
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] } tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }
toml_edit = { version = "0.22", features = ["serde"] } toml_edit = { version = "0.22", features = ["serde"] }
tower = { version = "0.5", default-features = false, features = ["balance", "buffer", "limit", "log"] } tower = { version = "0.5", default-features = false, features = ["balance", "buffer", "limit", "log"] }