mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-22 15:41:15 +00:00
Compare commits
2 Commits
MMeent/tes
...
erik/pages
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
472031e0b7 | ||
|
|
ec991877f4 |
21
Cargo.lock
generated
21
Cargo.lock
generated
@@ -4321,6 +4321,7 @@ dependencies = [
|
||||
"pageserver_api",
|
||||
"pageserver_client",
|
||||
"pageserver_compaction",
|
||||
"pageserver_page_api",
|
||||
"pem",
|
||||
"pin-project-lite",
|
||||
"postgres-protocol",
|
||||
@@ -4363,6 +4364,8 @@ dependencies = [
|
||||
"tokio-tar",
|
||||
"tokio-util",
|
||||
"toml_edit",
|
||||
"tonic 0.13.1",
|
||||
"tonic-reflection",
|
||||
"tracing",
|
||||
"tracing-utils",
|
||||
"twox-hash",
|
||||
@@ -7520,8 +7523,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"axum",
|
||||
"base64 0.22.1",
|
||||
"bytes",
|
||||
"h2 0.4.4",
|
||||
"http 1.1.0",
|
||||
"http-body 1.0.0",
|
||||
"http-body-util",
|
||||
@@ -7532,6 +7537,7 @@ dependencies = [
|
||||
"pin-project",
|
||||
"prost 0.13.5",
|
||||
"rustls-native-certs 0.8.0",
|
||||
"socket2",
|
||||
"tokio",
|
||||
"tokio-rustls 0.26.2",
|
||||
"tokio-stream",
|
||||
@@ -7555,6 +7561,19 @@ dependencies = [
|
||||
"syn 2.0.100",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tonic-reflection"
|
||||
version = "0.13.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f9687bd5bfeafebdded2356950f278bba8226f0b32109537c4253406e09aafe1"
|
||||
dependencies = [
|
||||
"prost 0.13.5",
|
||||
"prost-types 0.13.3",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tonic 0.13.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tower"
|
||||
version = "0.4.13"
|
||||
@@ -8526,6 +8545,8 @@ dependencies = [
|
||||
"ahash",
|
||||
"anstream",
|
||||
"anyhow",
|
||||
"axum",
|
||||
"axum-core",
|
||||
"base64 0.13.1",
|
||||
"base64 0.21.7",
|
||||
"base64ct",
|
||||
|
||||
@@ -199,7 +199,8 @@ tokio-tar = "0.3"
|
||||
tokio-util = { version = "0.7.10", features = ["io", "rt"] }
|
||||
toml = "0.8"
|
||||
toml_edit = "0.22"
|
||||
tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "prost", "tls-ring", "tls-native-roots"] }
|
||||
tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "prost", "router", "server", "tls-ring", "tls-native-roots"] }
|
||||
tonic-reflection = { version = "0.13.1", features = ["server"] }
|
||||
tower = { version = "0.5.2", default-features = false }
|
||||
tower-http = { version = "0.6.2", features = ["auth", "request-id", "trace"] }
|
||||
|
||||
|
||||
@@ -2,8 +2,10 @@
|
||||
[pageserver]
|
||||
listen_pg_addr = '127.0.0.1:64000'
|
||||
listen_http_addr = '127.0.0.1:9898'
|
||||
listen_grpc_addr = '127.0.0.1:51051'
|
||||
pg_auth_type = 'Trust'
|
||||
http_auth_type = 'Trust'
|
||||
grpc_auth_type = 'Trust'
|
||||
|
||||
[[safekeepers]]
|
||||
id = 1
|
||||
|
||||
@@ -4,8 +4,10 @@
|
||||
id=1
|
||||
listen_pg_addr = '127.0.0.1:64000'
|
||||
listen_http_addr = '127.0.0.1:9898'
|
||||
listen_grpc_addr = '127.0.0.1:51051'
|
||||
pg_auth_type = 'Trust'
|
||||
http_auth_type = 'Trust'
|
||||
grpc_auth_type = 'Trust'
|
||||
|
||||
[[safekeepers]]
|
||||
id = 1
|
||||
|
||||
@@ -32,6 +32,7 @@ use control_plane::storage_controller::{
|
||||
};
|
||||
use nix::fcntl::{Flock, FlockArg};
|
||||
use pageserver_api::config::{
|
||||
DEFAULT_GRPC_LISTEN_PORT as DEFAULT_PAGESERVER_GRPC_PORT,
|
||||
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
|
||||
DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
|
||||
};
|
||||
@@ -1007,13 +1008,16 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
|
||||
let pageserver_id = NodeId(DEFAULT_PAGESERVER_ID.0 + i as u64);
|
||||
let pg_port = DEFAULT_PAGESERVER_PG_PORT + i;
|
||||
let http_port = DEFAULT_PAGESERVER_HTTP_PORT + i;
|
||||
let grpc_port = DEFAULT_PAGESERVER_GRPC_PORT + i;
|
||||
NeonLocalInitPageserverConf {
|
||||
id: pageserver_id,
|
||||
listen_pg_addr: format!("127.0.0.1:{pg_port}"),
|
||||
listen_http_addr: format!("127.0.0.1:{http_port}"),
|
||||
listen_https_addr: None,
|
||||
listen_grpc_addr: Some(format!("127.0.0.1:{grpc_port}")),
|
||||
pg_auth_type: AuthType::Trust,
|
||||
http_auth_type: AuthType::Trust,
|
||||
grpc_auth_type: AuthType::Trust,
|
||||
other: Default::default(),
|
||||
// Typical developer machines use disks with slow fsync, and we don't care
|
||||
// about data integrity: disable disk syncs.
|
||||
|
||||
@@ -616,17 +616,19 @@ impl Endpoint {
|
||||
|
||||
/// Map safekeepers ids to the actual connection strings.
|
||||
fn build_safekeepers_connstrs(&self, sk_ids: Vec<NodeId>) -> Result<Vec<String>> {
|
||||
sk_ids
|
||||
.into_iter()
|
||||
.map(|node_id| {
|
||||
self.env
|
||||
let mut safekeeper_connstrings = Vec::new();
|
||||
if self.mode == ComputeMode::Primary {
|
||||
for sk_id in sk_ids {
|
||||
let sk = self
|
||||
.env
|
||||
.safekeepers
|
||||
.iter()
|
||||
.find(|node| node.id == node_id)
|
||||
.map(|node| format!("127.0.0.1:{}", node.get_compute_port()))
|
||||
.ok_or_else(|| anyhow!("safekeeer {node_id} does not exist"))
|
||||
})
|
||||
.collect::<Result<Vec<String>>>()
|
||||
.find(|node| node.id == sk_id)
|
||||
.ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?;
|
||||
safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port()));
|
||||
}
|
||||
}
|
||||
Ok(safekeeper_connstrings)
|
||||
}
|
||||
|
||||
/// Generate a JWT with the correct claims.
|
||||
|
||||
@@ -278,8 +278,10 @@ pub struct PageServerConf {
|
||||
pub listen_pg_addr: String,
|
||||
pub listen_http_addr: String,
|
||||
pub listen_https_addr: Option<String>,
|
||||
pub listen_grpc_addr: Option<String>,
|
||||
pub pg_auth_type: AuthType,
|
||||
pub http_auth_type: AuthType,
|
||||
pub grpc_auth_type: AuthType,
|
||||
pub no_sync: bool,
|
||||
}
|
||||
|
||||
@@ -290,8 +292,10 @@ impl Default for PageServerConf {
|
||||
listen_pg_addr: String::new(),
|
||||
listen_http_addr: String::new(),
|
||||
listen_https_addr: None,
|
||||
listen_grpc_addr: None,
|
||||
pg_auth_type: AuthType::Trust,
|
||||
http_auth_type: AuthType::Trust,
|
||||
grpc_auth_type: AuthType::Trust,
|
||||
no_sync: false,
|
||||
}
|
||||
}
|
||||
@@ -306,8 +310,10 @@ pub struct NeonLocalInitPageserverConf {
|
||||
pub listen_pg_addr: String,
|
||||
pub listen_http_addr: String,
|
||||
pub listen_https_addr: Option<String>,
|
||||
pub listen_grpc_addr: Option<String>,
|
||||
pub pg_auth_type: AuthType,
|
||||
pub http_auth_type: AuthType,
|
||||
pub grpc_auth_type: AuthType,
|
||||
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
|
||||
pub no_sync: bool,
|
||||
#[serde(flatten)]
|
||||
@@ -321,8 +327,10 @@ impl From<&NeonLocalInitPageserverConf> for PageServerConf {
|
||||
listen_pg_addr,
|
||||
listen_http_addr,
|
||||
listen_https_addr,
|
||||
listen_grpc_addr,
|
||||
pg_auth_type,
|
||||
http_auth_type,
|
||||
grpc_auth_type,
|
||||
no_sync,
|
||||
other: _,
|
||||
} = conf;
|
||||
@@ -331,7 +339,9 @@ impl From<&NeonLocalInitPageserverConf> for PageServerConf {
|
||||
listen_pg_addr: listen_pg_addr.clone(),
|
||||
listen_http_addr: listen_http_addr.clone(),
|
||||
listen_https_addr: listen_https_addr.clone(),
|
||||
listen_grpc_addr: listen_grpc_addr.clone(),
|
||||
pg_auth_type: *pg_auth_type,
|
||||
grpc_auth_type: *grpc_auth_type,
|
||||
http_auth_type: *http_auth_type,
|
||||
no_sync: *no_sync,
|
||||
}
|
||||
@@ -707,8 +717,10 @@ impl LocalEnv {
|
||||
listen_pg_addr: String,
|
||||
listen_http_addr: String,
|
||||
listen_https_addr: Option<String>,
|
||||
listen_grpc_addr: Option<String>,
|
||||
pg_auth_type: AuthType,
|
||||
http_auth_type: AuthType,
|
||||
grpc_auth_type: AuthType,
|
||||
#[serde(default)]
|
||||
no_sync: bool,
|
||||
}
|
||||
@@ -732,8 +744,10 @@ impl LocalEnv {
|
||||
listen_pg_addr,
|
||||
listen_http_addr,
|
||||
listen_https_addr,
|
||||
listen_grpc_addr,
|
||||
pg_auth_type,
|
||||
http_auth_type,
|
||||
grpc_auth_type,
|
||||
no_sync,
|
||||
} = config_toml;
|
||||
let IdentityTomlSubset {
|
||||
@@ -750,8 +764,10 @@ impl LocalEnv {
|
||||
listen_pg_addr,
|
||||
listen_http_addr,
|
||||
listen_https_addr,
|
||||
listen_grpc_addr,
|
||||
pg_auth_type,
|
||||
http_auth_type,
|
||||
grpc_auth_type,
|
||||
no_sync,
|
||||
};
|
||||
pageservers.push(conf);
|
||||
|
||||
@@ -129,7 +129,9 @@ impl PageServerNode {
|
||||
));
|
||||
}
|
||||
|
||||
if conf.http_auth_type != AuthType::Trust || conf.pg_auth_type != AuthType::Trust {
|
||||
if [conf.http_auth_type, conf.pg_auth_type, conf.grpc_auth_type]
|
||||
.contains(&AuthType::NeonJWT)
|
||||
{
|
||||
// Keys are generated in the toplevel repo dir, pageservers' workdirs
|
||||
// are one level below that, so refer to keys with ../
|
||||
overrides.push("auth_validation_public_key_path='../auth_public_key.pem'".to_owned());
|
||||
|
||||
@@ -8,6 +8,9 @@ pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
|
||||
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
|
||||
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
|
||||
pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}");
|
||||
// TODO: gRPC is disabled by default for now, but the port is used in neon_local.
|
||||
pub const DEFAULT_GRPC_LISTEN_PORT: u16 = 51051; // storage-broker already uses 50051
|
||||
pub const DEFAULT_GRPC_LISTEN_TLS: bool = false; // TODO: enable by default?
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::num::{NonZeroU64, NonZeroUsize};
|
||||
@@ -104,6 +107,8 @@ pub struct ConfigToml {
|
||||
pub listen_pg_addr: String,
|
||||
pub listen_http_addr: String,
|
||||
pub listen_https_addr: Option<String>,
|
||||
pub listen_grpc_addr: Option<String>,
|
||||
pub listen_grpc_tls: bool,
|
||||
pub ssl_key_file: Utf8PathBuf,
|
||||
pub ssl_cert_file: Utf8PathBuf,
|
||||
#[serde(with = "humantime_serde")]
|
||||
@@ -123,6 +128,7 @@ pub struct ConfigToml {
|
||||
pub http_auth_type: AuthType,
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
pub pg_auth_type: AuthType,
|
||||
pub grpc_auth_type: AuthType,
|
||||
pub auth_validation_public_key_path: Option<Utf8PathBuf>,
|
||||
pub remote_storage: Option<RemoteStorageConfig>,
|
||||
pub tenant_config: TenantConfigToml,
|
||||
@@ -588,6 +594,8 @@ impl Default for ConfigToml {
|
||||
listen_pg_addr: (DEFAULT_PG_LISTEN_ADDR.to_string()),
|
||||
listen_http_addr: (DEFAULT_HTTP_LISTEN_ADDR.to_string()),
|
||||
listen_https_addr: (None),
|
||||
listen_grpc_addr: None, // TODO: default to 127.0.0.1:51051
|
||||
listen_grpc_tls: DEFAULT_GRPC_LISTEN_TLS,
|
||||
ssl_key_file: Utf8PathBuf::from(DEFAULT_SSL_KEY_FILE),
|
||||
ssl_cert_file: Utf8PathBuf::from(DEFAULT_SSL_CERT_FILE),
|
||||
ssl_cert_reload_period: Duration::from_secs(60),
|
||||
@@ -604,6 +612,7 @@ impl Default for ConfigToml {
|
||||
pg_distrib_dir: None, // Utf8PathBuf::from("./pg_install"), // TODO: formely, this was std::env::current_dir()
|
||||
http_auth_type: (AuthType::Trust),
|
||||
pg_auth_type: (AuthType::Trust),
|
||||
grpc_auth_type: (AuthType::Trust),
|
||||
auth_validation_public_key_path: (None),
|
||||
remote_storage: None,
|
||||
broker_endpoint: (storage_broker::DEFAULT_ENDPOINT
|
||||
|
||||
@@ -439,7 +439,6 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState {
|
||||
currentClusterSize: crate::bindings::pg_atomic_uint64 { value: 0 },
|
||||
shard_ps_feedback: [empty_feedback; 128],
|
||||
num_shards: 0,
|
||||
replica_promote: false,
|
||||
min_ps_feedback: empty_feedback,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,6 +43,7 @@ nix.workspace = true
|
||||
num_cpus.workspace = true
|
||||
num-traits.workspace = true
|
||||
once_cell.workspace = true
|
||||
pageserver_page_api.workspace = true
|
||||
pin-project-lite.workspace = true
|
||||
postgres_backend.workspace = true
|
||||
postgres-protocol.workspace = true
|
||||
@@ -71,6 +72,8 @@ tokio-rustls.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
tokio-util.workspace = true
|
||||
toml_edit = { workspace = true, features = [ "serde" ] }
|
||||
tonic.workspace = true
|
||||
tonic-reflection.workspace = true
|
||||
tracing.workspace = true
|
||||
tracing-utils.workspace = true
|
||||
url.workspace = true
|
||||
|
||||
@@ -388,23 +388,30 @@ fn start_pageserver(
|
||||
// We need to release the lock file only when the process exits.
|
||||
std::mem::forget(lock_file);
|
||||
|
||||
// Bind the HTTP and libpq ports early, so that if they are in use by some other
|
||||
// process, we error out early.
|
||||
let http_addr = &conf.listen_http_addr;
|
||||
info!("Starting pageserver http handler on {http_addr}");
|
||||
let http_listener = tcp_listener::bind(http_addr)?;
|
||||
// Bind the HTTP, libpq, and gRPC ports early, to error out if they are
|
||||
// already in use.
|
||||
info!(
|
||||
"Starting pageserver http handler on {} with auth {:#?}",
|
||||
conf.listen_http_addr, conf.http_auth_type
|
||||
);
|
||||
let http_listener = tcp_listener::bind(&conf.listen_http_addr)?;
|
||||
|
||||
let https_listener = match conf.listen_https_addr.as_ref() {
|
||||
Some(https_addr) => {
|
||||
info!("Starting pageserver https handler on {https_addr}");
|
||||
info!(
|
||||
"Starting pageserver https handler on {https_addr} with auth {:#?}",
|
||||
conf.http_auth_type
|
||||
);
|
||||
Some(tcp_listener::bind(https_addr)?)
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
let pg_addr = &conf.listen_pg_addr;
|
||||
info!("Starting pageserver pg protocol handler on {pg_addr}");
|
||||
let pageserver_listener = tcp_listener::bind(pg_addr)?;
|
||||
info!(
|
||||
"Starting pageserver pg protocol handler on {} with auth {:#?}",
|
||||
conf.listen_pg_addr, conf.pg_auth_type,
|
||||
);
|
||||
let pageserver_listener = tcp_listener::bind(&conf.listen_pg_addr)?;
|
||||
|
||||
// Enable SO_KEEPALIVE on the socket, to detect dead connections faster.
|
||||
// These are configured via net.ipv4.tcp_keepalive_* sysctls.
|
||||
@@ -413,6 +420,15 @@ fn start_pageserver(
|
||||
// support enabling keepalives while using the default OS sysctls.
|
||||
setsockopt(&pageserver_listener, sockopt::KeepAlive, &true)?;
|
||||
|
||||
let mut grpc_listener = None;
|
||||
if let Some(grpc_addr) = &conf.listen_grpc_addr {
|
||||
info!(
|
||||
"Starting pageserver gRPC handler on {grpc_addr} with auth {:#?}",
|
||||
conf.grpc_auth_type
|
||||
);
|
||||
grpc_listener = Some(tcp_listener::bind(grpc_addr).map_err(|e| anyhow!("{e}"))?);
|
||||
}
|
||||
|
||||
// Launch broker client
|
||||
// The storage_broker::connect call needs to happen inside a tokio runtime thread.
|
||||
let broker_client = WALRECEIVER_RUNTIME
|
||||
@@ -440,7 +456,8 @@ fn start_pageserver(
|
||||
// Initialize authentication for incoming connections
|
||||
let http_auth;
|
||||
let pg_auth;
|
||||
if conf.http_auth_type == AuthType::NeonJWT || conf.pg_auth_type == AuthType::NeonJWT {
|
||||
let grpc_auth;
|
||||
if [conf.http_auth_type, conf.pg_auth_type, conf.grpc_auth_type].contains(&AuthType::NeonJWT) {
|
||||
// unwrap is ok because check is performed when creating config, so path is set and exists
|
||||
let key_path = conf.auth_validation_public_key_path.as_ref().unwrap();
|
||||
info!("Loading public key(s) for verifying JWT tokens from {key_path:?}");
|
||||
@@ -448,22 +465,27 @@ fn start_pageserver(
|
||||
let jwt_auth = JwtAuth::from_key_path(key_path)?;
|
||||
let auth: Arc<SwappableJwtAuth> = Arc::new(SwappableJwtAuth::new(jwt_auth));
|
||||
|
||||
http_auth = match &conf.http_auth_type {
|
||||
http_auth = match conf.http_auth_type {
|
||||
AuthType::Trust => None,
|
||||
AuthType::NeonJWT => Some(auth.clone()),
|
||||
};
|
||||
pg_auth = match &conf.pg_auth_type {
|
||||
pg_auth = match conf.pg_auth_type {
|
||||
AuthType::Trust => None,
|
||||
AuthType::NeonJWT => Some(auth.clone()),
|
||||
};
|
||||
grpc_auth = match conf.grpc_auth_type {
|
||||
AuthType::Trust => None,
|
||||
AuthType::NeonJWT => Some(auth),
|
||||
};
|
||||
} else {
|
||||
http_auth = None;
|
||||
pg_auth = None;
|
||||
grpc_auth = None;
|
||||
}
|
||||
info!("Using auth for http API: {:#?}", conf.http_auth_type);
|
||||
info!("Using auth for pg connections: {:#?}", conf.pg_auth_type);
|
||||
|
||||
let tls_server_config = if conf.listen_https_addr.is_some() || conf.enable_tls_page_service_api
|
||||
let tls_server_config = if conf.listen_https_addr.is_some()
|
||||
|| conf.enable_tls_page_service_api
|
||||
|| conf.listen_grpc_tls
|
||||
{
|
||||
let resolver = BACKGROUND_RUNTIME.block_on(ReloadingCertificateResolver::new(
|
||||
"main",
|
||||
@@ -771,14 +793,33 @@ fn start_pageserver(
|
||||
tokio::net::TcpListener::from_std(pageserver_listener)
|
||||
.context("create tokio listener")?
|
||||
},
|
||||
if conf.enable_tls_page_service_api {
|
||||
tls_server_config
|
||||
} else {
|
||||
None
|
||||
},
|
||||
basebackup_cache,
|
||||
conf.enable_tls_page_service_api
|
||||
.then(|| tls_server_config.clone())
|
||||
.flatten(),
|
||||
basebackup_cache.clone(),
|
||||
);
|
||||
|
||||
// Spawn a Pageserver gRPC server task. It will spawn separate tasks for
|
||||
// each stream/request.
|
||||
//
|
||||
// TODO: this uses a separate Tokio runtime for the page service. If we want
|
||||
// other gRPC services, they will need their own port and runtime. Is this
|
||||
// necessary?
|
||||
let mut page_service_grpc = None;
|
||||
if let Some(grpc_listener) = grpc_listener {
|
||||
page_service_grpc = Some(page_service::spawn_grpc(
|
||||
conf,
|
||||
tenant_manager.clone(),
|
||||
grpc_auth,
|
||||
otel_guard.as_ref().map(|g| g.dispatch.clone()),
|
||||
grpc_listener,
|
||||
conf.listen_grpc_tls
|
||||
.then(|| tls_server_config.clone())
|
||||
.flatten(),
|
||||
basebackup_cache,
|
||||
)?);
|
||||
}
|
||||
|
||||
// All started up! Now just sit and wait for shutdown signal.
|
||||
BACKGROUND_RUNTIME.block_on(async move {
|
||||
let signal_token = CancellationToken::new();
|
||||
@@ -797,6 +838,7 @@ fn start_pageserver(
|
||||
http_endpoint_listener,
|
||||
https_endpoint_listener,
|
||||
page_service,
|
||||
page_service_grpc,
|
||||
consumption_metrics_tasks,
|
||||
disk_usage_eviction_task,
|
||||
&tenant_manager,
|
||||
|
||||
@@ -58,11 +58,18 @@ pub struct PageServerConf {
|
||||
pub listen_http_addr: String,
|
||||
/// Example: 127.0.0.1:9899
|
||||
pub listen_https_addr: Option<String>,
|
||||
/// If set, expose a gRPC API on this address.
|
||||
/// Example: 127.0.0.1:51051
|
||||
///
|
||||
/// EXPERIMENTAL: this protocol is unstable and under active development.
|
||||
pub listen_grpc_addr: Option<String>,
|
||||
/// If true, enable TLS for the gRPC server, using ssl_key_file and ssl_cert_file.
|
||||
pub listen_grpc_tls: bool,
|
||||
|
||||
/// Path to a file with certificate's private key for https API.
|
||||
/// Path to a file with certificate's private key for https and gRPC API.
|
||||
/// Default: server.key
|
||||
pub ssl_key_file: Utf8PathBuf,
|
||||
/// Path to a file with a X509 certificate for https API.
|
||||
/// Path to a file with a X509 certificate for https and gRPC API.
|
||||
/// Default: server.crt
|
||||
pub ssl_cert_file: Utf8PathBuf,
|
||||
/// Period to reload certificate and private key from files.
|
||||
@@ -100,6 +107,8 @@ pub struct PageServerConf {
|
||||
pub http_auth_type: AuthType,
|
||||
/// authentication method for libpq connections from compute
|
||||
pub pg_auth_type: AuthType,
|
||||
/// authentication method for gRPC connections from compute
|
||||
pub grpc_auth_type: AuthType,
|
||||
/// Path to a file or directory containing public key(s) for verifying JWT tokens.
|
||||
/// Used for both mgmt and compute auth, if enabled.
|
||||
pub auth_validation_public_key_path: Option<Utf8PathBuf>,
|
||||
@@ -221,7 +230,7 @@ pub struct PageServerConf {
|
||||
|
||||
pub tracing: Option<pageserver_api::config::Tracing>,
|
||||
|
||||
/// Enable TLS in page service API.
|
||||
/// Enable TLS in the libpq page service API.
|
||||
/// Does not force TLS: the client negotiates TLS usage during the handshake.
|
||||
/// Uses key and certificate from ssl_key_file/ssl_cert_file.
|
||||
pub enable_tls_page_service_api: bool,
|
||||
@@ -355,6 +364,8 @@ impl PageServerConf {
|
||||
listen_pg_addr,
|
||||
listen_http_addr,
|
||||
listen_https_addr,
|
||||
listen_grpc_addr,
|
||||
listen_grpc_tls,
|
||||
ssl_key_file,
|
||||
ssl_cert_file,
|
||||
ssl_cert_reload_period,
|
||||
@@ -369,6 +380,7 @@ impl PageServerConf {
|
||||
pg_distrib_dir,
|
||||
http_auth_type,
|
||||
pg_auth_type,
|
||||
grpc_auth_type,
|
||||
auth_validation_public_key_path,
|
||||
remote_storage,
|
||||
broker_endpoint,
|
||||
@@ -423,6 +435,8 @@ impl PageServerConf {
|
||||
listen_pg_addr,
|
||||
listen_http_addr,
|
||||
listen_https_addr,
|
||||
listen_grpc_addr,
|
||||
listen_grpc_tls,
|
||||
ssl_key_file,
|
||||
ssl_cert_file,
|
||||
ssl_cert_reload_period,
|
||||
@@ -435,6 +449,7 @@ impl PageServerConf {
|
||||
max_file_descriptors,
|
||||
http_auth_type,
|
||||
pg_auth_type,
|
||||
grpc_auth_type,
|
||||
auth_validation_public_key_path,
|
||||
remote_storage_config: remote_storage,
|
||||
broker_endpoint,
|
||||
@@ -531,7 +546,9 @@ impl PageServerConf {
|
||||
// custom validation code that covers more than one field in isolation
|
||||
// ------------------------------------------------------------
|
||||
|
||||
if conf.http_auth_type == AuthType::NeonJWT || conf.pg_auth_type == AuthType::NeonJWT {
|
||||
if [conf.http_auth_type, conf.pg_auth_type, conf.grpc_auth_type]
|
||||
.contains(&AuthType::NeonJWT)
|
||||
{
|
||||
let auth_validation_public_key_path = conf
|
||||
.auth_validation_public_key_path
|
||||
.get_or_insert_with(|| workdir.join("auth_public_key.pem"));
|
||||
|
||||
@@ -84,6 +84,7 @@ pub async fn shutdown_pageserver(
|
||||
http_listener: HttpEndpointListener,
|
||||
https_listener: Option<HttpsEndpointListener>,
|
||||
page_service: page_service::Listener,
|
||||
grpc_task: Option<CancellableTask>,
|
||||
consumption_metrics_worker: ConsumptionMetricsTasks,
|
||||
disk_usage_eviction_task: Option<DiskUsageEvictionTask>,
|
||||
tenant_manager: &TenantManager,
|
||||
@@ -177,6 +178,16 @@ pub async fn shutdown_pageserver(
|
||||
)
|
||||
.await;
|
||||
|
||||
// Shut down the gRPC server task, including request handlers.
|
||||
if let Some(grpc_task) = grpc_task {
|
||||
timed(
|
||||
grpc_task.shutdown(),
|
||||
"shutdown gRPC PageRequestHandler",
|
||||
Duration::from_secs(3),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// Shut down all the tenants. This flushes everything to disk and kills
|
||||
// the checkpoint and GC tasks.
|
||||
timed(
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
use std::borrow::Cow;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::os::fd::AsRawFd;
|
||||
use std::pin::Pin;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
@@ -12,9 +13,10 @@ use std::{io, str};
|
||||
use anyhow::{Context, bail};
|
||||
use async_compression::tokio::write::GzipEncoder;
|
||||
use bytes::Buf;
|
||||
use futures::FutureExt;
|
||||
use futures::{FutureExt, Stream, StreamExt as _};
|
||||
use itertools::Itertools;
|
||||
use jsonwebtoken::TokenData;
|
||||
use nix::sys::socket::{setsockopt, sockopt};
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::config::{
|
||||
GetVectoredConcurrentIo, PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
|
||||
@@ -30,6 +32,7 @@ use pageserver_api::models::{
|
||||
};
|
||||
use pageserver_api::reltag::SlruKind;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_page_api::proto;
|
||||
use postgres_backend::{
|
||||
AuthType, PostgresBackend, PostgresBackendReader, QueryError, is_expected_io_error,
|
||||
};
|
||||
@@ -40,6 +43,8 @@ use pq_proto::{BeMessage, FeMessage, FeStartupPacket, RowDescriptor};
|
||||
use strum_macros::IntoStaticStr;
|
||||
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter};
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_rustls::TlsAcceptor;
|
||||
use tokio_stream::wrappers::TcpListenerStream;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::auth::{Claims, Scope, SwappableJwtAuth};
|
||||
@@ -51,9 +56,8 @@ use utils::simple_rcu::RcuReadGuard;
|
||||
use utils::sync::gate::{Gate, GateGuard};
|
||||
use utils::sync::spsc_fold;
|
||||
|
||||
use crate::PERF_TRACE_TARGET;
|
||||
use crate::auth::check_permission;
|
||||
use crate::basebackup::BasebackupError;
|
||||
use crate::basebackup::{self, BasebackupError};
|
||||
use crate::basebackup_cache::BasebackupCache;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{
|
||||
@@ -68,14 +72,14 @@ use crate::span::{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id,
|
||||
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
|
||||
};
|
||||
use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME, TaskKind};
|
||||
use crate::task_mgr::{COMPUTE_REQUEST_RUNTIME, TaskKind, exit_on_panic_or_error};
|
||||
use crate::tenant::mgr::{
|
||||
GetActiveTenantError, GetTenantError, ShardResolveResult, ShardSelector, TenantManager,
|
||||
};
|
||||
use crate::tenant::storage_layer::IoConcurrency;
|
||||
use crate::tenant::timeline::{self, WaitLsnError};
|
||||
use crate::tenant::{GetTimelineError, PageReconstructError, Timeline};
|
||||
use crate::{basebackup, timed_after_cancellation};
|
||||
use crate::{CancellableTask, PERF_TRACE_TARGET, timed_after_cancellation};
|
||||
|
||||
/// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`]` and/or a [`crate::tenant::TenantShard`] which
|
||||
/// is not yet in state [`TenantState::Active`].
|
||||
@@ -86,6 +90,25 @@ const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
|
||||
/// Threshold at which to log slow GetPage requests.
|
||||
const LOG_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30);
|
||||
|
||||
/// Whether to enable TCP keepalives for gRPC connections. The interval and
|
||||
/// timeouts are configured via sysctl. This detects dead connections sooner.
|
||||
const GRPC_TCP_KEEPALIVE: bool = true;
|
||||
|
||||
/// Whether to enable TCP nodelay for gRPC connections. This disables Nagle's
|
||||
/// algorithm, which can cause latency spikes for small messages.
|
||||
const GRPC_TCP_NODELAY: bool = true;
|
||||
|
||||
/// The interval between HTTP2 keepalive pings. This allows shutting down server
|
||||
/// tasks when clients are unresponsive.
|
||||
const GRPC_HTTP2_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(30);
|
||||
|
||||
/// The timeout for HTTP2 keepalive pings. Should be <= GRPC_KEEPALIVE_INTERVAL.
|
||||
const GRPC_HTTP2_KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(20);
|
||||
|
||||
/// Number of concurrent gRPC streams per TCP connection. We expect something
|
||||
/// like 8 GetPage streams per connections, plus any unary requests.
|
||||
const GRPC_MAX_CONCURRENT_STREAMS: u32 = 256;
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
pub struct Listener {
|
||||
@@ -119,7 +142,7 @@ pub fn spawn(
|
||||
// accept connections.)
|
||||
DownloadBehavior::Error,
|
||||
);
|
||||
let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
|
||||
let task = COMPUTE_REQUEST_RUNTIME.spawn(exit_on_panic_or_error(
|
||||
"libpq listener",
|
||||
libpq_listener_main(
|
||||
conf,
|
||||
@@ -140,6 +163,109 @@ pub fn spawn(
|
||||
Listener { cancel, task }
|
||||
}
|
||||
|
||||
/// Spawns a gRPC server for the page service.
|
||||
pub fn spawn_grpc(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
perf_trace_dispatch: Option<Dispatch>,
|
||||
listener: std::net::TcpListener,
|
||||
tls_config: Option<Arc<rustls::ServerConfig>>,
|
||||
basebackup_cache: Arc<BasebackupCache>,
|
||||
) -> anyhow::Result<CancellableTask> {
|
||||
// Use the compute runtime.
|
||||
let _runtime = COMPUTE_REQUEST_RUNTIME.enter();
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
let ctx = RequestContextBuilder::new(TaskKind::PageRequestHandler)
|
||||
.download_behavior(DownloadBehavior::Download)
|
||||
.perf_span_dispatch(perf_trace_dispatch)
|
||||
.detached_child();
|
||||
let gate = Gate::default();
|
||||
|
||||
// Set up the gRPC server.
|
||||
//
|
||||
// NB: does not respect TCP settings, since we configure the socket manually.
|
||||
// TODO: consider tuning window sizes.
|
||||
// TODO: wire up tracing.
|
||||
let mut server = tonic::transport::Server::builder()
|
||||
.http2_keepalive_interval(Some(GRPC_HTTP2_KEEPALIVE_INTERVAL))
|
||||
.http2_keepalive_timeout(Some(GRPC_HTTP2_KEEPALIVE_TIMEOUT))
|
||||
.max_concurrent_streams(Some(GRPC_MAX_CONCURRENT_STREAMS));
|
||||
|
||||
// Main page service.
|
||||
let page_service = proto::PageServiceServer::new(PageServerHandler::new(
|
||||
tenant_manager,
|
||||
auth,
|
||||
PageServicePipeliningConfig::Serial, // TODO: unused with gRPC
|
||||
conf.get_vectored_concurrent_io,
|
||||
ConnectionPerfSpanFields::default(),
|
||||
basebackup_cache,
|
||||
ctx,
|
||||
cancel.clone(),
|
||||
gate.enter().expect("just created"),
|
||||
));
|
||||
let server = server.add_service(page_service);
|
||||
|
||||
// Reflection service for use with e.g. grpcurl.
|
||||
let reflection_service = tonic_reflection::server::Builder::configure()
|
||||
.register_encoded_file_descriptor_set(proto::FILE_DESCRIPTOR_SET)
|
||||
.build_v1()?;
|
||||
let server = server.add_service(reflection_service);
|
||||
|
||||
// Set up the TCP socket. We take a preconfigured TcpListener to bind the port early.
|
||||
listener.set_nonblocking(true)?;
|
||||
setsockopt(&listener, sockopt::KeepAlive, &GRPC_TCP_KEEPALIVE)?;
|
||||
let listener = tokio::net::TcpListener::from_std(listener)?;
|
||||
|
||||
// Build the serve future.
|
||||
let cancel_serve = cancel.clone();
|
||||
let serve = async move {
|
||||
// Accept TCP connections.
|
||||
let tcp_conns = TcpListenerStream::new(listener).map(|result| {
|
||||
let tcp_conn = result.inspect_err(|err| error!("TCP accept failed: {err}"))?;
|
||||
tcp_conn.set_nodelay(GRPC_TCP_NODELAY).inspect_err(|err| {
|
||||
error!("TCP nodelay failed: {err}");
|
||||
})?;
|
||||
Ok(tcp_conn)
|
||||
});
|
||||
|
||||
if let Some(tls_config) = tls_config {
|
||||
// If TLS is enabled, decrypt the TCP streams before passing them to the server.
|
||||
let tls_acceptor = TlsAcceptor::from(tls_config);
|
||||
let tls_conns = async_stream::stream! {
|
||||
for await result in tcp_conns {
|
||||
match result {
|
||||
Ok(tcp_conn) => yield tls_acceptor
|
||||
.accept(tcp_conn)
|
||||
.await
|
||||
.inspect_err(|err| error!("TLS handshake failed: {err}")),
|
||||
Err(err) => yield Err(err),
|
||||
}
|
||||
}
|
||||
};
|
||||
server
|
||||
.serve_with_incoming_shutdown(tls_conns, cancel_serve.cancelled())
|
||||
.await?;
|
||||
} else {
|
||||
// Otherwise, just pass the plaintext TCP streams.
|
||||
server
|
||||
.serve_with_incoming_shutdown(tcp_conns, cancel_serve.cancelled())
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Clean shutdown, wait for tasks to finish.
|
||||
// TODO: revisit shutdown logic once page service is implemented.
|
||||
gate.close().await;
|
||||
anyhow::Ok(())
|
||||
};
|
||||
|
||||
// Spawn a task to run the serve future.
|
||||
let task = tokio::spawn(exit_on_panic_or_error("grpc listener", serve));
|
||||
|
||||
Ok(CancellableTask { task, cancel })
|
||||
}
|
||||
|
||||
impl Listener {
|
||||
pub async fn stop_accepting(self) -> Connections {
|
||||
self.cancel.cancel();
|
||||
@@ -259,7 +385,7 @@ type ConnectionHandlerResult = anyhow::Result<()>;
|
||||
|
||||
/// Perf root spans start at the per-request level, after shard routing.
|
||||
/// This struct carries connection-level information to the root perf span definition.
|
||||
#[derive(Clone)]
|
||||
#[derive(Clone, Default)]
|
||||
struct ConnectionPerfSpanFields {
|
||||
peer_addr: String,
|
||||
application_name: Option<String>,
|
||||
@@ -377,6 +503,11 @@ async fn page_service_conn_main(
|
||||
}
|
||||
}
|
||||
|
||||
/// Page service connection handler.
|
||||
///
|
||||
/// TODO: for gRPC, this will be shared by all requests from all connections.
|
||||
/// Decompose it into global state and per-connection/request state, and make
|
||||
/// libpq-specific options (e.g. pipelining) separate.
|
||||
struct PageServerHandler {
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
claims: Option<Claims>,
|
||||
@@ -3117,6 +3248,60 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Implements the page service over gRPC.
|
||||
///
|
||||
/// TODO: not yet implemented, all methods return unimplemented.
|
||||
#[tonic::async_trait]
|
||||
impl proto::PageService for PageServerHandler {
|
||||
type GetBaseBackupStream = Pin<
|
||||
Box<dyn Stream<Item = Result<proto::GetBaseBackupResponseChunk, tonic::Status>> + Send>,
|
||||
>;
|
||||
type GetPagesStream =
|
||||
Pin<Box<dyn Stream<Item = Result<proto::GetPageResponse, tonic::Status>> + Send>>;
|
||||
|
||||
async fn check_rel_exists(
|
||||
&self,
|
||||
_: tonic::Request<proto::CheckRelExistsRequest>,
|
||||
) -> Result<tonic::Response<proto::CheckRelExistsResponse>, tonic::Status> {
|
||||
Err(tonic::Status::unimplemented("not implemented"))
|
||||
}
|
||||
|
||||
async fn get_base_backup(
|
||||
&self,
|
||||
_: tonic::Request<proto::GetBaseBackupRequest>,
|
||||
) -> Result<tonic::Response<Self::GetBaseBackupStream>, tonic::Status> {
|
||||
Err(tonic::Status::unimplemented("not implemented"))
|
||||
}
|
||||
|
||||
async fn get_db_size(
|
||||
&self,
|
||||
_: tonic::Request<proto::GetDbSizeRequest>,
|
||||
) -> Result<tonic::Response<proto::GetDbSizeResponse>, tonic::Status> {
|
||||
Err(tonic::Status::unimplemented("not implemented"))
|
||||
}
|
||||
|
||||
async fn get_pages(
|
||||
&self,
|
||||
_: tonic::Request<tonic::Streaming<proto::GetPageRequest>>,
|
||||
) -> Result<tonic::Response<Self::GetPagesStream>, tonic::Status> {
|
||||
Err(tonic::Status::unimplemented("not implemented"))
|
||||
}
|
||||
|
||||
async fn get_rel_size(
|
||||
&self,
|
||||
_: tonic::Request<proto::GetRelSizeRequest>,
|
||||
) -> Result<tonic::Response<proto::GetRelSizeResponse>, tonic::Status> {
|
||||
Err(tonic::Status::unimplemented("not implemented"))
|
||||
}
|
||||
|
||||
async fn get_slru_segment(
|
||||
&self,
|
||||
_: tonic::Request<proto::GetSlruSegmentRequest>,
|
||||
) -> Result<tonic::Response<proto::GetSlruSegmentResponse>, tonic::Status> {
|
||||
Err(tonic::Status::unimplemented("not implemented"))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetActiveTenantError> for QueryError {
|
||||
fn from(e: GetActiveTenantError) -> Self {
|
||||
match e {
|
||||
|
||||
@@ -276,9 +276,10 @@ pub enum TaskKind {
|
||||
// HTTP endpoint listener.
|
||||
HttpEndpointListener,
|
||||
|
||||
// Task that handles a single connection. A PageRequestHandler task
|
||||
// starts detached from any particular tenant or timeline, but it can be
|
||||
// associated with one later, after receiving a command from the client.
|
||||
/// Task that handles a single page service connection. A PageRequestHandler
|
||||
/// task starts detached from any particular tenant or timeline, but it can
|
||||
/// be associated with one later, after receiving a command from the client.
|
||||
/// Also used for the gRPC page service API, including the main server task.
|
||||
PageRequestHandler,
|
||||
|
||||
/// Manages the WAL receiver connection for one timeline.
|
||||
|
||||
@@ -69,7 +69,6 @@ struct NeonWALReader
|
||||
WALSegmentContext segcxt;
|
||||
WALOpenSegment seg;
|
||||
int wre_errno;
|
||||
TimeLineID local_active_tlid;
|
||||
/* Explains failure to read, static for simplicity. */
|
||||
char err_msg[NEON_WALREADER_ERR_MSG_LEN];
|
||||
|
||||
@@ -107,8 +106,7 @@ struct NeonWALReader
|
||||
|
||||
/* palloc and initialize NeonWALReader */
|
||||
NeonWALReader *
|
||||
NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn,
|
||||
char *log_prefix, TimeLineID tlid)
|
||||
NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, char *log_prefix)
|
||||
{
|
||||
NeonWALReader *reader;
|
||||
|
||||
@@ -120,7 +118,6 @@ NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn,
|
||||
MemoryContextAllocZero(TopMemoryContext, sizeof(NeonWALReader));
|
||||
|
||||
reader->available_lsn = available_lsn;
|
||||
reader->local_active_tlid = tlid;
|
||||
reader->seg.ws_file = -1;
|
||||
reader->seg.ws_segno = 0;
|
||||
reader->seg.ws_tli = 0;
|
||||
@@ -580,17 +577,6 @@ NeonWALReaderIsRemConnEstablished(NeonWALReader *state)
|
||||
return state->rem_state == RS_ESTABLISHED;
|
||||
}
|
||||
|
||||
/*
|
||||
* Whether remote connection is established. Once this is done, until successful
|
||||
* local read or error socket is stable and user can update socket events
|
||||
* instead of readding it each time.
|
||||
*/
|
||||
TimeLineID
|
||||
NeonWALReaderLocalActiveTimeLineID(NeonWALReader *state)
|
||||
{
|
||||
return state->local_active_tlid;
|
||||
}
|
||||
|
||||
/*
|
||||
* Returns events user should wait on connection socket or 0 if remote
|
||||
* connection is not active.
|
||||
|
||||
@@ -19,12 +19,9 @@ typedef enum
|
||||
NEON_WALREAD_ERROR,
|
||||
} NeonWALReadResult;
|
||||
|
||||
extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size,
|
||||
XLogRecPtr available_lsn,
|
||||
char *log_prefix, TimeLineID tlid);
|
||||
extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, char *log_prefix);
|
||||
extern void NeonWALReaderFree(NeonWALReader *state);
|
||||
extern void NeonWALReaderResetRemote(NeonWALReader *state);
|
||||
extern TimeLineID NeonWALReaderLocalActiveTimeLineID(NeonWALReader *state);
|
||||
extern NeonWALReadResult NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
|
||||
extern pgsocket NeonWALReaderSocket(NeonWALReader *state);
|
||||
extern uint32 NeonWALReaderEvents(NeonWALReader *state);
|
||||
|
||||
@@ -98,7 +98,6 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
|
||||
wp = palloc0(sizeof(WalProposer));
|
||||
wp->config = config;
|
||||
wp->api = api;
|
||||
wp->localTimeLineID = config->pgTimeline;
|
||||
wp->state = WPS_COLLECTING_TERMS;
|
||||
wp->mconf.generation = INVALID_GENERATION;
|
||||
wp->mconf.members.len = 0;
|
||||
@@ -1380,7 +1379,7 @@ ProcessPropStartPos(WalProposer *wp)
|
||||
* we must bail out, as clog and other non rel data is inconsistent.
|
||||
*/
|
||||
walprop_shared = wp->api.get_shmem_state(wp);
|
||||
if (!wp->config->syncSafekeepers && !walprop_shared->replica_promote)
|
||||
if (!wp->config->syncSafekeepers)
|
||||
{
|
||||
/*
|
||||
* Basebackup LSN always points to the beginning of the record (not
|
||||
|
||||
@@ -391,7 +391,6 @@ typedef struct WalproposerShmemState
|
||||
/* last feedback from each shard */
|
||||
PageserverFeedback shard_ps_feedback[MAX_SHARDS];
|
||||
int num_shards;
|
||||
bool replica_promote;
|
||||
|
||||
/* aggregated feedback with min LSNs across shards */
|
||||
PageserverFeedback min_ps_feedback;
|
||||
@@ -806,8 +805,6 @@ typedef struct WalProposer
|
||||
|
||||
/* WAL has been generated up to this point */
|
||||
XLogRecPtr availableLsn;
|
||||
/* Current local TimeLineId in use */
|
||||
TimeLineID localTimeLineID;
|
||||
|
||||
/* cached GetAcknowledgedByQuorumWALPosition result */
|
||||
XLogRecPtr commitLsn;
|
||||
|
||||
@@ -35,7 +35,6 @@
|
||||
#include "storage/proc.h"
|
||||
#include "storage/ipc.h"
|
||||
#include "storage/lwlock.h"
|
||||
#include "storage/pg_shmem.h"
|
||||
#include "storage/shmem.h"
|
||||
#include "storage/spin.h"
|
||||
#include "tcop/tcopprot.h"
|
||||
@@ -158,23 +157,12 @@ WalProposerMain(Datum main_arg)
|
||||
{
|
||||
WalProposer *wp;
|
||||
|
||||
if (*wal_acceptors_list == '\0')
|
||||
{
|
||||
wpg_log(WARNING, "Safekeepers list is empty");
|
||||
return;
|
||||
}
|
||||
|
||||
init_walprop_config(false);
|
||||
walprop_pg_init_bgworker();
|
||||
am_walproposer = true;
|
||||
walprop_pg_load_libpqwalreceiver();
|
||||
|
||||
wp = WalProposerCreate(&walprop_config, walprop_pg);
|
||||
#if PG_MAJORVERSION_NUM < 15
|
||||
wp->localTimeLineID = ThisTimeLineID;
|
||||
#else
|
||||
wp->localTimeLineID = GetWALInsertionTimeLine();
|
||||
#endif
|
||||
wp->last_reconnect_attempt = walprop_pg_get_current_timestamp(wp);
|
||||
|
||||
walprop_pg_init_walsender();
|
||||
@@ -306,15 +294,16 @@ safekeepers_cmp(char *old, char *new)
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* GUC assign_hook for neon.safekeepers. Restarts walproposer through FATAL if
|
||||
* the list changed.
|
||||
*/
|
||||
static void
|
||||
assign_neon_safekeepers(const char *newval, void *extra)
|
||||
{
|
||||
char *newval_copy;
|
||||
char *oldval;
|
||||
|
||||
if (newval && *newval != '\0' && UsedShmemSegAddr && walprop_shared && RecoveryInProgress())
|
||||
walprop_shared->replica_promote = true;
|
||||
|
||||
if (!am_walproposer)
|
||||
return;
|
||||
|
||||
@@ -511,6 +500,10 @@ walprop_register_bgworker(void)
|
||||
{
|
||||
BackgroundWorker bgw;
|
||||
|
||||
/* If no wal acceptors are specified, don't start the background worker. */
|
||||
if (*wal_acceptors_list == '\0')
|
||||
return;
|
||||
|
||||
memset(&bgw, 0, sizeof(bgw));
|
||||
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
|
||||
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
|
||||
@@ -1503,10 +1496,7 @@ walprop_pg_wal_reader_allocate(Safekeeper *sk)
|
||||
|
||||
snprintf(log_prefix, sizeof(log_prefix), WP_LOG_PREFIX "sk %s:%s nwr: ", sk->host, sk->port);
|
||||
Assert(!sk->xlogreader);
|
||||
/* note that WalProposer shouldn't access safekeepers when active */
|
||||
sk->xlogreader = NeonWALReaderAllocate(wal_segment_size,
|
||||
sk->wp->propTermStartLsn, log_prefix,
|
||||
sk->wp->localTimeLineID);
|
||||
sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propTermStartLsn, log_prefix);
|
||||
if (sk->xlogreader == NULL)
|
||||
wpg_log(FATAL, "failed to allocate xlog reader");
|
||||
}
|
||||
@@ -1520,7 +1510,7 @@ walprop_pg_wal_read(Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count,
|
||||
buf,
|
||||
startptr,
|
||||
count,
|
||||
sk->wp->localTimeLineID);
|
||||
walprop_pg_get_timeline_id());
|
||||
|
||||
if (res == NEON_WALREAD_SUCCESS)
|
||||
{
|
||||
|
||||
@@ -68,7 +68,8 @@ NeonWALReadWaitForWAL(XLogRecPtr loc)
|
||||
}
|
||||
|
||||
static int
|
||||
NeonWALPageRead(XLogReaderState *xlogreader,
|
||||
NeonWALPageRead(
|
||||
XLogReaderState *xlogreader,
|
||||
XLogRecPtr targetPagePtr,
|
||||
int reqLen,
|
||||
XLogRecPtr targetRecPtr,
|
||||
@@ -105,11 +106,12 @@ NeonWALPageRead(XLogReaderState *xlogreader,
|
||||
|
||||
for (;;)
|
||||
{
|
||||
NeonWALReadResult res = NeonWALRead(wal_reader,
|
||||
NeonWALReadResult res = NeonWALRead(
|
||||
wal_reader,
|
||||
readBuf,
|
||||
targetPagePtr,
|
||||
count,
|
||||
NeonWALReaderLocalActiveTimeLineID(wal_reader));
|
||||
walprop_pg_get_timeline_id());
|
||||
|
||||
if (res == NEON_WALREAD_SUCCESS)
|
||||
{
|
||||
@@ -200,8 +202,7 @@ NeonOnDemandXLogReaderRoutines(XLogReaderRoutine *xlr)
|
||||
{
|
||||
elog(ERROR, "unable to start walsender when basebackupLsn is 0");
|
||||
}
|
||||
wal_reader = NeonWALReaderAllocate(wal_segment_size, basebackupLsn,
|
||||
"[walsender] ", 1);
|
||||
wal_reader = NeonWALReaderAllocate(wal_segment_size, basebackupLsn, "[walsender] ");
|
||||
}
|
||||
xlr->page_read = NeonWALPageRead;
|
||||
xlr->segment_open = NeonWALReadSegmentOpen;
|
||||
|
||||
@@ -1224,6 +1224,7 @@ class NeonEnv:
|
||||
# Create config for pageserver
|
||||
http_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
|
||||
pg_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
|
||||
grpc_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
|
||||
for ps_id in range(
|
||||
self.BASE_PAGESERVER_ID, self.BASE_PAGESERVER_ID + config.num_pageservers
|
||||
):
|
||||
@@ -1250,6 +1251,7 @@ class NeonEnv:
|
||||
else None,
|
||||
"pg_auth_type": pg_auth_type,
|
||||
"http_auth_type": http_auth_type,
|
||||
"grpc_auth_type": grpc_auth_type,
|
||||
"availability_zone": availability_zone,
|
||||
# Disable pageserver disk syncs in tests: when running tests concurrently, this avoids
|
||||
# the pageserver taking a long time to start up due to syncfs flushing other tests' data
|
||||
@@ -4658,7 +4660,7 @@ class EndpointFactory:
|
||||
origin: Endpoint,
|
||||
endpoint_id: str | None = None,
|
||||
config_lines: list[str] | None = None,
|
||||
) -> Endpoint:
|
||||
):
|
||||
branch_name = origin.branch_name
|
||||
assert origin in self.endpoints
|
||||
assert branch_name is not None
|
||||
@@ -4677,7 +4679,7 @@ class EndpointFactory:
|
||||
origin: Endpoint,
|
||||
endpoint_id: str | None = None,
|
||||
config_lines: list[str] | None = None,
|
||||
) -> Endpoint:
|
||||
):
|
||||
branch_name = origin.branch_name
|
||||
assert origin in self.endpoints
|
||||
assert branch_name is not None
|
||||
|
||||
@@ -74,9 +74,8 @@ def test_hot_standby(neon_simple_env: NeonEnv):
|
||||
for query in queries:
|
||||
with s_con.cursor() as secondary_cursor:
|
||||
secondary_cursor.execute(query)
|
||||
res = secondary_cursor.fetchone()
|
||||
assert res is not None
|
||||
response = res
|
||||
response = secondary_cursor.fetchone()
|
||||
assert response is not None
|
||||
assert response == responses[query]
|
||||
|
||||
# Check for corrupted WAL messages which might otherwise go unnoticed if
|
||||
@@ -165,7 +164,7 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool):
|
||||
|
||||
s_cur.execute("SELECT COUNT(*) FROM test")
|
||||
res = s_cur.fetchone()
|
||||
assert res == (10000,)
|
||||
assert res[0] == 10000
|
||||
|
||||
# Clear the cache in the standby, so that when we
|
||||
# re-execute the query, it will make GetPage
|
||||
@@ -196,7 +195,7 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool):
|
||||
s_cur.execute("SELECT COUNT(*) FROM test")
|
||||
log_replica_lag(primary, secondary)
|
||||
res = s_cur.fetchone()
|
||||
assert res == (10000,)
|
||||
assert res[0] == 10000
|
||||
|
||||
|
||||
def run_pgbench(connstr: str, pg_bin: PgBin):
|
||||
|
||||
@@ -1,129 +0,0 @@
|
||||
"""
|
||||
File with secondary->primary promotion testing.
|
||||
|
||||
This far, only contains a test that we don't break and that the data is persisted.
|
||||
"""
|
||||
|
||||
import psycopg2
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import Endpoint, NeonEnv, wait_for_last_flush_lsn, wait_replica_caughtup
|
||||
from fixtures.pg_version import PgVersion
|
||||
from pytest import raises
|
||||
|
||||
|
||||
def test_replica_promotes(neon_simple_env: NeonEnv, pg_version: PgVersion):
|
||||
"""
|
||||
Test that a replica safely promotes, and can commit data updates which
|
||||
show up when the primary boots up after the promoted secondary endpoint
|
||||
shut down.
|
||||
"""
|
||||
|
||||
# Initialize the primary, a test table, and a helper function to create lots
|
||||
# of subtransactions.
|
||||
env: NeonEnv = neon_simple_env
|
||||
primary: Endpoint = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
|
||||
secondary: Endpoint = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
|
||||
|
||||
with primary.connect() as primary_conn:
|
||||
primary_cur = primary_conn.cursor()
|
||||
primary_cur.execute(
|
||||
"create table t(pk bigint GENERATED ALWAYS AS IDENTITY, payload integer)"
|
||||
)
|
||||
primary_cur.execute("INSERT INTO t(payload) SELECT generate_series(1, 100)")
|
||||
primary_cur.execute(
|
||||
"""
|
||||
SELECT pg_current_wal_insert_lsn(),
|
||||
pg_current_wal_lsn(),
|
||||
pg_current_wal_flush_lsn()
|
||||
"""
|
||||
)
|
||||
log.info(f"Primary: Current LSN after workload is {primary_cur.fetchone()}")
|
||||
|
||||
wait_replica_caughtup(primary, secondary)
|
||||
|
||||
with secondary.connect() as secondary_conn:
|
||||
secondary_cur = secondary_conn.cursor()
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
|
||||
assert secondary_cur.fetchone() == (100,)
|
||||
|
||||
with raises(psycopg2.Error):
|
||||
secondary_cur.execute("INSERT INTO t (payload) SELECT generate_series(101, 200)")
|
||||
secondary_conn.commit()
|
||||
|
||||
secondary_conn.rollback()
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (100,)
|
||||
|
||||
primary.stop_and_destroy(mode="immediate")
|
||||
|
||||
# Reconnect to the secondary to make sure we get a read-write connection
|
||||
with secondary.connect() as promo_conn:
|
||||
promo_cur = promo_conn.cursor()
|
||||
|
||||
promo_cur.execute("SELECT * FROM pg_promote()")
|
||||
assert promo_cur.fetchone() == (True,)
|
||||
promo_cur.execute(
|
||||
"""
|
||||
SELECT pg_current_wal_insert_lsn(),
|
||||
pg_current_wal_lsn(),
|
||||
pg_current_wal_flush_lsn()
|
||||
"""
|
||||
)
|
||||
log.info(f"Secondary: LSN after promotion is {promo_cur.fetchone()}")
|
||||
|
||||
# Reconnect to the secondary to make sure we get a read-write connection
|
||||
with secondary.connect() as new_primary_conn:
|
||||
new_primary_cur = new_primary_conn.cursor()
|
||||
new_primary_cur.execute("select count(*) from t")
|
||||
assert new_primary_cur.fetchone() == (100,)
|
||||
|
||||
new_primary_cur.execute(
|
||||
"INSERT INTO t (payload) SELECT generate_series(101, 200) RETURNING payload"
|
||||
)
|
||||
assert new_primary_cur.fetchall() == [(it,) for it in range(101, 201)]
|
||||
|
||||
new_primary_cur = new_primary_conn.cursor()
|
||||
new_primary_cur.execute("select payload from t")
|
||||
assert new_primary_cur.fetchall() == [(it,) for it in range(1, 201)]
|
||||
|
||||
new_primary_cur.execute("select count(*) from t")
|
||||
assert new_primary_cur.fetchone() == (200,)
|
||||
new_primary_cur.execute(
|
||||
"""
|
||||
SELECT pg_current_wal_insert_lsn(),
|
||||
pg_current_wal_lsn(),
|
||||
pg_current_wal_flush_lsn()
|
||||
"""
|
||||
)
|
||||
log.info(f"Secondary: LSN after workload is {new_primary_cur.fetchone()}")
|
||||
|
||||
with secondary.connect() as second_viewpoint_conn:
|
||||
new_primary_cur = second_viewpoint_conn.cursor()
|
||||
new_primary_cur.execute("select payload from t")
|
||||
assert new_primary_cur.fetchall() == [(it,) for it in range(1, 201)]
|
||||
|
||||
wait_for_last_flush_lsn(env, secondary, env.initial_tenant, env.initial_timeline)
|
||||
|
||||
secondary.stop_and_destroy()
|
||||
|
||||
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
|
||||
|
||||
with primary.connect() as new_primary:
|
||||
new_primary_cur = new_primary.cursor()
|
||||
new_primary_cur.execute(
|
||||
"""
|
||||
SELECT pg_current_wal_insert_lsn(),
|
||||
pg_current_wal_lsn(),
|
||||
pg_current_wal_flush_lsn()
|
||||
"""
|
||||
)
|
||||
log.info(f"New primary: Boot LSN is {new_primary_cur.fetchone()}")
|
||||
|
||||
new_primary_cur.execute("select count(*) from t")
|
||||
assert new_primary_cur.fetchone() == (200,)
|
||||
new_primary_cur.execute("INSERT INTO t (payload) SELECT generate_series(201, 300)")
|
||||
new_primary_cur.execute("select count(*) from t")
|
||||
assert new_primary_cur.fetchone() == (300,)
|
||||
|
||||
primary.stop(mode="immediate")
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: b6eece3f52...55c0d45abe
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 20f8491225...de7640f55d
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 77c63bfebf...0bf96bd6d7
2
vendor/postgres-v17
vendored
2
vendor/postgres-v17
vendored
Submodule vendor/postgres-v17 updated: 32d704d965...8be779fd3a
8
vendor/revisions.json
vendored
8
vendor/revisions.json
vendored
@@ -1,18 +1,18 @@
|
||||
{
|
||||
"v17": [
|
||||
"17.5",
|
||||
"32d704d965d8ad632c0ddef64b45a5ba95536442"
|
||||
"8be779fd3ab9e87206da96a7e4842ef1abf04f44"
|
||||
],
|
||||
"v16": [
|
||||
"16.9",
|
||||
"77c63bfebff5c833682cc2654e2191fec4d5b24e"
|
||||
"0bf96bd6d70301a0b43b0b3457bb3cf8fb43c198"
|
||||
],
|
||||
"v15": [
|
||||
"15.13",
|
||||
"20f8491225f86bdedbc986e9a69ebafb1c94aa99"
|
||||
"de7640f55da07512834d5cc40c4b3fb376b5f04f"
|
||||
],
|
||||
"v14": [
|
||||
"14.18",
|
||||
"b6eece3f528fdc380e6e2c13381434470606787f"
|
||||
"55c0d45abe6467c02084c2192bca117eda6ce1e7"
|
||||
]
|
||||
}
|
||||
|
||||
@@ -18,6 +18,8 @@ license.workspace = true
|
||||
ahash = { version = "0.8" }
|
||||
anstream = { version = "0.6" }
|
||||
anyhow = { version = "1", features = ["backtrace"] }
|
||||
axum = { version = "0.8", features = ["ws"] }
|
||||
axum-core = { version = "0.5", default-features = false, features = ["tracing"] }
|
||||
base64-594e8ee84c453af0 = { package = "base64", version = "0.13", features = ["alloc"] }
|
||||
base64-647d43efb71741da = { package = "base64", version = "0.21" }
|
||||
base64ct = { version = "1", default-features = false, features = ["std"] }
|
||||
@@ -52,7 +54,7 @@ hex = { version = "0.4", features = ["serde"] }
|
||||
hmac = { version = "0.12", default-features = false, features = ["reset"] }
|
||||
hyper-582f2526e08bb6a0 = { package = "hyper", version = "0.14", features = ["client", "http1", "http2", "runtime", "server", "stream"] }
|
||||
hyper-dff4ba8e3ae991db = { package = "hyper", version = "1", features = ["full"] }
|
||||
hyper-util = { version = "0.1", features = ["client-legacy", "http1", "http2", "server", "service"] }
|
||||
hyper-util = { version = "0.1", features = ["client-legacy", "server-auto", "service"] }
|
||||
indexmap = { version = "2", features = ["serde"] }
|
||||
itertools = { version = "0.12" }
|
||||
lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] }
|
||||
@@ -98,7 +100,7 @@ tikv-jemalloc-sys = { version = "0.6", features = ["profiling", "stats", "unpref
|
||||
time = { version = "0.3", features = ["macros", "serde-well-known"] }
|
||||
tokio = { version = "1", features = ["full", "test-util"] }
|
||||
tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring", "tls12"] }
|
||||
tokio-stream = { version = "0.1" }
|
||||
tokio-stream = { version = "0.1", features = ["net"] }
|
||||
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }
|
||||
toml_edit = { version = "0.22", features = ["serde"] }
|
||||
tower = { version = "0.5", default-features = false, features = ["balance", "buffer", "limit", "log"] }
|
||||
|
||||
Reference in New Issue
Block a user