From 23aca81943fe659b80a7db9a19410bf03b8edf5a Mon Sep 17 00:00:00 2001 From: Stas Kelvich Date: Fri, 21 Apr 2023 15:17:19 +0300 Subject: [PATCH 01/26] Add SNI-based proxy router In order not to create NodePorts for each compute, we can set up services that accept connections on wildcard domains and then use information from the domain name to route the connection to an internal service. There are ready-made solutions for HTTPS and TLS connections, but the PostgreSQL protocol uses opportunistic TLS and we haven't found any ready-made solution for it. This patch introduces `pg_sni_router`, which routes connections from `aaa--bbb--123.external.domain` to `aaa.bbb.123.internal.domain`. In the long run we can avoid console -> compute psql communications, but for now this router seems to be the easier way forward. --- proxy/src/bin/pg_sni_router.rs | 226 ++++++++++++++++++++++++++++ proxy/src/{main.rs => bin/proxy.rs} | 83 ++-------- proxy/src/lib.rs | 57 +++++++ proxy/src/proxy.rs | 29 ++-- 4 files changed, 315 insertions(+), 80 deletions(-) create mode 100644 proxy/src/bin/pg_sni_router.rs rename proxy/src/{main.rs => bin/proxy.rs} (79%) create mode 100644 proxy/src/lib.rs diff --git a/proxy/src/bin/pg_sni_router.rs b/proxy/src/bin/pg_sni_router.rs new file mode 100644 index 0000000000..adaf065e1b --- /dev/null +++ b/proxy/src/bin/pg_sni_router.rs @@ -0,0 +1,226 @@ +use std::{net::SocketAddr, sync::Arc}; +use tokio::{net::TcpListener, io::AsyncWriteExt}; + +use anyhow::{bail, ensure, Context}; +use clap::{self, Arg}; +use futures::TryFutureExt; +use proxy::{cancellation::CancelMap, auth::{AuthFlow, self}, compute::ConnCfg, console::messages::MetricsAuxInfo}; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_util::sync::CancellationToken; +use utils::{project_git_version, sentry_init::init_sentry}; + +use tracing::{error, info, warn}; + +project_git_version!(GIT_VERSION); + +fn cli() -> clap::Command { + clap::Command::new("Neon proxy/router") + .disable_help_flag(true) + .version(GIT_VERSION) + 
.arg( + Arg::new("listen") + .short('l') + .long("listen") + .help("listen for incoming client connections on ip:port") + .default_value("127.0.0.1:4432"), + ) + .arg( + Arg::new("tls-key") + .short('k') + .long("tls-key") + .help("path to TLS key for client postgres connections"), + ) + .arg( + Arg::new("tls-cert") + .short('c') + .long("tls-cert") + .help("path to TLS cert for client postgres connections"), + ) + .arg( + Arg::new("dest") + .short('d') + .long("destination") + .help("append this domain zone to the SNI hostname to get the destination address"), + ) +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let _logging_guard = proxy::logging::init().await?; + let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook(); + let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]); + + let args = cli().get_matches(); + + // Configure TLS + let tls_config: Arc = match ( + args.get_one::("tls-key"), + args.get_one::("tls-cert"), + ) { + (Some(key_path), Some(cert_path)) => { + let key = { + let key_bytes = std::fs::read(key_path).context("TLS key file")?; + let mut keys = rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..]) + .context(format!("Failed to read TLS keys at '{key_path}'"))?; + + ensure!(keys.len() == 1, "keys.len() = {} (should be 1)", keys.len()); + keys.pop().map(rustls::PrivateKey).unwrap() + }; + + let cert_chain_bytes = std::fs::read(cert_path) + .context(format!("Failed to read TLS cert file at '{cert_path}.'"))?; + + let cert_chain = { + rustls_pemfile::certs(&mut &cert_chain_bytes[..]) + .context(format!( + "Failed to read TLS certificate chain from bytes from file at '{cert_path}'." + ))? + .into_iter() + .map(rustls::Certificate) + .collect() + }; + + rustls::ServerConfig::builder() + .with_safe_default_cipher_suites() + .with_safe_default_kx_groups() + .with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])? + .with_no_client_auth() + .with_single_cert(cert_chain, key)? 
+ .into() + } + _ => bail!("tls-key and tls-cert must be specified"), + }; + + let destination: String = args.get_one::("dest").unwrap().parse()?; + + // Start listening for incoming client connections + let proxy_address: SocketAddr = args.get_one::("listen").unwrap().parse()?; + info!("Starting proxy on {proxy_address}"); + let proxy_listener = TcpListener::bind(proxy_address).await?; + + let cancellation_token = CancellationToken::new(); + let tasks = vec![ + tokio::spawn(proxy::handle_signals(cancellation_token.clone())), + tokio::spawn(task_main( + Arc::new(destination), + tls_config, + proxy_listener, + cancellation_token.clone(), + )), + ]; + + let _tasks = futures::future::try_join_all(tasks.into_iter().map(proxy::flatten_err)).await?; + + Ok(()) +} + +async fn task_main( + dest_suffix: Arc, + tls_config: Arc, + listener: tokio::net::TcpListener, + cancellation_token: CancellationToken, +) -> anyhow::Result<()> { + scopeguard::defer! { + info!("proxy has shut down"); + } + + // When set for the server socket, the keepalive setting + // will be inherited by all accepted client sockets. + socket2::SockRef::from(&listener).set_keepalive(true)?; + + let mut connections = tokio::task::JoinSet::new(); + let cancel_map = Arc::new(CancelMap::default()); + + loop { + tokio::select! { + accept_result = listener.accept() => { + let (socket, peer_addr) = accept_result?; + info!("accepted postgres client connection from {peer_addr}"); + + let session_id = uuid::Uuid::new_v4(); + let cancel_map = Arc::clone(&cancel_map); + let tls_config = Arc::clone(&tls_config); + let dest_suffix = Arc::clone(&dest_suffix); + + connections.spawn( + async move { + info!("spawned a task for {peer_addr}"); + + socket + .set_nodelay(true) + .context("failed to set socket option")?; + + handle_client(dest_suffix, tls_config, &cancel_map, session_id, socket).await + } + .unwrap_or_else(|e| { + // Acknowledge that the task has finished with an error. 
+ error!("per-client task finished with an error: {e:#}"); + }), + ); + } + _ = cancellation_token.cancelled() => { + drop(listener); + break; + } + } + } + // Drain connections + while let Some(res) = connections.join_next().await { + if let Err(e) = res { + if !e.is_panic() && !e.is_cancelled() { + warn!("unexpected error from joined connection task: {e:?}"); + } + } + } + Ok(()) +} + +#[tracing::instrument(fields(session_id = ?session_id), skip_all)] +async fn handle_client( + dest_suffix: Arc, + tls: Arc, + cancel_map: &CancelMap, + session_id: uuid::Uuid, + stream: impl AsyncRead + AsyncWrite + Unpin, +) -> anyhow::Result<()> { + let do_handshake = proxy::proxy::handshake(stream, Some(tls), cancel_map); + let (mut stream, params) = match do_handshake.await? { + Some(x) => x, + None => return Ok(()), // it's a cancellation request + }; + + let password = AuthFlow::new(&mut stream) + .begin(auth::CleartextPassword) + .await? + .authenticate() + .await?; + + let mut conn_cfg = ConnCfg::new(); + conn_cfg.set_startup_params(¶ms); + conn_cfg.password(password); + + // cut off first part of the sni domain + let sni = stream.get_ref().sni_hostname().unwrap(); + let dest = sni + .split_once('.').context("invalid sni")?.0 + .replace("--", "."); + + let destination = format!("{}.{}", dest, dest_suffix); + + info!("destination: {:?}", destination); + + conn_cfg.host(destination.as_str()); + + let mut conn = conn_cfg.connect() + .or_else(|e| stream.throw_error(e)) + .await?; + + cancel_map.with_session(|session| async { + proxy::proxy::prepare_client_connection(&conn, false, session, &mut stream).await?; + let (stream, read_buf) = stream.into_inner(); + conn.stream.write_all(&read_buf).await?; + let metrics_aux: MetricsAuxInfo = Default::default(); + proxy::proxy::proxy_pass(stream, conn.stream, &metrics_aux).await + }) + .await +} diff --git a/proxy/src/main.rs b/proxy/src/bin/proxy.rs similarity index 79% rename from proxy/src/main.rs rename to proxy/src/bin/proxy.rs 
index 1fd13c9f68..4c66845db6 100644 --- a/proxy/src/main.rs +++ b/proxy/src/bin/proxy.rs @@ -1,49 +1,22 @@ -//! Postgres protocol proxy/router. -//! -//! This service listens psql port and can check auth via external service -//! (control plane API in our case) and can create new databases and accounts -//! in somewhat transparent manner (again via communication with control plane API). +use proxy::auth; +use proxy::console; +use proxy::http; +use proxy::metrics; -mod auth; -mod cache; -mod cancellation; -mod compute; -mod config; -mod console; -mod error; -mod http; -mod logging; -mod metrics; -mod parse; -mod proxy; -mod sasl; -mod scram; -mod stream; -mod url; -mod waiters; - -use anyhow::{bail, Context}; +use anyhow::bail; use clap::{self, Arg}; -use config::ProxyConfig; -use futures::FutureExt; -use std::{borrow::Cow, future::Future, net::SocketAddr}; -use tokio::{net::TcpListener, task::JoinError}; +use proxy::config::{self, ProxyConfig}; +use std::{borrow::Cow, net::SocketAddr}; +use tokio::{net::TcpListener}; use tokio_util::sync::CancellationToken; -use tracing::{info, warn}; +use tracing::info; use utils::{project_git_version, sentry_init::init_sentry}; project_git_version!(GIT_VERSION); -/// Flattens `Result>` into `Result`. 
-async fn flatten_err( - f: impl Future, JoinError>>, -) -> anyhow::Result<()> { - f.map(|r| r.context("join error").and_then(|x| x)).await -} - #[tokio::main] async fn main() -> anyhow::Result<()> { - let _logging_guard = logging::init().await?; + let _logging_guard = proxy::logging::init().await?; let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook(); let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]); @@ -69,7 +42,7 @@ async fn main() -> anyhow::Result<()> { let proxy_listener = TcpListener::bind(proxy_address).await?; let cancellation_token = CancellationToken::new(); - let mut client_tasks = vec![tokio::spawn(proxy::task_main( + let mut client_tasks = vec![tokio::spawn(proxy::proxy::task_main( config, proxy_listener, cancellation_token.clone(), @@ -88,7 +61,7 @@ async fn main() -> anyhow::Result<()> { } let mut tasks = vec![ - tokio::spawn(handle_signals(cancellation_token)), + tokio::spawn(proxy::handle_signals(cancellation_token)), tokio::spawn(http::server::task_main(http_listener)), tokio::spawn(console::mgmt::task_main(mgmt_listener)), ]; @@ -97,8 +70,9 @@ async fn main() -> anyhow::Result<()> { tasks.push(tokio::spawn(metrics::task_main(metrics_config))); } - let tasks = futures::future::try_join_all(tasks.into_iter().map(flatten_err)); - let client_tasks = futures::future::try_join_all(client_tasks.into_iter().map(flatten_err)); + let tasks = futures::future::try_join_all(tasks.into_iter().map(proxy::flatten_err)); + let client_tasks = + futures::future::try_join_all(client_tasks.into_iter().map(proxy::flatten_err)); tokio::select! { // We are only expecting an error from these forever tasks res = tasks => { res?; }, @@ -107,33 +81,6 @@ async fn main() -> anyhow::Result<()> { Ok(()) } -/// Handle unix signals appropriately. 
-async fn handle_signals(token: CancellationToken) -> anyhow::Result<()> { - use tokio::signal::unix::{signal, SignalKind}; - - let mut hangup = signal(SignalKind::hangup())?; - let mut interrupt = signal(SignalKind::interrupt())?; - let mut terminate = signal(SignalKind::terminate())?; - - loop { - tokio::select! { - // Hangup is commonly used for config reload. - _ = hangup.recv() => { - warn!("received SIGHUP; config reload is not supported"); - } - // Shut down the whole application. - _ = interrupt.recv() => { - warn!("received SIGINT, exiting immediately"); - bail!("interrupted"); - } - _ = terminate.recv() => { - warn!("received SIGTERM, shutting down once all existing connections have closed"); - token.cancel(); - } - } - } -} - /// ProxyConfig is created at proxy startup, and lives forever. fn build_config(args: &clap::ArgMatches) -> anyhow::Result<&'static ProxyConfig> { let tls_config = match ( diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs new file mode 100644 index 0000000000..148ee67d90 --- /dev/null +++ b/proxy/src/lib.rs @@ -0,0 +1,57 @@ +use anyhow::{bail, Context}; +use futures::{Future, FutureExt}; +use tokio::task::JoinError; +use tokio_util::sync::CancellationToken; +use tracing::warn; + +pub mod auth; +pub mod cache; +pub mod cancellation; +pub mod compute; +pub mod config; +pub mod console; +pub mod error; +pub mod http; +pub mod logging; +pub mod metrics; +pub mod parse; +pub mod proxy; +pub mod sasl; +pub mod scram; +pub mod stream; +pub mod url; +pub mod waiters; + +/// Handle unix signals appropriately. +pub async fn handle_signals(token: CancellationToken) -> anyhow::Result<()> { + use tokio::signal::unix::{signal, SignalKind}; + + let mut hangup = signal(SignalKind::hangup())?; + let mut interrupt = signal(SignalKind::interrupt())?; + let mut terminate = signal(SignalKind::terminate())?; + + loop { + tokio::select! { + // Hangup is commonly used for config reload. 
+ _ = hangup.recv() => { + warn!("received SIGHUP; config reload is not supported"); + } + // Shut down the whole application. + _ = interrupt.recv() => { + warn!("received SIGINT, exiting immediately"); + bail!("interrupted"); + } + _ = terminate.recv() => { + warn!("received SIGTERM, shutting down once all existing connections have closed"); + token.cancel(); + } + } + } +} + +/// Flattens `Result>` into `Result`. +pub async fn flatten_err( + f: impl Future, JoinError>>, +) -> anyhow::Result<()> { + f.map(|r| r.context("join error").and_then(|x| x)).await +} diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index 1169d76160..e20c31e74c 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -5,7 +5,7 @@ use crate::{ auth::{self, backend::AuthSuccess}, cancellation::{self, CancelMap}, compute::{self, PostgresConnection}, - config::{ProxyConfig, TlsConfig}, + config::ProxyConfig, console::{self, messages::MetricsAuxInfo}, error::io_error, stream::{PqStream, Stream}, @@ -174,7 +174,7 @@ async fn handle_client( NUM_CONNECTIONS_CLOSED_COUNTER.inc(); } - let tls = config.tls_config.as_ref(); + let tls = config.tls_config.as_ref().map(|t| t.to_server_config()); let do_handshake = handshake(stream, tls, cancel_map); let (mut stream, params) = match do_handshake.await? { Some(x) => x, @@ -184,7 +184,10 @@ async fn handle_client( // Extract credentials which we're going to use for auth. let creds = { let sni = stream.get_ref().sni_hostname(); - let common_names = tls.and_then(|tls| tls.common_names.clone()); + let common_names = config + .tls_config + .as_ref() + .and_then(|tls| tls.common_names.clone()); let result = config .auth_backend .as_ref() @@ -205,13 +208,14 @@ async fn handle_client( /// It's easier to work with owned `stream` here as we need to upgrade it to TLS; /// we also take an extra care of propagating only the select handshake errors to client. 
#[tracing::instrument(skip_all)] -async fn handshake( +pub async fn handshake( stream: S, - mut tls: Option<&TlsConfig>, + tls: Option>, cancel_map: &CancelMap, ) -> anyhow::Result>, StartupMessageParams)>> { // Client may try upgrading to each protocol only once let (mut tried_ssl, mut tried_gss) = (false, false); + let mut tls_upgraded = false; let mut stream = PqStream::new(Stream::from_raw(stream)); loop { @@ -226,8 +230,9 @@ async fn handshake( // We can't perform TLS handshake without a config let enc = tls.is_some(); + stream.write_message(&Be::EncryptionResponse(enc)).await?; - if let Some(tls) = tls.take() { + if let Some(tls) = tls.clone() { // Upgrade raw stream into a secure TLS-backed stream. // NOTE: We've consumed `tls`; this fact will be used later. @@ -241,7 +246,8 @@ async fn handshake( if !read_buf.is_empty() { bail!("data is sent before server replied with EncryptionResponse"); } - stream = PqStream::new(raw.upgrade(tls.to_server_config()).await?); + stream = PqStream::new(raw.upgrade(tls).await?); + tls_upgraded = true; } } _ => bail!(ERR_PROTO_VIOLATION), @@ -256,9 +262,8 @@ async fn handshake( _ => bail!(ERR_PROTO_VIOLATION), }, StartupMessage { params, .. } => { - // Check that the config has been consumed during upgrade - // OR we didn't provide it at all (for dev purposes). - if tls.is_some() { + // Check that tls was actually upgraded + if !tls_upgraded { stream.throw_error_str(ERR_INSECURE_CONNECTION).await?; } @@ -340,7 +345,7 @@ async fn connect_to_compute( /// Finish client connection initialization: confirm auth success, send params, etc. #[tracing::instrument(skip_all)] -async fn prepare_client_connection( +pub async fn prepare_client_connection( node: &compute::PostgresConnection, reported_auth_ok: bool, session: cancellation::Session<'_>, @@ -378,7 +383,7 @@ async fn prepare_client_connection( /// Forward bytes in both directions (client <-> compute). 
#[tracing::instrument(skip_all)] -async fn proxy_pass( +pub async fn proxy_pass( client: impl AsyncRead + AsyncWrite + Unpin, compute: impl AsyncRead + AsyncWrite + Unpin, aux: &MetricsAuxInfo, From 556fb1642a5213d3e747163662c7867cd19edd45 Mon Sep 17 00:00:00 2001 From: Anton Chaporgin Date: Thu, 27 Apr 2023 13:25:45 +0300 Subject: [PATCH 02/26] fixed the way hostname is parsed --- proxy/src/bin/pg_sni_router.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/proxy/src/bin/pg_sni_router.rs b/proxy/src/bin/pg_sni_router.rs index adaf065e1b..3386d763e5 100644 --- a/proxy/src/bin/pg_sni_router.rs +++ b/proxy/src/bin/pg_sni_router.rs @@ -201,15 +201,15 @@ async fn handle_client( // cut off first part of the sni domain let sni = stream.get_ref().sni_hostname().unwrap(); - let dest = sni + let dest: Vec<&str> = sni .split_once('.').context("invalid sni")?.0 - .replace("--", "."); - - let destination = format!("{}.{}", dest, dest_suffix); + .splitn(3, "--").collect(); + let destination = format!("{}.{}.{}", dest[0], dest[1], dest_suffix); info!("destination: {:?}", destination); conn_cfg.host(destination.as_str()); + conn_cfg.port(6432); // TODO: it's a pooler and should be passed externally let mut conn = conn_cfg.connect() .or_else(|e| stream.throw_error(e)) From 81c75586ab2d7a2cc7cf6c303178cf62522e0132 Mon Sep 17 00:00:00 2001 From: Alexey Kondratov Date: Thu, 27 Apr 2023 12:45:54 +0200 Subject: [PATCH 03/26] Take port from SNI, formatting, make clippy happy --- proxy/src/bin/pg_sni_router.rs | 45 ++++++++++++++++++++++------------ proxy/src/bin/proxy.rs | 2 +- proxy/src/compute.rs | 6 +++++ proxy/src/proxy/tests.rs | 9 ++++++- 4 files changed, 44 insertions(+), 18 deletions(-) diff --git a/proxy/src/bin/pg_sni_router.rs b/proxy/src/bin/pg_sni_router.rs index 3386d763e5..84605d4941 100644 --- a/proxy/src/bin/pg_sni_router.rs +++ b/proxy/src/bin/pg_sni_router.rs @@ -1,10 +1,15 @@ use std::{net::SocketAddr, sync::Arc}; -use 
tokio::{net::TcpListener, io::AsyncWriteExt}; +use tokio::{io::AsyncWriteExt, net::TcpListener}; use anyhow::{bail, ensure, Context}; use clap::{self, Arg}; use futures::TryFutureExt; -use proxy::{cancellation::CancelMap, auth::{AuthFlow, self}, compute::ConnCfg, console::messages::MetricsAuxInfo}; +use proxy::{ + auth::{self, AuthFlow}, + cancellation::CancelMap, + compute::ConnCfg, + console::messages::MetricsAuxInfo, +}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_util::sync::CancellationToken; use utils::{project_git_version, sentry_init::init_sentry}; @@ -199,28 +204,36 @@ async fn handle_client( conn_cfg.set_startup_params(¶ms); conn_cfg.password(password); - // cut off first part of the sni domain + // Cut off first part of the SNI domain + // We receive required destination details in the format of + // `{k8s_service_name}--{k8s_namespace}--{port}.non-sni-domain` let sni = stream.get_ref().sni_hostname().unwrap(); let dest: Vec<&str> = sni - .split_once('.').context("invalid sni")?.0 - .splitn(3, "--").collect(); + .split_once('.') + .context("invalid SNI")? 
+ .0 + .splitn(3, "--") + .collect(); let destination = format!("{}.{}.{}", dest[0], dest[1], dest_suffix); + let port = dest[2].parse::().context("invalid port")?; - info!("destination: {:?}", destination); + info!("destination: {:?}:{}", destination, port); conn_cfg.host(destination.as_str()); - conn_cfg.port(6432); // TODO: it's a pooler and should be passed externally + conn_cfg.port(port); - let mut conn = conn_cfg.connect() + let mut conn = conn_cfg + .connect() .or_else(|e| stream.throw_error(e)) .await?; - cancel_map.with_session(|session| async { - proxy::proxy::prepare_client_connection(&conn, false, session, &mut stream).await?; - let (stream, read_buf) = stream.into_inner(); - conn.stream.write_all(&read_buf).await?; - let metrics_aux: MetricsAuxInfo = Default::default(); - proxy::proxy::proxy_pass(stream, conn.stream, &metrics_aux).await - }) - .await + cancel_map + .with_session(|session| async { + proxy::proxy::prepare_client_connection(&conn, false, session, &mut stream).await?; + let (stream, read_buf) = stream.into_inner(); + conn.stream.write_all(&read_buf).await?; + let metrics_aux: MetricsAuxInfo = Default::default(); + proxy::proxy::proxy_pass(stream, conn.stream, &metrics_aux).await + }) + .await } diff --git a/proxy/src/bin/proxy.rs b/proxy/src/bin/proxy.rs index 4c66845db6..9b5e8916fe 100644 --- a/proxy/src/bin/proxy.rs +++ b/proxy/src/bin/proxy.rs @@ -7,7 +7,7 @@ use anyhow::bail; use clap::{self, Arg}; use proxy::config::{self, ProxyConfig}; use std::{borrow::Cow, net::SocketAddr}; -use tokio::{net::TcpListener}; +use tokio::net::TcpListener; use tokio_util::sync::CancellationToken; use tracing::info; use utils::{project_git_version, sentry_init::init_sentry}; diff --git a/proxy/src/compute.rs b/proxy/src/compute.rs index 0465703ae6..f95c00de14 100644 --- a/proxy/src/compute.rs +++ b/proxy/src/compute.rs @@ -125,6 +125,12 @@ impl std::ops::DerefMut for ConnCfg { } } +impl Default for ConnCfg { + fn default() -> Self { + Self::new() + } +} 
+ impl ConnCfg { /// Establish a raw TCP connection to the compute node. async fn connect_raw(&self) -> io::Result<(SocketAddr, TcpStream)> { diff --git a/proxy/src/proxy/tests.rs b/proxy/src/proxy/tests.rs index 60acb588dc..3ff6a8b63f 100644 --- a/proxy/src/proxy/tests.rs +++ b/proxy/src/proxy/tests.rs @@ -1,6 +1,9 @@ ///! A group of high-level tests for connection establishing logic and auth. use super::*; + +use crate::config::TlsConfig; use crate::{auth, sasl, scram}; + use async_trait::async_trait; use rstest::rstest; use tokio_postgres::config::SslMode; @@ -133,7 +136,11 @@ async fn dummy_proxy( auth: impl TestAuth + Send, ) -> anyhow::Result<()> { let cancel_map = CancelMap::default(); - let (mut stream, _params) = handshake(client, tls.as_ref(), &cancel_map) + let server_config = match tls { + Some(tls) => Some(tls.config), + None => None, + }; + let (mut stream, _params) = handshake(client, server_config, &cancel_map) .await? .context("handshake failed")?; From b15204fa8c05d98385756faf9c1623020566295c Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Wed, 26 Apr 2023 15:18:19 +0300 Subject: [PATCH 04/26] Fix --help, and required args --- proxy/src/bin/pg_sni_router.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/proxy/src/bin/pg_sni_router.rs b/proxy/src/bin/pg_sni_router.rs index 84605d4941..b4fcbcefbd 100644 --- a/proxy/src/bin/pg_sni_router.rs +++ b/proxy/src/bin/pg_sni_router.rs @@ -1,3 +1,8 @@ +/// A stand-alone program that routes connections, e.g. from +/// `aaa--bbb--123.external.domain` to `aaa.bbb.123.internal.domain`. +/// +/// This allows connecting to pods/services running in the same Kubernetes cluster from +/// the outside. Similar to an ingress controller for HTTPS. 
use std::{net::SocketAddr, sync::Arc}; use tokio::{io::AsyncWriteExt, net::TcpListener}; @@ -20,7 +25,6 @@ project_git_version!(GIT_VERSION); fn cli() -> clap::Command { clap::Command::new("Neon proxy/router") - .disable_help_flag(true) .version(GIT_VERSION) .arg( Arg::new("listen") @@ -33,19 +37,22 @@ fn cli() -> clap::Command { Arg::new("tls-key") .short('k') .long("tls-key") - .help("path to TLS key for client postgres connections"), + .help("path to TLS key for client postgres connections") + .required(true), ) .arg( Arg::new("tls-cert") .short('c') .long("tls-cert") - .help("path to TLS cert for client postgres connections"), + .help("path to TLS cert for client postgres connections") + .required(true), ) .arg( Arg::new("dest") .short('d') .long("destination") - .help("append this domain zone to the SNI hostname to get the destination address"), + .help("append this domain zone to the SNI hostname to get the destination address") + .required(true), ) } From 3813c703c9efb232a56ab8637082bd03ff0122f0 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Wed, 26 Apr 2023 15:22:02 +0300 Subject: [PATCH 05/26] Add an option for destination port. Makes it easier to test locally. 
--- proxy/src/bin/pg_sni_router.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/proxy/src/bin/pg_sni_router.rs b/proxy/src/bin/pg_sni_router.rs index b4fcbcefbd..7e1f6ea940 100644 --- a/proxy/src/bin/pg_sni_router.rs +++ b/proxy/src/bin/pg_sni_router.rs @@ -63,6 +63,7 @@ async fn main() -> anyhow::Result<()> { let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]); let args = cli().get_matches(); + let destination: String = args.get_one::("dest").unwrap().parse()?; // Configure TLS let tls_config: Arc = match ( @@ -103,8 +104,6 @@ async fn main() -> anyhow::Result<()> { _ => bail!("tls-key and tls-cert must be specified"), }; - let destination: String = args.get_one::("dest").unwrap().parse()?; - // Start listening for incoming client connections let proxy_address: SocketAddr = args.get_one::("listen").unwrap().parse()?; info!("Starting proxy on {proxy_address}"); @@ -225,7 +224,6 @@ async fn handle_client( let port = dest[2].parse::().context("invalid port")?; info!("destination: {:?}:{}", destination, port); - conn_cfg.host(destination.as_str()); conn_cfg.port(port); From 53e5d18da51d3616af14df10ade736a4d750c2c0 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Wed, 26 Apr 2023 15:24:19 +0300 Subject: [PATCH 06/26] Start passthrough earlier As soon as we have received the SSLRequest packet, and have figured out the hostname to connect to from the SNI, we can start passing through data. We don't need to parse the StartupPacket that the client will send next. --- proxy/src/bin/pg_sni_router.rs | 99 ++++++++++++++++++---------------- 1 file changed, 53 insertions(+), 46 deletions(-) diff --git a/proxy/src/bin/pg_sni_router.rs b/proxy/src/bin/pg_sni_router.rs index 7e1f6ea940..17660e5b31 100644 --- a/proxy/src/bin/pg_sni_router.rs +++ b/proxy/src/bin/pg_sni_router.rs @@ -4,17 +4,17 @@ /// This allows connecting to pods/services running in the same Kubernetes cluster from /// the outside. 
Similar to an ingress controller for HTTPS. use std::{net::SocketAddr, sync::Arc}; -use tokio::{io::AsyncWriteExt, net::TcpListener}; -use anyhow::{bail, ensure, Context}; +use tokio::net::TcpListener; +// use tokio::net::TcpListener; + +use anyhow::{anyhow, bail, ensure, Context}; use clap::{self, Arg}; use futures::TryFutureExt; -use proxy::{ - auth::{self, AuthFlow}, - cancellation::CancelMap, - compute::ConnCfg, - console::messages::MetricsAuxInfo, -}; +use proxy::console::messages::MetricsAuxInfo; +// use proxy::console::messages::MetricsAuxInfo; +use proxy::stream::{PqStream, Stream}; + use tokio::io::{AsyncRead, AsyncWrite}; use tokio_util::sync::CancellationToken; use utils::{project_git_version, sentry_init::init_sentry}; @@ -140,7 +140,6 @@ async fn task_main( socket2::SockRef::from(&listener).set_keepalive(true)?; let mut connections = tokio::task::JoinSet::new(); - let cancel_map = Arc::new(CancelMap::default()); loop { tokio::select! { @@ -149,7 +148,6 @@ async fn task_main( info!("accepted postgres client connection from {peer_addr}"); let session_id = uuid::Uuid::new_v4(); - let cancel_map = Arc::clone(&cancel_map); let tls_config = Arc::clone(&tls_config); let dest_suffix = Arc::clone(&dest_suffix); @@ -161,7 +159,7 @@ async fn task_main( .set_nodelay(true) .context("failed to set socket option")?; - handle_client(dest_suffix, tls_config, &cancel_map, session_id, socket).await + handle_client(dest_suffix, tls_config, session_id, socket).await } .unwrap_or_else(|e| { // Acknowledge that the task has finished with an error. 
@@ -186,59 +184,68 @@ async fn task_main( Ok(()) } +const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)"; + +async fn ssl_handshake( + raw_stream: S, + tls_config: Arc, +) -> anyhow::Result> { + let mut stream = PqStream::new(Stream::from_raw(raw_stream)); + + let msg = stream.read_startup_packet().await?; + info!("received {msg:?}"); + use pq_proto::FeStartupPacket::*; + + match msg { + SslRequest => { + stream + .write_message(&pq_proto::BeMessage::EncryptionResponse(true)) + .await?; + // Upgrade raw stream into a secure TLS-backed stream. + // NOTE: We've consumed `tls`; this fact will be used later. + + let (raw, read_buf) = stream.into_inner(); + // TODO: Normally, client doesn't send any data before + // server says TLS handshake is ok and read_buf is empty. + // However, you could imagine pipelining of postgres + // SSLRequest + TLS ClientHello in one hunk similar to + // pipelining in our node js driver. We should probably + // support that by chaining read_buf with the stream. + if !read_buf.is_empty() { + bail!("data is sent before server replied with EncryptionResponse"); + } + Ok(raw.upgrade(tls_config).await?) + } + _ => stream.throw_error_str(ERR_INSECURE_CONNECTION).await?, + } +} + #[tracing::instrument(fields(session_id = ?session_id), skip_all)] async fn handle_client( dest_suffix: Arc, - tls: Arc, - cancel_map: &CancelMap, + tls_config: Arc, session_id: uuid::Uuid, stream: impl AsyncRead + AsyncWrite + Unpin, ) -> anyhow::Result<()> { - let do_handshake = proxy::proxy::handshake(stream, Some(tls), cancel_map); - let (mut stream, params) = match do_handshake.await? { - Some(x) => x, - None => return Ok(()), // it's a cancellation request - }; - - let password = AuthFlow::new(&mut stream) - .begin(auth::CleartextPassword) - .await?
- .authenticate() - .await?; - - let mut conn_cfg = ConnCfg::new(); - conn_cfg.set_startup_params(&params); - conn_cfg.password(password); + let tls_stream = ssl_handshake(stream, tls_config).await?; // Cut off first part of the SNI domain // We receive required destination details in the format of // `{k8s_service_name}--{k8s_namespace}--{port}.non-sni-domain` - let sni = stream.get_ref().sni_hostname().unwrap(); + let sni = tls_stream.sni_hostname().ok_or(anyhow!("SNI missing"))?; let dest: Vec<&str> = sni .split_once('.') .context("invalid SNI")? .0 .splitn(3, "--") .collect(); - let destination = format!("{}.{}.{}", dest[0], dest[1], dest_suffix); let port = dest[2].parse::().context("invalid port")?; + let destination = format!("{}.{}.{}:{}", dest[0], dest[1], dest_suffix, port); - info!("destination: {:?}:{}", destination, port); - conn_cfg.host(destination.as_str()); - conn_cfg.port(port); + info!("destination: {}", destination); - let mut conn = conn_cfg - .connect() - .or_else(|e| stream.throw_error(e)) - .await?; + let client = tokio::net::TcpStream::connect(destination).await?; - cancel_map - .with_session(|session| async { - proxy::proxy::prepare_client_connection(&conn, false, session, &mut stream).await?; - let (stream, read_buf) = stream.into_inner(); - conn.stream.write_all(&read_buf).await?; - let metrics_aux: MetricsAuxInfo = Default::default(); - proxy::proxy::proxy_pass(stream, conn.stream, &metrics_aux).await - }) - .await + let metrics_aux: MetricsAuxInfo = Default::default(); + proxy::proxy::proxy_pass(tls_stream, client, &metrics_aux).await } From e947cc119b9414eaa93f59e4622ca42461975e09 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Wed, 26 Apr 2023 15:28:48 +0300 Subject: [PATCH 07/26] Add a small test case for pg_sni_router --- test_runner/regress/test_sni_router.py | 139 +++++++++++++++++++++++++ 1 file changed, 139 insertions(+) create mode 100644 test_runner/regress/test_sni_router.py diff --git
a/test_runner/regress/test_sni_router.py b/test_runner/regress/test_sni_router.py new file mode 100644 index 0000000000..334b587c38 --- /dev/null +++ b/test_runner/regress/test_sni_router.py @@ -0,0 +1,139 @@ +import socket +import subprocess +from pathlib import Path +from types import TracebackType +from typing import Any, Dict, Iterator, List, Optional, Tuple, Type, Union, cast + +import backoff # type: ignore +import psycopg2 +import pytest +from fixtures.log_helper import log +from fixtures.neon_fixtures import PgProtocol, PortDistributor, VanillaPostgres + + +def generate_tls_cert(cn, certout, keyout): + subprocess.run( + [ + "openssl", + "req", + "-new", + "-x509", + "-days", + "365", + "-nodes", + "-out", + certout, + "-keyout", + keyout, + "-subj", + f"/CN={cn}", + ] + ) + + +class PgSniRouter(PgProtocol): + def __init__( + self, + neon_binpath: Path, + port: int, + destination: str, + destination_port: int, + tls_cert: Path, + tls_key: Path, + ): + # Must use a hostname rather than IP here, for SNI to work + host = "localhost" + super().__init__(host=host, port=port) + + self.host = host + self.neon_binpath = neon_binpath + self.port = port + self.destination = destination + self.destination_port = destination_port + self.tls_cert = tls_cert + self.tls_key = tls_key + self._popen: Optional[subprocess.Popen[bytes]] = None + + def start(self) -> "PgSniRouter": + assert self._popen is None + args = [ + str(self.neon_binpath / "pg_sni_router"), + *["--listen", f"127.0.0.1:{self.port}"], + *["--tls-cert", self.tls_cert], + *["--tls-key", self.tls_key], + *["--destination", self.destination], + *["--destination-port", str(self.destination_port)], + ] + + self._popen = subprocess.Popen(args) + self._wait_until_ready() + return self + + @backoff.on_exception(backoff.expo, OSError, max_time=10) + def _wait_until_ready(self): + socket.create_connection((self.host, self.port)) + + # Sends SIGTERM to the proxy if it has been started + def terminate(self): + if 
self._popen: + self._popen.terminate() + + # Waits for proxy to exit if it has been opened with a default timeout of + # two seconds. Raises subprocess.TimeoutExpired if the proxy does not exit in time. + def wait_for_exit(self, timeout=2): + if self._popen: + self._popen.wait(timeout=2) + + def __enter__(self) -> "PgSniRouter": + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc: Optional[BaseException], + tb: Optional[TracebackType], + ): + if self._popen is not None: + self._popen.terminate() + try: + self._popen.wait(timeout=5) + except subprocess.TimeoutExpired: + log.warn("failed to gracefully terminate pg_sni_router; killing") + self._popen.kill() + + +def test_pg_sni_router( + vanilla_pg: VanillaPostgres, + port_distributor: PortDistributor, + neon_binpath: Path, + test_output_dir: Path, +): + + generate_tls_cert( + "external.test", test_output_dir / "router.crt", test_output_dir / "router.key" + ) + + # Start a stand-alone Postgres to test with + vanilla_pg.start() + pg_port = vanilla_pg.default_options["port"] + + router_port = port_distributor.get_port() + + with PgSniRouter( + neon_binpath=neon_binpath, + port=router_port, + destination="localhost", + destination_port=pg_port, + tls_cert=test_output_dir / "router.crt", + tls_key=test_output_dir / "router.key", + ) as router: + router.start() + + out = router.safe_psql( + "select 1", + dbname="postgres", + sslmode="require", + host="localhost.external.test", + hostaddr="127.0.0.1", + ) + assert out[0][0] == 1 From 645e4f6ab94ed4eceb9a8621e19471ed27313fb8 Mon Sep 17 00:00:00 2001 From: Stas Kelvich Date: Fri, 28 Apr 2023 02:13:35 +0300 Subject: [PATCH 08/26] use TLS in link proxy --- Cargo.lock | 114 ++++++++++++++++++++++++++++- Cargo.toml | 12 +-- proxy/Cargo.toml | 2 + proxy/src/auth/backend/link.rs | 4 +- proxy/src/bin/pg_sni_router.rs | 8 +- proxy/src/compute.rs | 34 +++++++-- proxy/src/console/provider/mock.rs | 4 +- proxy/src/console/provider/neon.rs | 3 +- 8 
files changed, 156 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5f3a83ce2d..b3705303e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1574,6 +1574,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.1.0" @@ -2361,6 +2376,24 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" +[[package]] +name = "native-tls" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nix" version = "0.26.2" @@ -2483,12 +2516,50 @@ version = "11.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" +[[package]] +name = "openssl" +version = "0.10.52" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01b8574602df80f7b85fdfc5392fa884a4e3b3f4f35402c070ab34c3d3f78d56" +dependencies = [ + "bitflags", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] 
+name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.15", +] + [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-sys" +version = "0.9.87" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e17f59264b2809d77ae94f0e1ebabc434773f370d6ca667bd223ea10e06cc7e" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "opentelemetry" version = "0.18.0" @@ -2816,6 +2887,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkg-config" +version = "0.3.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160" + [[package]] name = "plotters" version = "0.3.4" @@ -2847,7 +2924,7 @@ dependencies = [ [[package]] name = "postgres" version = "0.19.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=43e6db254a97fdecbce33d8bc0890accfd74495e#43e6db254a97fdecbce33d8bc0890accfd74495e" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=27fc5729cc71a042e48eb980c8e736762138efe6#27fc5729cc71a042e48eb980c8e736762138efe6" dependencies = [ "bytes", "fallible-iterator", @@ -2857,10 +2934,21 @@ dependencies = [ "tokio-postgres", ] +[[package]] +name = "postgres-native-tls" +version = "0.5.0" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=27fc5729cc71a042e48eb980c8e736762138efe6#27fc5729cc71a042e48eb980c8e736762138efe6" +dependencies = [ + "native-tls", + "tokio", + "tokio-native-tls", + 
"tokio-postgres", +] + [[package]] name = "postgres-protocol" version = "0.6.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=43e6db254a97fdecbce33d8bc0890accfd74495e#43e6db254a97fdecbce33d8bc0890accfd74495e" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=27fc5729cc71a042e48eb980c8e736762138efe6#27fc5729cc71a042e48eb980c8e736762138efe6" dependencies = [ "base64 0.20.0", "byteorder", @@ -2878,7 +2966,7 @@ dependencies = [ [[package]] name = "postgres-types" version = "0.2.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=43e6db254a97fdecbce33d8bc0890accfd74495e#43e6db254a97fdecbce33d8bc0890accfd74495e" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=27fc5729cc71a042e48eb980c8e736762138efe6#27fc5729cc71a042e48eb980c8e736762138efe6" dependencies = [ "bytes", "fallible-iterator", @@ -3109,10 +3197,12 @@ dependencies = [ "itertools", "md5", "metrics", + "native-tls", "once_cell", "opentelemetry", "parking_lot", "pin-project-lite", + "postgres-native-tls", "postgres_backend", "pq_proto", "prometheus", @@ -4319,10 +4409,20 @@ dependencies = [ "syn 2.0.15", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-postgres" version = "0.7.7" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=43e6db254a97fdecbce33d8bc0890accfd74495e#43e6db254a97fdecbce33d8bc0890accfd74495e" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=27fc5729cc71a042e48eb980c8e736762138efe6#27fc5729cc71a042e48eb980c8e736762138efe6" dependencies = [ "async-trait", "byteorder", @@ -4914,6 +5014,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" 
+[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.4" diff --git a/Cargo.toml b/Cargo.toml index f4872433cd..bdfa07f53e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,6 +62,7 @@ jsonwebtoken = "8" libc = "0.2" md5 = "0.7.0" memoffset = "0.8" +native-tls = "0.2" nix = "0.26" notify = "5.0.0" num_cpus = "1.15" @@ -124,10 +125,11 @@ env_logger = "0.10" log = "0.4" ## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed -postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" } -postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" } -postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" } -tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" } +postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="27fc5729cc71a042e48eb980c8e736762138efe6" } +postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="27fc5729cc71a042e48eb980c8e736762138efe6" } +postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="27fc5729cc71a042e48eb980c8e736762138efe6" } +postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="27fc5729cc71a042e48eb980c8e736762138efe6" } +tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="27fc5729cc71a042e48eb980c8e736762138efe6" } tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df61437de0feef49ba2ccdbdd94eb8ad6e142" } ## Other git libraries @@ -162,7 +164,7 @@ tonic-build = "0.9" # This is only needed for 
proxy's tests. # TODO: we should probably fork `tokio-postgres-rustls` instead. [patch.crates-io] -tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" } +tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="27fc5729cc71a042e48eb980c8e736762138efe6" } ################# Binary contents sections diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 9d702b29c3..e7a4fd236e 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -62,6 +62,8 @@ utils.workspace = true uuid.workspace = true webpki-roots.workspace = true x509-parser.workspace = true +native-tls.workspace = true +postgres-native-tls.workspace = true workspace_hack.workspace = true tokio-util.workspace = true diff --git a/proxy/src/auth/backend/link.rs b/proxy/src/auth/backend/link.rs index 7175a23dc1..ee6b781fae 100644 --- a/proxy/src/auth/backend/link.rs +++ b/proxy/src/auth/backend/link.rs @@ -9,6 +9,7 @@ use crate::{ use pq_proto::BeMessage as Be; use thiserror::Error; use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_postgres::config::SslMode; use tracing::{info, info_span}; #[derive(Debug, Error)] @@ -85,7 +86,8 @@ pub(super) async fn authenticate( .host(&db_info.host) .port(db_info.port) .dbname(&db_info.dbname) - .user(&db_info.user); + .user(&db_info.user) + .ssl_mode(SslMode::Require); // we need TLS connection with SNI to properly route it if let Some(password) = db_info.password { config.password(password.as_ref()); diff --git a/proxy/src/bin/pg_sni_router.rs b/proxy/src/bin/pg_sni_router.rs index 17660e5b31..3b8852ecc8 100644 --- a/proxy/src/bin/pg_sni_router.rs +++ b/proxy/src/bin/pg_sni_router.rs @@ -1,18 +1,16 @@ /// A stand-alone program that routes connections, e.g. from -/// `aaa--bbb--123.external.domain` to `aaa.bbb.123.internal.domain`. +/// `aaa--bbb--1234.external.domain` to `aaa.bbb.internal.domain:1234`. 
/// /// This allows connecting to pods/services running in the same Kubernetes cluster from /// the outside. Similar to an ingress controller for HTTPS. use std::{net::SocketAddr, sync::Arc}; use tokio::net::TcpListener; -// use tokio::net::TcpListener; use anyhow::{anyhow, bail, ensure, Context}; use clap::{self, Arg}; use futures::TryFutureExt; use proxy::console::messages::MetricsAuxInfo; -// use proxy::console::messages::MetricsAuxInfo; use proxy::stream::{PqStream, Stream}; use tokio::io::{AsyncRead, AsyncWrite}; @@ -106,7 +104,7 @@ async fn main() -> anyhow::Result<()> { // Start listening for incoming client connections let proxy_address: SocketAddr = args.get_one::("listen").unwrap().parse()?; - info!("Starting proxy on {proxy_address}"); + info!("Starting sni router on {proxy_address}"); let proxy_listener = TcpListener::bind(proxy_address).await?; let cancellation_token = CancellationToken::new(); @@ -240,7 +238,7 @@ async fn handle_client( .splitn(3, "--") .collect(); let port = dest[2].parse::().context("invalid port")?; - let destination = format!("{}.{}.{}:{}", dest[0], dest[1], dest_suffix, port); + let destination = format!("{}.{}.{}:{}", dest[0], dest[1], dest_suffix, port); info!("destination: {}", destination); diff --git a/proxy/src/compute.rs b/proxy/src/compute.rs index f95c00de14..07dc92c2ec 100644 --- a/proxy/src/compute.rs +++ b/proxy/src/compute.rs @@ -5,7 +5,7 @@ use pq_proto::StartupMessageParams; use std::{io, net::SocketAddr, time::Duration}; use thiserror::Error; use tokio::net::TcpStream; -use tokio_postgres::NoTls; +use tokio_postgres::tls::MakeTlsConnect; use tracing::{error, info, warn}; const COULD_NOT_CONNECT: &str = "Couldn't connect to compute node"; @@ -19,6 +19,9 @@ pub enum ConnectionError { #[error("{COULD_NOT_CONNECT}: {0}")] CouldNotConnect(#[from] io::Error), + + #[error("{COULD_NOT_CONNECT}: {0}")] + TlsError(#[from] native_tls::Error), } impl UserFacingError for ConnectionError { @@ -133,7 +136,7 @@ impl Default for 
ConnCfg { impl ConnCfg { /// Establish a raw TCP connection to the compute node. - async fn connect_raw(&self) -> io::Result<(SocketAddr, TcpStream)> { + async fn connect_raw(&self) -> io::Result<(SocketAddr, TcpStream, &str)> { use tokio_postgres::config::Host; // wrap TcpStream::connect with timeout @@ -186,7 +189,7 @@ impl ConnCfg { }; match connect_once(host, *port).await { - Ok(socket) => return Ok(socket), + Ok((sockaddr, stream)) => return Ok((sockaddr, stream, host)), Err(err) => { // We can't throw an error here, as there might be more hosts to try. warn!("couldn't connect to compute node at {host}:{port}: {err}"); @@ -206,7 +209,10 @@ impl ConnCfg { pub struct PostgresConnection { /// Socket connected to a compute node. - pub stream: TcpStream, + pub stream: tokio_postgres::maybe_tls_stream::MaybeTlsStream< + tokio::net::TcpStream, + postgres_native_tls::TlsStream, + >, /// PostgreSQL connection parameters. pub params: std::collections::HashMap, /// Query cancellation token. @@ -215,10 +221,22 @@ pub struct PostgresConnection { impl ConnCfg { async fn do_connect(&self) -> Result { - // TODO: establish a secure connection to the DB. 
- let (socket_addr, mut stream) = self.connect_raw().await?; - let (client, connection) = self.0.connect_raw(&mut stream, NoTls).await?; - info!("connected to compute node at {socket_addr}"); + let (socket_addr, stream, host) = self.connect_raw().await?; + + let tls_connector = native_tls::TlsConnector::builder() + .build() + .unwrap(); + let mut mk_tls = postgres_native_tls::MakeTlsConnector::new(tls_connector); + let tls = MakeTlsConnect::::make_tls_connect(&mut mk_tls, host)?; + + // connect_raw() will not use TLS if sslmode is "disable" + let (client, connection) = self.0.connect_raw(stream, tls).await?; + let stream = connection.stream.into_inner(); + + info!( + "connected to compute node at {host} ({socket_addr}) sslmode={:?}", + self.0.get_ssl_mode() + ); // This is very ugly but as of now there's no better way to // extract the connection parameters from tokio-postgres' connection. diff --git a/proxy/src/console/provider/mock.rs b/proxy/src/console/provider/mock.rs index eaac9c06d9..769b5cedcd 100644 --- a/proxy/src/console/provider/mock.rs +++ b/proxy/src/console/provider/mock.rs @@ -8,6 +8,7 @@ use crate::{auth::ClientCredentials, compute, error::io_error, scram, url::ApiUr use async_trait::async_trait; use futures::TryFutureExt; use thiserror::Error; +use tokio_postgres::config::SslMode; use tracing::{error, info, info_span, warn, Instrument}; #[derive(Debug, Error)] @@ -86,7 +87,8 @@ impl Api { let mut config = compute::ConnCfg::new(); config .host(self.endpoint.host_str().unwrap_or("localhost")) - .port(self.endpoint.port().unwrap_or(5432)); + .port(self.endpoint.port().unwrap_or(5432)) + .ssl_mode(SslMode::Disable); let node = NodeInfo { config, diff --git a/proxy/src/console/provider/neon.rs b/proxy/src/console/provider/neon.rs index 3644db17f7..2ae3bc5157 100644 --- a/proxy/src/console/provider/neon.rs +++ b/proxy/src/console/provider/neon.rs @@ -8,6 +8,7 @@ use super::{ use crate::{auth::ClientCredentials, compute, http, scram}; use 
async_trait::async_trait; use futures::TryFutureExt; +use tokio_postgres::config::SslMode; use tracing::{error, info, info_span, warn, Instrument}; #[derive(Clone)] @@ -100,7 +101,7 @@ impl Api { // We'll set username and such later using the startup message. // TODO: add more type safety (in progress). let mut config = compute::ConnCfg::new(); - config.host(host).port(port); + config.host(host).port(port).ssl_mode(SslMode::Disable); // TLS is not configured on compute nodes. let node = NodeInfo { config, From 040f736909479f65faa33e4f8bf14515871ea7a9 Mon Sep 17 00:00:00 2001 From: Stas Kelvich Date: Fri, 28 Apr 2023 02:45:23 +0300 Subject: [PATCH 09/26] remove changes in main proxy that are now not needed --- proxy/src/proxy.rs | 27 +++++++++++---------------- proxy/src/proxy/tests.rs | 9 +-------- 2 files changed, 12 insertions(+), 24 deletions(-) diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index e20c31e74c..ebc45ea1ce 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -5,7 +5,7 @@ use crate::{ auth::{self, backend::AuthSuccess}, cancellation::{self, CancelMap}, compute::{self, PostgresConnection}, - config::ProxyConfig, + config::{ProxyConfig, TlsConfig}, console::{self, messages::MetricsAuxInfo}, error::io_error, stream::{PqStream, Stream}, @@ -174,7 +174,7 @@ async fn handle_client( NUM_CONNECTIONS_CLOSED_COUNTER.inc(); } - let tls = config.tls_config.as_ref().map(|t| t.to_server_config()); + let tls = config.tls_config.as_ref(); let do_handshake = handshake(stream, tls, cancel_map); let (mut stream, params) = match do_handshake.await? { Some(x) => x, @@ -184,10 +184,7 @@ async fn handle_client( // Extract credentials which we're going to use for auth. 
let creds = { let sni = stream.get_ref().sni_hostname(); - let common_names = config - .tls_config - .as_ref() - .and_then(|tls| tls.common_names.clone()); + let common_names = tls.and_then(|tls| tls.common_names.clone()); let result = config .auth_backend .as_ref() @@ -208,14 +205,13 @@ async fn handle_client( /// It's easier to work with owned `stream` here as we need to upgrade it to TLS; /// we also take an extra care of propagating only the select handshake errors to client. #[tracing::instrument(skip_all)] -pub async fn handshake( +async fn handshake( stream: S, - tls: Option>, + mut tls: Option<&TlsConfig>, cancel_map: &CancelMap, ) -> anyhow::Result>, StartupMessageParams)>> { // Client may try upgrading to each protocol only once let (mut tried_ssl, mut tried_gss) = (false, false); - let mut tls_upgraded = false; let mut stream = PqStream::new(Stream::from_raw(stream)); loop { @@ -230,9 +226,8 @@ pub async fn handshake( // We can't perform TLS handshake without a config let enc = tls.is_some(); - stream.write_message(&Be::EncryptionResponse(enc)).await?; - if let Some(tls) = tls.clone() { + if let Some(tls) = tls.take() { // Upgrade raw stream into a secure TLS-backed stream. // NOTE: We've consumed `tls`; this fact will be used later. @@ -246,8 +241,7 @@ pub async fn handshake( if !read_buf.is_empty() { bail!("data is sent before server replied with EncryptionResponse"); } - stream = PqStream::new(raw.upgrade(tls).await?); - tls_upgraded = true; + stream = PqStream::new(raw.upgrade(tls.to_server_config()).await?); } } _ => bail!(ERR_PROTO_VIOLATION), @@ -262,8 +256,9 @@ pub async fn handshake( _ => bail!(ERR_PROTO_VIOLATION), }, StartupMessage { params, .. } => { - // Check that tls was actually upgraded - if !tls_upgraded { + // Check that the config has been consumed during upgrade + // OR we didn't provide it at all (for dev purposes). 
+ if tls.is_some() { stream.throw_error_str(ERR_INSECURE_CONNECTION).await?; } @@ -345,7 +340,7 @@ async fn connect_to_compute( /// Finish client connection initialization: confirm auth success, send params, etc. #[tracing::instrument(skip_all)] -pub async fn prepare_client_connection( +async fn prepare_client_connection( node: &compute::PostgresConnection, reported_auth_ok: bool, session: cancellation::Session<'_>, diff --git a/proxy/src/proxy/tests.rs b/proxy/src/proxy/tests.rs index 3ff6a8b63f..60acb588dc 100644 --- a/proxy/src/proxy/tests.rs +++ b/proxy/src/proxy/tests.rs @@ -1,9 +1,6 @@ ///! A group of high-level tests for connection establishing logic and auth. use super::*; - -use crate::config::TlsConfig; use crate::{auth, sasl, scram}; - use async_trait::async_trait; use rstest::rstest; use tokio_postgres::config::SslMode; @@ -136,11 +133,7 @@ async fn dummy_proxy( auth: impl TestAuth + Send, ) -> anyhow::Result<()> { let cancel_map = CancelMap::default(); - let server_config = match tls { - Some(tls) => Some(tls.config), - None => None, - }; - let (mut stream, _params) = handshake(client, server_config, &cancel_map) + let (mut stream, _params) = handshake(client, tls.as_ref(), &cancel_map) .await? 
.context("handshake failed")?; From 9486d76b2a5ba645dd6ba2b8e2ba8b78e95f6ec2 Mon Sep 17 00:00:00 2001 From: Stas Kelvich Date: Fri, 28 Apr 2023 13:06:54 +0300 Subject: [PATCH 10/26] Add tests for link auth to compute connection --- proxy/src/auth/backend/link.rs | 1 + proxy/src/bin/proxy.rs | 16 ++++++++++ proxy/src/compute.rs | 13 ++++++-- proxy/src/config.rs | 1 + proxy/src/console/provider.rs | 3 ++ proxy/src/console/provider/mock.rs | 1 + proxy/src/console/provider/neon.rs | 1 + proxy/src/proxy.rs | 23 +++++++++++-- test_runner/fixtures/neon_fixtures.py | 32 +++++++++++++++++-- test_runner/regress/test_metric_collection.py | 4 ++- test_runner/regress/test_sni_router.py | 10 ++---- 11 files changed, 88 insertions(+), 17 deletions(-) diff --git a/proxy/src/auth/backend/link.rs b/proxy/src/auth/backend/link.rs index ee6b781fae..e588f67693 100644 --- a/proxy/src/auth/backend/link.rs +++ b/proxy/src/auth/backend/link.rs @@ -98,6 +98,7 @@ pub(super) async fn authenticate( value: NodeInfo { config, aux: db_info.aux.into(), + allow_self_signed_compute: false, // caller may override }, }) } diff --git a/proxy/src/bin/proxy.rs b/proxy/src/bin/proxy.rs index 9b5e8916fe..28e6e25317 100644 --- a/proxy/src/bin/proxy.rs +++ b/proxy/src/bin/proxy.rs @@ -10,6 +10,7 @@ use std::{borrow::Cow, net::SocketAddr}; use tokio::net::TcpListener; use tokio_util::sync::CancellationToken; use tracing::info; +use tracing::warn; use utils::{project_git_version, sentry_init::init_sentry}; project_git_version!(GIT_VERSION); @@ -96,6 +97,14 @@ fn build_config(args: &clap::ArgMatches) -> anyhow::Result<&'static ProxyConfig> _ => bail!("either both or neither tls-key and tls-cert must be specified"), }; + let allow_self_signed_compute: bool = args + .get_one::("allow-self-signed-compute") + .unwrap() + .parse()?; + if allow_self_signed_compute { + warn!("allowing self-signed compute certificates"); + } + let metric_collection = match ( args.get_one::("metric-collection-endpoint"), 
args.get_one::("metric-collection-interval"), @@ -145,6 +154,7 @@ fn build_config(args: &clap::ArgMatches) -> anyhow::Result<&'static ProxyConfig> tls_config, auth_backend, metric_collection, + allow_self_signed_compute, })); Ok(config) @@ -235,6 +245,12 @@ fn cli() -> clap::Command { .help("cache for `wake_compute` api method (use `size=0` to disable)") .default_value(config::CacheOptions::DEFAULT_OPTIONS_NODE_INFO), ) + .arg( + Arg::new("allow-self-signed-compute") + .long("allow-self-signed-compute") + .help("Allow self-signed certificates for compute nodes (for testing)") + .default_value("false"), + ) } #[cfg(test)] diff --git a/proxy/src/compute.rs b/proxy/src/compute.rs index 07dc92c2ec..d277940a12 100644 --- a/proxy/src/compute.rs +++ b/proxy/src/compute.rs @@ -220,10 +220,14 @@ pub struct PostgresConnection { } impl ConnCfg { - async fn do_connect(&self) -> Result { + async fn do_connect( + &self, + allow_self_signed_compute: bool, + ) -> Result { let (socket_addr, stream, host) = self.connect_raw().await?; let tls_connector = native_tls::TlsConnector::builder() + .danger_accept_invalid_certs(allow_self_signed_compute) .build() .unwrap(); let mut mk_tls = postgres_native_tls::MakeTlsConnector::new(tls_connector); @@ -257,8 +261,11 @@ impl ConnCfg { } /// Connect to a corresponding compute node. - pub async fn connect(&self) -> Result { - self.do_connect() + pub async fn connect( + &self, + allow_self_signed_compute: bool, + ) -> Result { + self.do_connect(allow_self_signed_compute) .inspect_err(|err| { // Immediately log the error we have at our disposal. 
error!("couldn't connect to compute node: {err}"); diff --git a/proxy/src/config.rs b/proxy/src/config.rs index 0ceb556ca1..530229b3fd 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -12,6 +12,7 @@ pub struct ProxyConfig { pub tls_config: Option, pub auth_backend: auth::BackendType<'static, ()>, pub metric_collection: Option, + pub allow_self_signed_compute: bool, } #[derive(Debug)] diff --git a/proxy/src/console/provider.rs b/proxy/src/console/provider.rs index 80cd94d483..44e23e0adf 100644 --- a/proxy/src/console/provider.rs +++ b/proxy/src/console/provider.rs @@ -170,6 +170,9 @@ pub struct NodeInfo { /// Labels for proxy's metrics. pub aux: Arc, + + /// Whether we should accept self-signed certificates (for testing) + pub allow_self_signed_compute: bool, } pub type NodeInfoCache = TimedLru, NodeInfo>; diff --git a/proxy/src/console/provider/mock.rs b/proxy/src/console/provider/mock.rs index 769b5cedcd..3b42c73a34 100644 --- a/proxy/src/console/provider/mock.rs +++ b/proxy/src/console/provider/mock.rs @@ -93,6 +93,7 @@ impl Api { let node = NodeInfo { config, aux: Default::default(), + allow_self_signed_compute: false, }; Ok(node) diff --git a/proxy/src/console/provider/neon.rs b/proxy/src/console/provider/neon.rs index 2ae3bc5157..a8e855b2c8 100644 --- a/proxy/src/console/provider/neon.rs +++ b/proxy/src/console/provider/neon.rs @@ -106,6 +106,7 @@ impl Api { let node = NodeInfo { config, aux: body.aux.into(), + allow_self_signed_compute: false, }; Ok(node) diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index ebc45ea1ce..f3d3524d30 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -155,7 +155,7 @@ pub async fn handle_ws_client( async { result }.or_else(|e| stream.throw_error(e)).await? 
}; - let client = Client::new(stream, creds, ¶ms, session_id); + let client = Client::new(stream, creds, ¶ms, session_id, false); cancel_map .with_session(|session| client.connect_to_db(session, true)) .await @@ -194,7 +194,15 @@ async fn handle_client( async { result }.or_else(|e| stream.throw_error(e)).await? }; - let client = Client::new(stream, creds, ¶ms, session_id); + let allow_self_signed_compute = config.allow_self_signed_compute; + + let client = Client::new( + stream, + creds, + ¶ms, + session_id, + allow_self_signed_compute, + ); cancel_map .with_session(|session| client.connect_to_db(session, false)) .await @@ -297,9 +305,11 @@ async fn connect_to_compute_once( NUM_CONNECTION_FAILURES.with_label_values(&[label]).inc(); }; + let allow_self_signed_compute = node_info.allow_self_signed_compute; + node_info .config - .connect() + .connect(allow_self_signed_compute) .inspect_err(invalidate_cache) .await } @@ -420,6 +430,8 @@ struct Client<'a, S> { params: &'a StartupMessageParams, /// Unique connection ID. session_id: uuid::Uuid, + /// Allow self-signed certificates (for testing). 
+ allow_self_signed_compute: bool, } impl<'a, S> Client<'a, S> { @@ -429,12 +441,14 @@ impl<'a, S> Client<'a, S> { creds: auth::BackendType<'a, auth::ClientCredentials<'a>>, params: &'a StartupMessageParams, session_id: uuid::Uuid, + allow_self_signed_compute: bool, ) -> Self { Self { stream, creds, params, session_id, + allow_self_signed_compute, } } } @@ -451,6 +465,7 @@ impl Client<'_, S> { mut creds, params, session_id, + allow_self_signed_compute, } = self; let extra = console::ConsoleReqExtra { @@ -473,6 +488,8 @@ impl Client<'_, S> { value: mut node_info, } = auth_result; + node_info.allow_self_signed_compute = allow_self_signed_compute; + let mut node = connect_to_compute(&mut node_info, params, &extra, &creds) .or_else(|e| stream.throw_error(e)) .await?; diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index c71a063cb4..a0914794b2 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1820,6 +1820,24 @@ class VanillaPostgres(PgProtocol): self.pg_bin.run_capture(["initdb", "-D", str(pgdatadir)]) self.configure([f"port = {port}\n"]) + def enable_tls(self): + assert not self.running + # generate self-signed certificate + subprocess.run( + ["openssl", "req", "-new", "-x509", "-days", "365", "-nodes", "-text", + "-out", self.pgdatadir / "server.crt", + "-keyout", self.pgdatadir / "server.key", + "-subj", "/CN=localhost"] + ) + # configure postgresql.conf + self.configure( + [ + "ssl = on", + "ssl_cert_file = 'server.crt'", + "ssl_key_file = 'server.key'", + ] + ) + def configure(self, options: List[str]): """Append lines into postgresql.conf file.""" assert not self.running @@ -1992,6 +2010,7 @@ class NeonProxy(PgProtocol): # Link auth backend params *["--auth-backend", "link"], *["--uri", NeonProxy.link_auth_uri], + *["--allow-self-signed-compute", "true"], ] @dataclass(frozen=True) @@ -2012,6 +2031,7 @@ class NeonProxy(PgProtocol): def __init__( self, neon_binpath: Path, + 
test_output_dir: Path, proxy_port: int, http_port: int, mgmt_port: int, @@ -2025,6 +2045,7 @@ class NeonProxy(PgProtocol): self.host = host self.http_port = http_port self.neon_binpath = neon_binpath + self.test_output_dir = test_output_dir self.proxy_port = proxy_port self.mgmt_port = mgmt_port self.auth_backend = auth_backend @@ -2051,7 +2072,8 @@ class NeonProxy(PgProtocol): *["--metric-collection-interval", self.metric_collection_interval], ] - self._popen = subprocess.Popen(args) + logfile = open(self.test_output_dir / "proxy.log", "w") + self._popen = subprocess.Popen(args, stdout=logfile, stderr=logfile) self._wait_until_ready() return self @@ -2119,6 +2141,7 @@ class NeonProxy(PgProtocol): if create_user: log.info("creating a new user for link auth test") + local_vanilla_pg.enable_tls() local_vanilla_pg.start() local_vanilla_pg.safe_psql(f"create user {pg_user} with login superuser") @@ -2152,7 +2175,7 @@ class NeonProxy(PgProtocol): @pytest.fixture(scope="function") -def link_proxy(port_distributor: PortDistributor, neon_binpath: Path) -> Iterator[NeonProxy]: +def link_proxy(port_distributor: PortDistributor, neon_binpath: Path, test_output_dir: Path) -> Iterator[NeonProxy]: """Neon proxy that routes through link auth.""" http_port = port_distributor.get_port() @@ -2161,6 +2184,7 @@ def link_proxy(port_distributor: PortDistributor, neon_binpath: Path) -> Iterato with NeonProxy( neon_binpath=neon_binpath, + test_output_dir=test_output_dir, proxy_port=proxy_port, http_port=http_port, mgmt_port=mgmt_port, @@ -2172,7 +2196,8 @@ def link_proxy(port_distributor: PortDistributor, neon_binpath: Path) -> Iterato @pytest.fixture(scope="function") def static_proxy( - vanilla_pg: VanillaPostgres, port_distributor: PortDistributor, neon_binpath: Path + vanilla_pg: VanillaPostgres, port_distributor: PortDistributor, neon_binpath: Path, + test_output_dir: Path ) -> Iterator[NeonProxy]: """Neon proxy that routes directly to vanilla postgres.""" @@ -2191,6 +2216,7 @@ def 
static_proxy( with NeonProxy( neon_binpath=neon_binpath, + test_output_dir=test_output_dir, proxy_port=proxy_port, http_port=http_port, mgmt_port=mgmt_port, diff --git a/test_runner/regress/test_metric_collection.py b/test_runner/regress/test_metric_collection.py index ecbce1f8f7..d596e32a8a 100644 --- a/test_runner/regress/test_metric_collection.py +++ b/test_runner/regress/test_metric_collection.py @@ -201,7 +201,8 @@ def proxy_metrics_handler(request: Request) -> Response: @pytest.fixture(scope="session") def proxy_with_metric_collector( - port_distributor: PortDistributor, neon_binpath: Path, httpserver_listen_address + port_distributor: PortDistributor, neon_binpath: Path, httpserver_listen_address, + test_output_dir: Path ) -> Iterator[NeonProxy]: """Neon proxy that routes through link auth and has metric collection enabled.""" @@ -215,6 +216,7 @@ def proxy_with_metric_collector( with NeonProxy( neon_binpath=neon_binpath, + test_output_dir=test_output_dir, proxy_port=proxy_port, http_port=http_port, mgmt_port=mgmt_port, diff --git a/test_runner/regress/test_sni_router.py b/test_runner/regress/test_sni_router.py index 334b587c38..59b64492c6 100644 --- a/test_runner/regress/test_sni_router.py +++ b/test_runner/regress/test_sni_router.py @@ -37,7 +37,6 @@ class PgSniRouter(PgProtocol): neon_binpath: Path, port: int, destination: str, - destination_port: int, tls_cert: Path, tls_key: Path, ): @@ -49,7 +48,6 @@ class PgSniRouter(PgProtocol): self.neon_binpath = neon_binpath self.port = port self.destination = destination - self.destination_port = destination_port self.tls_cert = tls_cert self.tls_key = tls_key self._popen: Optional[subprocess.Popen[bytes]] = None @@ -62,7 +60,6 @@ class PgSniRouter(PgProtocol): *["--tls-cert", self.tls_cert], *["--tls-key", self.tls_key], *["--destination", self.destination], - *["--destination-port", str(self.destination_port)], ] self._popen = subprocess.Popen(args) @@ -110,7 +107,7 @@ def test_pg_sni_router( ): 
generate_tls_cert( - "external.test", test_output_dir / "router.crt", test_output_dir / "router.key" + "endpoint.namespace.localtest.me", test_output_dir / "router.crt", test_output_dir / "router.key" ) # Start a stand-alone Postgres to test with @@ -122,8 +119,7 @@ def test_pg_sni_router( with PgSniRouter( neon_binpath=neon_binpath, port=router_port, - destination="localhost", - destination_port=pg_port, + destination="localtest.me", tls_cert=test_output_dir / "router.crt", tls_key=test_output_dir / "router.key", ) as router: @@ -133,7 +129,7 @@ def test_pg_sni_router( "select 1", dbname="postgres", sslmode="require", - host="localhost.external.test", + host=f"endpoint--namespace--{pg_port}.localtest.me", hostaddr="127.0.0.1", ) assert out[0][0] == 1 From 4ac6a9f0894eb90afdc8740a9fa6707656af47ba Mon Sep 17 00:00:00 2001 From: Stas Kelvich Date: Fri, 28 Apr 2023 13:11:02 +0300 Subject: [PATCH 11/26] add backward compatibility to proxy --- proxy/src/auth/backend/link.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/proxy/src/auth/backend/link.rs b/proxy/src/auth/backend/link.rs index e588f67693..da43cf11c4 100644 --- a/proxy/src/auth/backend/link.rs +++ b/proxy/src/auth/backend/link.rs @@ -86,8 +86,17 @@ pub(super) async fn authenticate( .host(&db_info.host) .port(db_info.port) .dbname(&db_info.dbname) - .user(&db_info.user) - .ssl_mode(SslMode::Require); // we need TLS connection with SNI to properly route it + .user(&db_info.user); + + // Backwards compatibility. pg_sni_proxy uses "--" in domain names + // while direct connections do not. Once we migrate to pg_sni_proxy + // everywhere, we can remove this. 
+ if db_info.host.contains("--") { + // we need TLS connection with SNI info to properly route it + config.ssl_mode(SslMode::Require); + } else { + config.ssl_mode(SslMode::Disable); + } if let Some(password) = db_info.password { config.password(password.as_ref()); From 0364f77b9a85d298c772946b4fdfa67cc936cadb Mon Sep 17 00:00:00 2001 From: Stas Kelvich Date: Fri, 28 Apr 2023 13:51:11 +0300 Subject: [PATCH 12/26] fix python styling --- test_runner/fixtures/neon_fixtures.py | 30 ++++++++++++++----- test_runner/regress/test_metric_collection.py | 6 ++-- test_runner/regress/test_sni_router.py | 13 ++++---- 3 files changed, 33 insertions(+), 16 deletions(-) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index a0914794b2..67838ca218 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1824,10 +1824,22 @@ class VanillaPostgres(PgProtocol): assert not self.running # generate self-signed certificate subprocess.run( - ["openssl", "req", "-new", "-x509", "-days", "365", "-nodes", "-text", - "-out", self.pgdatadir / "server.crt", - "-keyout", self.pgdatadir / "server.key", - "-subj", "/CN=localhost"] + [ + "openssl", + "req", + "-new", + "-x509", + "-days", + "365", + "-nodes", + "-text", + "-out", + self.pgdatadir / "server.crt", + "-keyout", + self.pgdatadir / "server.key", + "-subj", + "/CN=localhost", + ] ) # configure postgresql.conf self.configure( @@ -2175,7 +2187,9 @@ class NeonProxy(PgProtocol): @pytest.fixture(scope="function") -def link_proxy(port_distributor: PortDistributor, neon_binpath: Path, test_output_dir: Path) -> Iterator[NeonProxy]: +def link_proxy( + port_distributor: PortDistributor, neon_binpath: Path, test_output_dir: Path +) -> Iterator[NeonProxy]: """Neon proxy that routes through link auth.""" http_port = port_distributor.get_port() @@ -2196,8 +2210,10 @@ def link_proxy(port_distributor: PortDistributor, neon_binpath: Path, test_outpu 
@pytest.fixture(scope="function") def static_proxy( - vanilla_pg: VanillaPostgres, port_distributor: PortDistributor, neon_binpath: Path, - test_output_dir: Path + vanilla_pg: VanillaPostgres, + port_distributor: PortDistributor, + neon_binpath: Path, + test_output_dir: Path, ) -> Iterator[NeonProxy]: """Neon proxy that routes directly to vanilla postgres.""" diff --git a/test_runner/regress/test_metric_collection.py b/test_runner/regress/test_metric_collection.py index d596e32a8a..54cd86dc84 100644 --- a/test_runner/regress/test_metric_collection.py +++ b/test_runner/regress/test_metric_collection.py @@ -201,8 +201,10 @@ def proxy_metrics_handler(request: Request) -> Response: @pytest.fixture(scope="session") def proxy_with_metric_collector( - port_distributor: PortDistributor, neon_binpath: Path, httpserver_listen_address, - test_output_dir: Path + port_distributor: PortDistributor, + neon_binpath: Path, + httpserver_listen_address, + test_output_dir: Path, ) -> Iterator[NeonProxy]: """Neon proxy that routes through link auth and has metric collection enabled.""" diff --git a/test_runner/regress/test_sni_router.py b/test_runner/regress/test_sni_router.py index 59b64492c6..d930f8cd4c 100644 --- a/test_runner/regress/test_sni_router.py +++ b/test_runner/regress/test_sni_router.py @@ -2,11 +2,9 @@ import socket import subprocess from pathlib import Path from types import TracebackType -from typing import Any, Dict, Iterator, List, Optional, Tuple, Type, Union, cast +from typing import Optional, Type import backoff # type: ignore -import psycopg2 -import pytest from fixtures.log_helper import log from fixtures.neon_fixtures import PgProtocol, PortDistributor, VanillaPostgres @@ -57,8 +55,8 @@ class PgSniRouter(PgProtocol): args = [ str(self.neon_binpath / "pg_sni_router"), *["--listen", f"127.0.0.1:{self.port}"], - *["--tls-cert", self.tls_cert], - *["--tls-key", self.tls_key], + *["--tls-cert", str(self.tls_cert)], + *["--tls-key", str(self.tls_key)], 
*["--destination", self.destination], ] @@ -105,9 +103,10 @@ def test_pg_sni_router( neon_binpath: Path, test_output_dir: Path, ): - generate_tls_cert( - "endpoint.namespace.localtest.me", test_output_dir / "router.crt", test_output_dir / "router.key" + "endpoint.namespace.localtest.me", + test_output_dir / "router.crt", + test_output_dir / "router.key", ) # Start a stand-alone Postgres to test with From 5bb971d64e69739bceaf870d25d634811830bdec Mon Sep 17 00:00:00 2001 From: Stas Kelvich Date: Fri, 28 Apr 2023 14:16:04 +0300 Subject: [PATCH 13/26] fix more python tests --- test_runner/fixtures/neon_fixtures.py | 2 +- test_runner/regress/test_metric_collection.py | 2 +- test_runner/regress/test_sni_router.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 67838ca218..af7571cc4d 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2142,7 +2142,7 @@ class NeonProxy(PgProtocol): try: self._popen.wait(timeout=5) except subprocess.TimeoutExpired: - log.warn("failed to gracefully terminate proxy; killing") + log.warning("failed to gracefully terminate proxy; killing") self._popen.kill() @staticmethod diff --git a/test_runner/regress/test_metric_collection.py b/test_runner/regress/test_metric_collection.py index 54cd86dc84..df542fb84a 100644 --- a/test_runner/regress/test_metric_collection.py +++ b/test_runner/regress/test_metric_collection.py @@ -199,7 +199,7 @@ def proxy_metrics_handler(request: Request) -> Response: return Response(status=200) -@pytest.fixture(scope="session") +@pytest.fixture(scope="function") def proxy_with_metric_collector( port_distributor: PortDistributor, neon_binpath: Path, diff --git a/test_runner/regress/test_sni_router.py b/test_runner/regress/test_sni_router.py index d930f8cd4c..64cfd017e6 100644 --- a/test_runner/regress/test_sni_router.py +++ b/test_runner/regress/test_sni_router.py @@ -93,7 
+93,7 @@ class PgSniRouter(PgProtocol): try: self._popen.wait(timeout=5) except subprocess.TimeoutExpired: - log.warn("failed to gracefully terminate pg_sni_router; killing") + log.warning("failed to gracefully terminate pg_sni_router; killing") self._popen.kill() From b1329db495df18431b8a8a47bbf0bb7c213c24c9 Mon Sep 17 00:00:00 2001 From: Stas Kelvich Date: Fri, 28 Apr 2023 16:02:01 +0300 Subject: [PATCH 14/26] fix sigterm handling --- proxy/src/bin/pg_sni_router.rs | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/proxy/src/bin/pg_sni_router.rs b/proxy/src/bin/pg_sni_router.rs index 3b8852ecc8..bba2d51caf 100644 --- a/proxy/src/bin/pg_sni_router.rs +++ b/proxy/src/bin/pg_sni_router.rs @@ -108,17 +108,19 @@ async fn main() -> anyhow::Result<()> { let proxy_listener = TcpListener::bind(proxy_address).await?; let cancellation_token = CancellationToken::new(); - let tasks = vec![ - tokio::spawn(proxy::handle_signals(cancellation_token.clone())), - tokio::spawn(task_main( - Arc::new(destination), - tls_config, - proxy_listener, - cancellation_token.clone(), - )), - ]; - let _tasks = futures::future::try_join_all(tasks.into_iter().map(proxy::flatten_err)).await?; + let main = proxy::flatten_err(tokio::spawn(task_main( + Arc::new(destination), + tls_config, + proxy_listener, + cancellation_token.clone(), + ))); + let signals_task = proxy::flatten_err(tokio::spawn(proxy::handle_signals(cancellation_token))); + + tokio::select! { + res = main => { res?; }, + res = signals_task => { res?; }, + } Ok(()) } @@ -129,10 +131,6 @@ async fn task_main( listener: tokio::net::TcpListener, cancellation_token: CancellationToken, ) -> anyhow::Result<()> { - scopeguard::defer! { - info!("proxy has shut down"); - } - // When set for the server socket, the keepalive setting // will be inherited by all accepted client sockets. 
socket2::SockRef::from(&listener).set_keepalive(true)?; @@ -171,7 +169,9 @@ async fn task_main( } } } + // Drain connections + info!("waiting for all client connections to finish"); while let Some(res) = connections.join_next().await { if let Err(e) = res { if !e.is_panic() && !e.is_cancelled() { @@ -179,6 +179,7 @@ async fn task_main( } } } + info!("all client connections have finished"); Ok(()) } From 94d612195ad9dc88c06e59c58726eab3fe32ad85 Mon Sep 17 00:00:00 2001 From: Stas Kelvich Date: Fri, 28 Apr 2023 16:09:48 +0300 Subject: [PATCH 15/26] bump rust-postgres version, after merging PR in rust-postgres --- Cargo.lock | 10 +++++----- Cargo.toml | 12 ++++++------ 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b3705303e8..d79837a831 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2924,7 +2924,7 @@ dependencies = [ [[package]] name = "postgres" version = "0.19.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=27fc5729cc71a042e48eb980c8e736762138efe6#27fc5729cc71a042e48eb980c8e736762138efe6" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=0bc41d8503c092b040142214aac3cf7d11d0c19f#0bc41d8503c092b040142214aac3cf7d11d0c19f" dependencies = [ "bytes", "fallible-iterator", @@ -2937,7 +2937,7 @@ dependencies = [ [[package]] name = "postgres-native-tls" version = "0.5.0" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=27fc5729cc71a042e48eb980c8e736762138efe6#27fc5729cc71a042e48eb980c8e736762138efe6" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=0bc41d8503c092b040142214aac3cf7d11d0c19f#0bc41d8503c092b040142214aac3cf7d11d0c19f" dependencies = [ "native-tls", "tokio", @@ -2948,7 +2948,7 @@ dependencies = [ [[package]] name = "postgres-protocol" version = "0.6.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=27fc5729cc71a042e48eb980c8e736762138efe6#27fc5729cc71a042e48eb980c8e736762138efe6" +source = 
"git+https://github.com/neondatabase/rust-postgres.git?rev=0bc41d8503c092b040142214aac3cf7d11d0c19f#0bc41d8503c092b040142214aac3cf7d11d0c19f" dependencies = [ "base64 0.20.0", "byteorder", @@ -2966,7 +2966,7 @@ dependencies = [ [[package]] name = "postgres-types" version = "0.2.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=27fc5729cc71a042e48eb980c8e736762138efe6#27fc5729cc71a042e48eb980c8e736762138efe6" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=0bc41d8503c092b040142214aac3cf7d11d0c19f#0bc41d8503c092b040142214aac3cf7d11d0c19f" dependencies = [ "bytes", "fallible-iterator", @@ -4422,7 +4422,7 @@ dependencies = [ [[package]] name = "tokio-postgres" version = "0.7.7" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=27fc5729cc71a042e48eb980c8e736762138efe6#27fc5729cc71a042e48eb980c8e736762138efe6" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=0bc41d8503c092b040142214aac3cf7d11d0c19f#0bc41d8503c092b040142214aac3cf7d11d0c19f" dependencies = [ "async-trait", "byteorder", diff --git a/Cargo.toml b/Cargo.toml index bdfa07f53e..a18236d09a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -125,11 +125,11 @@ env_logger = "0.10" log = "0.4" ## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed -postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="27fc5729cc71a042e48eb980c8e736762138efe6" } -postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="27fc5729cc71a042e48eb980c8e736762138efe6" } -postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="27fc5729cc71a042e48eb980c8e736762138efe6" } -postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="27fc5729cc71a042e48eb980c8e736762138efe6" } -tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="27fc5729cc71a042e48eb980c8e736762138efe6" } +postgres = { git = 
"https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" } +postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" } +postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" } +postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" } +tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" } tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df61437de0feef49ba2ccdbdd94eb8ad6e142" } ## Other git libraries @@ -164,7 +164,7 @@ tonic-build = "0.9" # This is only needed for proxy's tests. # TODO: we should probably fork `tokio-postgres-rustls` instead. [patch.crates-io] -tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="27fc5729cc71a042e48eb980c8e736762138efe6" } +tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" } ################# Binary contents sections From ec53c5ca2ecb79be760d604f53dec49595ec5605 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Fri, 28 Apr 2023 17:20:18 +0300 Subject: [PATCH 16/26] revert: "Add check for duplicates of generated image layers" (#4104) This reverts commit 732acc5. Reverted PR: #3869 As noted in PR #4094, we do in fact try to insert duplicates to the layer map, if L0->L1 compaction is interrupted. We do not have a proper fix for that right now, and we are in a hurry to make a release to production, so revert the changes related to this to the state that we have in production currently. We know that we have a bug here, but better to live with the bug that we've had in production for a long time, than rush a fix to production without testing it in staging first. 
Cc: #4094, #4088 --- pageserver/benches/bench_layer_map.rs | 4 +-- pageserver/src/tenant.rs | 7 +----- pageserver/src/tenant/layer_map.rs | 23 +++++++---------- .../layer_map/historic_layer_coverage.rs | 8 ------ pageserver/src/tenant/timeline.rs | 25 +++++++------------ 5 files changed, 21 insertions(+), 46 deletions(-) diff --git a/pageserver/benches/bench_layer_map.rs b/pageserver/benches/bench_layer_map.rs index 8f139a6596..ee5980212e 100644 --- a/pageserver/benches/bench_layer_map.rs +++ b/pageserver/benches/bench_layer_map.rs @@ -33,7 +33,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap { min_lsn = min(min_lsn, lsn_range.start); max_lsn = max(max_lsn, Lsn(lsn_range.end.0 - 1)); - updates.insert_historic(Arc::new(layer)).unwrap(); + updates.insert_historic(Arc::new(layer)); } println!("min: {min_lsn}, max: {max_lsn}"); @@ -215,7 +215,7 @@ fn bench_sequential(c: &mut Criterion) { is_incremental: false, short_id: format!("Layer {}", i), }; - updates.insert_historic(Arc::new(layer)).unwrap(); + updates.insert_historic(Arc::new(layer)); } updates.flush(); println!("Finished layer map init in {:?}", now.elapsed()); diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index d69d5e4b45..5cfc466111 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -271,10 +271,7 @@ impl UninitializedTimeline<'_> { .await .context("Failed to flush after basebackup import")?; - // Initialize without loading the layer map. We started with an empty layer map, and already - // updated it for the layers that we created during the import. - let mut timelines = self.owning_tenant.timelines.lock().unwrap(); - self.initialize_with_lock(ctx, &mut timelines, false, true) + self.initialize(ctx) } fn raw_timeline(&self) -> anyhow::Result<&Arc> { @@ -2355,8 +2352,6 @@ impl Tenant { ) })?; - // Initialize the timeline without loading the layer map, because we already updated the layer - // map above, when we imported the datadir. 
let timeline = { let mut timelines = self.timelines.lock().unwrap(); raw_timeline.initialize_with_lock(ctx, &mut timelines, false, true)? diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs index 0ee0c6f77d..8d06ccd565 100644 --- a/pageserver/src/tenant/layer_map.rs +++ b/pageserver/src/tenant/layer_map.rs @@ -51,7 +51,7 @@ use crate::keyspace::KeyPartitioning; use crate::repository::Key; use crate::tenant::storage_layer::InMemoryLayer; use crate::tenant::storage_layer::Layer; -use anyhow::{bail, Result}; +use anyhow::Result; use std::collections::VecDeque; use std::ops::Range; use std::sync::Arc; @@ -125,7 +125,7 @@ where /// /// Insert an on-disk layer. /// - pub fn insert_historic(&mut self, layer: Arc) -> anyhow::Result<()> { + pub fn insert_historic(&mut self, layer: Arc) { self.layer_map.insert_historic_noflush(layer) } @@ -273,21 +273,16 @@ where /// /// Helper function for BatchedUpdates::insert_historic /// - pub(self) fn insert_historic_noflush(&mut self, layer: Arc) -> anyhow::Result<()> { - let key = historic_layer_coverage::LayerKey::from(&*layer); - if self.historic.contains(&key) { - bail!( - "Attempt to insert duplicate layer {} in layer map", - layer.short_id() - ); - } - self.historic.insert(key, Arc::clone(&layer)); + pub(self) fn insert_historic_noflush(&mut self, layer: Arc) { + // TODO: See #3869, resulting #4088, attempted fix and repro #4094 + self.historic.insert( + historic_layer_coverage::LayerKey::from(&*layer), + Arc::clone(&layer), + ); if Self::is_l0(&layer) { self.l0_delta_layers.push(layer); } - - Ok(()) } /// @@ -839,7 +834,7 @@ mod tests { let expected_in_counts = (1, usize::from(expected_l0)); - map.batch_update().insert_historic(remote.clone()).unwrap(); + map.batch_update().insert_historic(remote.clone()); assert_eq!(count_layer_in(&map, &remote), expected_in_counts); let replaced = map diff --git a/pageserver/src/tenant/layer_map/historic_layer_coverage.rs 
b/pageserver/src/tenant/layer_map/historic_layer_coverage.rs index 1fdcd5e5a4..b63c361314 100644 --- a/pageserver/src/tenant/layer_map/historic_layer_coverage.rs +++ b/pageserver/src/tenant/layer_map/historic_layer_coverage.rs @@ -417,14 +417,6 @@ impl BufferedHistoricLayerCoverage { } } - pub fn contains(&self, layer_key: &LayerKey) -> bool { - match self.buffer.get(layer_key) { - Some(None) => false, // layer remove was buffered - Some(_) => true, // layer insert was buffered - None => self.layers.contains_key(layer_key), // no buffered ops for this layer - } - } - pub fn insert(&mut self, layer_key: LayerKey, value: Value) { self.buffer.insert(layer_key, Some(value)); } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 5c671ffd63..8768841d87 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1484,7 +1484,7 @@ impl Timeline { trace!("found layer {}", layer.path().display()); total_physical_size += file_size; - updates.insert_historic(Arc::new(layer))?; + updates.insert_historic(Arc::new(layer)); num_layers += 1; } else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) { // Create a DeltaLayer struct for each delta file. @@ -1516,7 +1516,7 @@ impl Timeline { trace!("found layer {}", layer.path().display()); total_physical_size += file_size; - updates.insert_historic(Arc::new(layer))?; + updates.insert_historic(Arc::new(layer)); num_layers += 1; } else if fname == METADATA_FILE_NAME || fname.ends_with(".old") { // ignore these @@ -1590,7 +1590,7 @@ impl Timeline { // remote index file? // If so, rename_to_backup those files & replace their local layer with // a RemoteLayer in the layer map so that we re-download them on-demand. 
- if let Some(local_layer) = &local_layer { + if let Some(local_layer) = local_layer { let local_layer_path = local_layer .local_path() .expect("caller must ensure that local_layers only contains local layers"); @@ -1615,6 +1615,7 @@ impl Timeline { anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}"); } else { self.metrics.resident_physical_size_gauge.sub(local_size); + updates.remove_historic(local_layer); // fall-through to adding the remote layer } } else { @@ -1650,11 +1651,7 @@ impl Timeline { ); let remote_layer = Arc::new(remote_layer); - if let Some(local_layer) = &local_layer { - updates.replace_historic(local_layer, remote_layer)?; - } else { - updates.insert_historic(remote_layer)?; - } + updates.insert_historic(remote_layer); } LayerFileName::Delta(deltafilename) => { // Create a RemoteLayer for the delta file. @@ -1678,11 +1675,7 @@ impl Timeline { LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted), ); let remote_layer = Arc::new(remote_layer); - if let Some(local_layer) = &local_layer { - updates.replace_historic(local_layer, remote_layer)?; - } else { - updates.insert_historic(remote_layer)?; - } + updates.insert_historic(remote_layer); } } } @@ -2730,7 +2723,7 @@ impl Timeline { .write() .unwrap() .batch_update() - .insert_historic(Arc::new(new_delta))?; + .insert_historic(Arc::new(new_delta)); // update the timeline's physical size let sz = new_delta_path.metadata()?.len(); @@ -2935,7 +2928,7 @@ impl Timeline { self.metrics .resident_physical_size_gauge .add(metadata.len()); - updates.insert_historic(Arc::new(l))?; + updates.insert_historic(Arc::new(l)); } updates.flush(); drop(layers); @@ -3368,7 +3361,7 @@ impl Timeline { new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len())); let x: Arc = Arc::new(l); - updates.insert_historic(x)?; + updates.insert_historic(x); } // Now that we have reshuffled the data to set of new delta layers, we can From 8543485e9201da72a5be727ef55fef754ebd3009 Mon Sep 
17 00:00:00 2001 From: Arthur Petukhovsky Date: Fri, 28 Apr 2023 17:20:46 +0300 Subject: [PATCH 17/26] Pull clone timeline from peer safekeepers (#4089) Add HTTP endpoint to initialize safekeeper timeline from peer safekeepers. This is useful for initializing new safekeeper to replace failed safekeeper. Not fully "correct" in all cases, but should work in most. This code is not suitable for production workloads but can be tested on staging to get started. New endpoint is separated from usual cases and should not affect anything if no one explicitly uses a new endpoint. We can rollback this commit in case of issues. --- Cargo.lock | 2 + safekeeper/Cargo.toml | 6 +- safekeeper/src/http/routes.rs | 52 ++++- safekeeper/src/lib.rs | 1 + safekeeper/src/pull_timeline.rs | 240 +++++++++++++++++++++++ safekeeper/src/timeline.rs | 6 +- safekeeper/src/timelines_global_map.rs | 20 ++ safekeeper/src/wal_storage.rs | 2 +- test_runner/fixtures/neon_fixtures.py | 7 + test_runner/regress/test_wal_acceptor.py | 95 +++++++++ 10 files changed, 424 insertions(+), 7 deletions(-) create mode 100644 safekeeper/src/pull_timeline.rs diff --git a/Cargo.lock b/Cargo.lock index d79837a831..bc63cb0442 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3657,6 +3657,7 @@ dependencies = [ "const_format", "crc32c", "fs2", + "futures", "git-version", "hex", "humantime", @@ -3671,6 +3672,7 @@ dependencies = [ "pq_proto", "regex", "remote_storage", + "reqwest", "safekeeper_api", "serde", "serde_json", diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index 00cd111da5..b6e8497809 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -19,11 +19,13 @@ git-version.workspace = true hex.workspace = true humantime.workspace = true hyper.workspace = true +futures.workspace = true once_cell.workspace = true parking_lot.workspace = true postgres.workspace = true postgres-protocol.workspace = true regex.workspace = true +reqwest = { workspace = true, features = ["json"] } serde.workspace = true 
serde_json.workspace = true serde_with.workspace = true @@ -33,6 +35,7 @@ tokio = { workspace = true, features = ["fs"] } tokio-io-timeout.workspace = true tokio-postgres.workspace = true toml_edit.workspace = true +tempfile.workspace = true tracing.workspace = true url.workspace = true metrics.workspace = true @@ -45,6 +48,3 @@ storage_broker.workspace = true utils.workspace = true workspace_hack.workspace = true - -[dev-dependencies] -tempfile.workspace = true diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index eeb08d2733..a498d868af 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -11,11 +11,13 @@ use std::str::FromStr; use std::sync::Arc; use storage_broker::proto::SafekeeperTimelineInfo; use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId; +use tokio::fs::File; +use tokio::io::AsyncReadExt; use tokio::task::JoinError; -use crate::debug_dump; use crate::safekeeper::ServerInfo; use crate::safekeeper::Term; +use crate::{debug_dump, pull_timeline}; use crate::timelines_global_map::TimelineDeleteForceResult; use crate::GlobalTimelines; @@ -177,6 +179,49 @@ async fn timeline_create_handler(mut request: Request) -> Result) -> Result, ApiError> { + check_permission(&request, None)?; + + let data: pull_timeline::Request = json_request(&mut request).await?; + + let resp = pull_timeline::handle_request(data) + .await + .map_err(ApiError::InternalServerError)?; + json_response(StatusCode::OK, resp) +} + +/// Download a file from the timeline directory. 
+// TODO: figure out a better way to copy files between safekeepers +async fn timeline_files_handler(request: Request) -> Result, ApiError> { + let ttid = TenantTimelineId::new( + parse_request_param(&request, "tenant_id")?, + parse_request_param(&request, "timeline_id")?, + ); + check_permission(&request, Some(ttid.tenant_id))?; + + let filename: String = parse_request_param(&request, "filename")?; + + let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?; + + let filepath = tli.timeline_dir.join(filename); + let mut file = File::open(&filepath) + .await + .map_err(|e| ApiError::InternalServerError(e.into()))?; + + let mut content = Vec::new(); + // TODO: don't store files in memory + file.read_to_end(&mut content) + .await + .map_err(|e| ApiError::InternalServerError(e.into()))?; + + Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "application/octet-stream") + .body(Body::from(content)) + .map_err(|e| ApiError::InternalServerError(e.into())) +} + /// Deactivates the timeline and removes its data directory. 
async fn timeline_delete_force_handler( mut request: Request, @@ -353,6 +398,11 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder timeline_delete_force_handler, ) .delete("/v1/tenant/:tenant_id", tenant_delete_force_handler) + .post("/v1/pull_timeline", timeline_pull_handler) + .get( + "/v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename", + timeline_files_handler, + ) // for tests .post( "/v1/record_safekeeper_info/:tenant_id/:timeline_id", diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index 2c28c5218d..ff621fdbc0 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -15,6 +15,7 @@ pub mod handler; pub mod http; pub mod json_ctrl; pub mod metrics; +pub mod pull_timeline; pub mod receive_wal; pub mod remove_wal; pub mod safekeeper; diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs new file mode 100644 index 0000000000..344b760fd3 --- /dev/null +++ b/safekeeper/src/pull_timeline.rs @@ -0,0 +1,240 @@ +use serde::{Deserialize, Serialize}; + +use anyhow::{bail, Context, Result}; +use tokio::io::AsyncWriteExt; +use tracing::info; +use utils::id::{TenantId, TenantTimelineId, TimelineId}; + +use serde_with::{serde_as, DisplayFromStr}; + +use crate::{ + control_file, debug_dump, + http::routes::TimelineStatus, + wal_storage::{self, Storage}, + GlobalTimelines, +}; + +/// Info about timeline on safekeeper ready for reporting. +#[serde_as] +#[derive(Debug, Serialize, Deserialize)] +pub struct Request { + #[serde_as(as = "DisplayFromStr")] + pub tenant_id: TenantId, + #[serde_as(as = "DisplayFromStr")] + pub timeline_id: TimelineId, + pub http_hosts: Vec, +} + +#[derive(Debug, Serialize)] +pub struct Response { + // Donor safekeeper host + pub safekeeper_host: String, + // TODO: add more fields? +} + +/// Find the most advanced safekeeper and pull timeline from it. 
+pub async fn handle_request(request: Request) -> Result { + let existing_tli = GlobalTimelines::get(TenantTimelineId::new( + request.tenant_id, + request.timeline_id, + )); + if existing_tli.is_ok() { + bail!("Timeline {} already exists", request.timeline_id); + } + + let client = reqwest::Client::new(); + let http_hosts = request.http_hosts.clone(); + + // Send request to /v1/tenant/:tenant_id/timeline/:timeline_id + let responses = futures::future::join_all(http_hosts.iter().map(|url| { + let url = format!( + "{}/v1/tenant/{}/timeline/{}", + url, request.tenant_id, request.timeline_id + ); + client.get(url).send() + })) + .await; + + let mut statuses = Vec::new(); + for (i, response) in responses.into_iter().enumerate() { + let response = response.context(format!("Failed to get status from {}", http_hosts[i]))?; + let status: crate::http::routes::TimelineStatus = response.json().await?; + statuses.push((status, i)); + } + + // Find the most advanced safekeeper + // TODO: current logic may be wrong, fix it later + let (status, i) = statuses + .into_iter() + .max_by_key(|(status, _)| { + ( + status.acceptor_state.epoch, + status.flush_lsn, + status.commit_lsn, + ) + }) + .unwrap(); + let safekeeper_host = http_hosts[i].clone(); + + assert!(status.tenant_id == request.tenant_id); + assert!(status.timeline_id == request.timeline_id); + + pull_timeline(status, safekeeper_host).await +} + +async fn pull_timeline(status: TimelineStatus, host: String) -> Result { + let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id); + info!( + "Pulling timeline {} from safekeeper {}, commit_lsn={}, flush_lsn={}, term={}, epoch={}", + ttid, + host, + status.commit_lsn, + status.flush_lsn, + status.acceptor_state.term, + status.acceptor_state.epoch + ); + + let conf = &GlobalTimelines::get_global_config(); + + let client = reqwest::Client::new(); + // TODO: don't use debug dump, it should be used only in tests. 
+ // This is a proof of concept, we should figure out a way + // to use scp without implementing it manually. + + // Implementing our own scp over HTTP. + // At first, we need to fetch list of files from safekeeper. + let dump: debug_dump::Response = client + .get(format!( + "{}/v1/debug_dump?dump_all=true&tenant_id={}&timeline_id={}", + host, status.tenant_id, status.timeline_id + )) + .send() + .await? + .json() + .await?; + + if dump.timelines.len() != 1 { + bail!( + "Expected to fetch single timeline, got {} timelines", + dump.timelines.len() + ); + } + + let timeline = dump.timelines.into_iter().next().unwrap(); + let disk_content = timeline.disk_content.ok_or(anyhow::anyhow!( + "Timeline {} doesn't have disk content", + ttid + ))?; + + let mut filenames = disk_content + .files + .iter() + .map(|file| file.name.clone()) + .collect::>(); + + // Sort filenames to make sure we pull files in correct order + // After sorting, we should have: + // - 000000010000000000000001 + // - ... + // - 000000010000000000000002.partial + // - safekeeper.control + filenames.sort(); + + // safekeeper.control should be the first file, so we need to move it to the beginning + let control_file_index = filenames + .iter() + .position(|name| name == "safekeeper.control") + .ok_or(anyhow::anyhow!("safekeeper.control not found"))?; + filenames.remove(control_file_index); + filenames.insert(0, "safekeeper.control".to_string()); + + info!( + "Downloading {} files from safekeeper {}", + filenames.len(), + host + ); + + // Creating temp directory for a new timeline. It needs to be + // located on the same filesystem as the rest of the timelines. + + // conf.workdir is usually /storage/safekeeper/data + // will try to transform it into /storage/safekeeper/tmp + let temp_base = conf + .workdir + .parent() + .ok_or(anyhow::anyhow!("workdir has no parent"))? 
+ .join("tmp"); + + tokio::fs::create_dir_all(&temp_base).await?; + + let tli_dir = tempfile::Builder::new() + .suffix("_temptli") + .prefix(&format!("{}_{}_", ttid.tenant_id, ttid.timeline_id)) + .tempdir_in(temp_base)?; + let tli_dir_path = tli_dir.path().to_owned(); + + // Note: some time happens between fetching list of files and fetching files themselves. + // It's possible that some files will be removed from safekeeper and we will fail to fetch them. + // This function will fail in this case, should be retried by the caller. + for filename in filenames { + let file_path = tli_dir_path.join(&filename); + // /v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename + let http_url = format!( + "{}/v1/tenant/{}/timeline/{}/file/{}", + host, status.tenant_id, status.timeline_id, filename + ); + + let mut file = tokio::fs::File::create(&file_path).await?; + let mut response = client.get(&http_url).send().await?; + while let Some(chunk) = response.chunk().await? { + file.write_all(&chunk).await?; + } + } + + // TODO: fsync? 
+ + // Let's create timeline from temp directory and verify that it's correct + + let control_path = tli_dir_path.join("safekeeper.control"); + + let control_store = control_file::FileStorage::load_control_file(control_path)?; + if control_store.server.wal_seg_size == 0 { + bail!("wal_seg_size is not set"); + } + + let wal_store = + wal_storage::PhysicalStorage::new(&ttid, tli_dir_path.clone(), conf, &control_store)?; + + let commit_lsn = status.commit_lsn; + let flush_lsn = wal_store.flush_lsn(); + + info!( + "Finished downloading timeline {}, commit_lsn={}, flush_lsn={}", + ttid, commit_lsn, flush_lsn + ); + assert!(status.commit_lsn <= status.flush_lsn); + + // Move timeline dir to the correct location + let timeline_path = conf.timeline_dir(&ttid); + + info!( + "Moving timeline {} from {} to {}", + ttid, + tli_dir_path.display(), + timeline_path.display() + ); + tokio::fs::create_dir_all(conf.tenant_dir(&ttid.tenant_id)).await?; + tokio::fs::rename(tli_dir_path, &timeline_path).await?; + + let tli = GlobalTimelines::load_timeline(ttid).context("Failed to load timeline after copy")?; + + info!( + "Loaded timeline {}, flush_lsn={}", + ttid, + tli.get_flush_lsn() + ); + + Ok(Response { + safekeeper_host: host, + }) +} diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 64ca6967df..2dbf215998 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -129,7 +129,8 @@ impl SharedState { // We don't want to write anything to disk, because we may have existing timeline there. // These functions should not change anything on disk. 
let control_store = control_file::FileStorage::create_new(ttid, conf, state)?; - let wal_store = wal_storage::PhysicalStorage::new(ttid, conf, &control_store)?; + let wal_store = + wal_storage::PhysicalStorage::new(ttid, conf.timeline_dir(ttid), conf, &control_store)?; let sk = SafeKeeper::new(control_store, wal_store, conf.my_id)?; Ok(Self { @@ -149,7 +150,8 @@ impl SharedState { bail!(TimelineError::UninitializedWalSegSize(*ttid)); } - let wal_store = wal_storage::PhysicalStorage::new(ttid, conf, &control_store)?; + let wal_store = + wal_storage::PhysicalStorage::new(ttid, conf.timeline_dir(ttid), conf, &control_store)?; Ok(Self { sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?, diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs index 868ee97645..41809794dc 100644 --- a/safekeeper/src/timelines_global_map.rs +++ b/safekeeper/src/timelines_global_map.rs @@ -159,6 +159,26 @@ impl GlobalTimelines { Ok(()) } + /// Load timeline from disk to the memory. + pub fn load_timeline(ttid: TenantTimelineId) -> Result> { + let (conf, wal_backup_launcher_tx) = TIMELINES_STATE.lock().unwrap().get_dependencies(); + + match Timeline::load_timeline(conf, ttid, wal_backup_launcher_tx) { + Ok(timeline) => { + let tli = Arc::new(timeline); + // TODO: prevent concurrent timeline creation/loading + TIMELINES_STATE + .lock() + .unwrap() + .timelines + .insert(ttid, tli.clone()); + Ok(tli) + } + // If we can't load a timeline, it's bad. Caller will figure it out. + Err(e) => bail!("failed to load timeline {}, reason: {:?}", ttid, e), + } + } + /// Get the number of timelines in the map. pub fn timelines_count() -> usize { TIMELINES_STATE.lock().unwrap().timelines.len() diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index 54e27714ea..5ef22b2f6a 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -112,10 +112,10 @@ impl PhysicalStorage { /// the disk. 
Otherwise, all LSNs are set to zero. pub fn new( ttid: &TenantTimelineId, + timeline_dir: PathBuf, conf: &SafeKeeperConf, state: &SafeKeeperState, ) -> Result { - let timeline_dir = conf.timeline_dir(ttid); let wal_seg_size = state.server.wal_seg_size as usize; // Find out where stored WAL ends, starting at commit_lsn which is a diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index af7571cc4d..79b2e5b290 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2661,6 +2661,13 @@ class SafekeeperHttpClient(requests.Session): assert isinstance(res_json, dict) return res_json + def pull_timeline(self, body: Dict[str, Any]) -> Dict[str, Any]: + res = self.post(f"http://localhost:{self.port}/v1/pull_timeline", json=body) + res.raise_for_status() + res_json = res.json() + assert isinstance(res_json, dict) + return res_json + def timeline_create( self, tenant_id: TenantId, timeline_id: TimelineId, pg_version: int, commit_lsn: Lsn ): diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index e8cfa4f318..fed5f325ca 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -1254,3 +1254,98 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): with closing(endpoint_other.connect()) as conn: with conn.cursor() as cur: cur.execute("INSERT INTO t (key) VALUES (123)") + + +def test_pull_timeline(neon_env_builder: NeonEnvBuilder): + def safekeepers_guc(env: NeonEnv, sk_names: List[int]) -> str: + return ",".join([f"localhost:{sk.port.pg}" for sk in env.safekeepers if sk.id in sk_names]) + + def execute_payload(endpoint: Endpoint): + with closing(endpoint.connect()) as conn: + with conn.cursor() as cur: + # we rely upon autocommit after each statement + # as waiting for acceptors happens there + cur.execute("CREATE TABLE IF NOT EXISTS t(key int, value text)") + cur.execute("INSERT 
INTO t VALUES (0, 'something')") + sum_before = query_scalar(cur, "SELECT SUM(key) FROM t") + + cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'") + sum_after = query_scalar(cur, "SELECT SUM(key) FROM t") + assert sum_after == sum_before + 5000050000 + + def show_statuses(safekeepers: List[Safekeeper], tenant_id: TenantId, timeline_id: TimelineId): + for sk in safekeepers: + http_cli = sk.http_client() + try: + status = http_cli.timeline_status(tenant_id, timeline_id) + log.info(f"Safekeeper {sk.id} status: {status}") + except Exception as e: + log.info(f"Safekeeper {sk.id} status error: {e}") + + neon_env_builder.num_safekeepers = 4 + env = neon_env_builder.init_start() + env.neon_cli.create_branch("test_pull_timeline") + + log.info("Use only first 3 safekeepers") + env.safekeepers[3].stop() + active_safekeepers = [1, 2, 3] + endpoint = env.endpoints.create("test_pull_timeline") + endpoint.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers)) + endpoint.start() + + # learn neon timeline from compute + tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0]) + timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0]) + + execute_payload(endpoint) + show_statuses(env.safekeepers, tenant_id, timeline_id) + + log.info("Kill safekeeper 2, continue with payload") + env.safekeepers[1].stop(immediate=True) + execute_payload(endpoint) + + log.info("Initialize new safekeeper 4, pull data from 1 & 3") + env.safekeepers[3].start() + + res = ( + env.safekeepers[3] + .http_client() + .pull_timeline( + { + "tenant_id": str(tenant_id), + "timeline_id": str(timeline_id), + "http_hosts": [ + f"http://localhost:{env.safekeepers[0].port.http}", + f"http://localhost:{env.safekeepers[2].port.http}", + ], + } + ) + ) + log.info("Finished pulling timeline") + log.info(res) + + show_statuses(env.safekeepers, tenant_id, timeline_id) + + log.info("Restarting compute with new config to verify that it works") + active_safekeepers 
= [1, 3, 4] + + endpoint.stop_and_destroy().create("test_pull_timeline") + endpoint.adjust_for_safekeepers(safekeepers_guc(env, active_safekeepers)) + endpoint.start() + + execute_payload(endpoint) + show_statuses(env.safekeepers, tenant_id, timeline_id) + + log.info("Stop sk1 (simulate failure) and use only quorum of sk3 and sk4") + env.safekeepers[0].stop(immediate=True) + execute_payload(endpoint) + show_statuses(env.safekeepers, tenant_id, timeline_id) + + log.info("Restart sk4 and and use quorum of sk1 and sk4") + env.safekeepers[3].stop() + env.safekeepers[2].stop() + env.safekeepers[0].start() + env.safekeepers[3].start() + + execute_payload(endpoint) + show_statuses(env.safekeepers, tenant_id, timeline_id) From 2617e700081d35a2a20ca050015555fcd173da72 Mon Sep 17 00:00:00 2001 From: Shany Pozin Date: Sat, 29 Apr 2023 12:42:52 +0300 Subject: [PATCH 18/26] Add 4 new Pageservers for retool launch (#4115) ## Describe your changes Adding 4 new pageserves to us-west TF apply output: module.pageserver-us-west-2.aws_instance.this["7"]: Creation complete after 21s [id=i-02eec9b40617db5bc] module.pageserver-us-west-2.aws_instance.this["5"]: Creation complete after 21s [id=i-00ca6417c7bf96820] module.pageserver-us-west-2.aws_instance.this["4"]: Creation complete after 21s [id=i-013263dd1c239adcc] module.pageserver-us-west-2.aws_instance.this["6"]: Creation complete after 22s [id=i-01cdf7d2bc1433b6a] --- .github/ansible/prod.us-west-2.hosts.yaml | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/.github/ansible/prod.us-west-2.hosts.yaml b/.github/ansible/prod.us-west-2.hosts.yaml index 9cf847bcb1..1fde83520e 100644 --- a/.github/ansible/prod.us-west-2.hosts.yaml +++ b/.github/ansible/prod.us-west-2.hosts.yaml @@ -41,6 +41,14 @@ storage: ansible_host: i-051642d372c0a4f32 pageserver-3.us-west-2.aws.neon.tech: ansible_host: i-00c3844beb9ad1c6b + pageserver-4.us-west-2.aws.neon.tech: + ansible_host: i-013263dd1c239adcc + 
pageserver-5.us-west-2.aws.neon.tech: + ansible_host: i-00ca6417c7bf96820 + pageserver-6.us-west-2.aws.neon.tech: + ansible_host: i-01cdf7d2bc1433b6a + pageserver-7.us-west-2.aws.neon.tech: + ansible_host: i-02eec9b40617db5bc safekeepers: hosts: @@ -50,4 +58,3 @@ storage: ansible_host: i-074682f9d3c712e7c safekeeper-2.us-west-2.aws.neon.tech: ansible_host: i-042b7efb1729d7966 - From 95244912c53dd83caf6c85afb6a781fc75ae9db5 Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Sat, 29 Apr 2023 13:31:04 +0300 Subject: [PATCH 19/26] Override sharded-slab to increase MAX_THREADS (#4122) Add patch directive to Cargo.toml to use patched version of sharded-slab: https://github.com/neondatabase/sharded-slab/commit/98d16753ab01c61f0a028de44167307a00efea00 Patch changes the MAX_THREADS limit from 4096 to 32768. This is a temporary workaround for using tracing from many threads in safekeepers code, until async safekeepers patch is merged to the main. Note that patch can affect other rust services, not only the safekeeper binary. --- Cargo.lock | 3 +-- Cargo.toml | 8 +++++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bc63cb0442..bce2d11188 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3960,8 +3960,7 @@ dependencies = [ [[package]] name = "sharded-slab" version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31" +source = "git+https://github.com/neondatabase/sharded-slab.git?rev=98d16753ab01c61f0a028de44167307a00efea00#98d16753ab01c61f0a028de44167307a00efea00" dependencies = [ "lazy_static", ] diff --git a/Cargo.toml b/Cargo.toml index a18236d09a..b73e29ef6c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -161,11 +161,17 @@ rstest = "0.17" tempfile = "3.4" tonic-build = "0.9" +[patch.crates-io] + # This is only needed for proxy's tests. # TODO: we should probably fork `tokio-postgres-rustls` instead. 
-[patch.crates-io] tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" } +# Changes the MAX_THREADS limit from 4096 to 32768. +# This is a temporary workaround for using tracing from many threads in safekeepers code, +# until async safekeepers patch is merged to the main. +sharded-slab = { git = "https://github.com/neondatabase/sharded-slab.git", rev="98d16753ab01c61f0a028de44167307a00efea00" } + ################# Binary contents sections [profile.release] From 21eb944b5e63599841cc8fb555ff9fb801165530 Mon Sep 17 00:00:00 2001 From: Rahul Patil Date: Sat, 29 Apr 2023 14:25:57 +0200 Subject: [PATCH 20/26] Staging: Add safekeeper nodes [3-8] to eu-west-1 (#4123) --- .github/ansible/staging.eu-west-1.hosts.yaml | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/.github/ansible/staging.eu-west-1.hosts.yaml b/.github/ansible/staging.eu-west-1.hosts.yaml index 39f5613935..2a00b3f726 100644 --- a/.github/ansible/staging.eu-west-1.hosts.yaml +++ b/.github/ansible/staging.eu-west-1.hosts.yaml @@ -44,3 +44,15 @@ storage: ansible_host: i-06969ee1bf2958bfc safekeeper-2.eu-west-1.aws.neon.build: ansible_host: i-087892e9625984a0b + safekeeper-3.eu-west-1.aws.neon.build: + ansible_host: i-0a6f91660e99e8891 + safekeeper-4.eu-west-1.aws.neon.build: + ansible_host: i-0012e309e28e7c249 + safekeeper-5.eu-west-1.aws.neon.build: + ansible_host: i-085a2b1193287b32e + safekeeper-6.eu-west-1.aws.neon.build: + ansible_host: i-0c713248465ed0fbd + safekeeper-7.eu-west-1.aws.neon.build: + ansible_host: i-02ad231aed2a80b7a + safekeeper-8.eu-west-1.aws.neon.build: + ansible_host: i-0dbbd8ffef66efda8 From 6f472df0d0fe8fd2795534bbb39cb37c59b9ff14 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Sun, 30 Apr 2023 14:34:55 +0300 Subject: [PATCH 21/26] fix: restore not logging ignored io errors as errors (#4120) the fix is rather indirect due to the accidental applying of too much `anyhow`: if handle_pagerequests 
returns a `QueryError` it will now be bubbled up as-is `QueryError`. `QueryError` allows the inner `std::io::Error` to be inspected and thus we can filter certain error kinds which are perfectly normal without a huge log message. for a very long time (b2f5102) the errors were converted to `anyhow` by mistake which made this difficult or impossible, even though from the types it would *appear* that we propagate wrapped `std::io::Error`s and can filter them. Fixes #4113, most likely filters some other errors as well. --- pageserver/src/page_service.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 8b0795db3c..a7a0d1a22e 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -352,7 +352,7 @@ impl PageServerHandler { tenant_id: TenantId, timeline_id: TimelineId, ctx: RequestContext, - ) -> anyhow::Result<()> + ) -> Result<(), QueryError> where IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, { @@ -398,7 +398,9 @@ impl PageServerHandler { Some(FeMessage::CopyData(bytes)) => bytes, Some(FeMessage::Terminate) => break, Some(m) => { - anyhow::bail!("unexpected message: {m:?} during COPY"); + return Err(QueryError::Other(anyhow::anyhow!( + "unexpected message: {m:?} during COPY" + ))); } None => break, // client disconnected }; From d53f81b44907b2192df8509d4986ebf00bc139a1 Mon Sep 17 00:00:00 2001 From: Stas Kelvich Date: Sun, 30 Apr 2023 21:36:50 +0300 Subject: [PATCH 22/26] Add one more pageserver to staging --- .github/ansible/staging.eu-west-1.hosts.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/ansible/staging.eu-west-1.hosts.yaml b/.github/ansible/staging.eu-west-1.hosts.yaml index 2a00b3f726..a54ced7f3a 100644 --- a/.github/ansible/staging.eu-west-1.hosts.yaml +++ b/.github/ansible/staging.eu-west-1.hosts.yaml @@ -35,6 +35,8 @@ storage: hosts: pageserver-0.eu-west-1.aws.neon.build: ansible_host: i-01d496c5041c7f34c + 
pageserver-1.eu-west-1.aws.neon.build: + ansible_host: i-0e8013e239ce3928c safekeepers: hosts: From 138bc028ed123c3658d9324a25ad221fb617b23b Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Mon, 1 May 2023 11:54:09 +0300 Subject: [PATCH 23/26] fix: quick and dirty panic avoidance on drop path (#4128) Sentry caught a panic on load testing server related to metric removals: https://neondatabase.sentry.io/issues/4142396994 Turn the `expect` into logging, but also add logging for each removal, so we could identify in which cases we do double-remove. The double-removal (or never adding) cause is not obvious or expected. Original added in #3837. --- pageserver/src/metrics.rs | 35 +++++++++++++++++++++++++++-------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index deb20f21f8..b5d7eb0132 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -287,14 +287,33 @@ impl EvictionsWithLowResidenceDuration { let Some(_counter) = self.counter.take() else { return; }; - EVICTIONS_WITH_LOW_RESIDENCE_DURATION - .remove_label_values(&[ - tenant_id, - timeline_id, - self.data_source, - &Self::threshold_label_value(self.threshold), - ]) - .expect("we own the metric, no-one else should remove it"); + + let threshold = Self::threshold_label_value(self.threshold); + + let removed = EVICTIONS_WITH_LOW_RESIDENCE_DURATION.remove_label_values(&[ + tenant_id, + timeline_id, + self.data_source, + &threshold, + ]); + + match removed { + Err(e) => { + // this has been hit in staging as + // , but we don't know how. + // because we can be in the drop path already, don't risk: + // - "double-panic => illegal instruction" or + // - future "drop panick => abort" + // + // so just nag: (the error has the labels) + tracing::warn!("failed to remove EvictionsWithLowResidenceDuration, it was already removed? 
{e:#?}"); + } + Ok(()) => { + // to help identify cases where we double-remove the same values, let's log all + // deletions? + tracing::info!("removed EvictionsWithLowResidenceDuration with {tenant_id}, {timeline_id}, {}, {threshold}", self.data_source); + } + } } } From 7e368f3edfe520a1595ce700aeb43664e2474cc2 Mon Sep 17 00:00:00 2001 From: Anton Chaporgin Date: Mon, 1 May 2023 13:14:31 +0300 Subject: [PATCH 24/26] build pg-sni-router binary (#4129) ## Describe your changes This adds pg-sni-router binary to the build pipeline and neon image. ## Issue ticket number and link https://github.com/neondatabase/cloud/issues/1461 --- Dockerfile | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 6f7d2c32a5..f83f3b1c21 100644 --- a/Dockerfile +++ b/Dockerfile @@ -44,7 +44,15 @@ COPY --chown=nonroot . . # Show build caching stats to check if it was used in the end. # Has to be the part of the same RUN since cachepot daemon is killed in the end of this RUN, losing the compilation stats. 
RUN set -e \ -&& mold -run cargo build --bin pageserver --bin pageserver_binutils --bin draw_timeline_dir --bin safekeeper --bin storage_broker --bin proxy --locked --release \ + && mold -run cargo build \ + --bin pg_sni_router \ + --bin pageserver \ + --bin pageserver_binutils \ + --bin draw_timeline_dir \ + --bin safekeeper \ + --bin storage_broker \ + --bin proxy \ + --locked --release \ && cachepot -s # Build final image @@ -63,6 +71,7 @@ RUN set -e \ && useradd -d /data neon \ && chown -R neon:neon /data +COPY --from=build --chown=neon:neon /home/nonroot/target/release/pg_sni_router /usr/local/bin COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver /usr/local/bin COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver_binutils /usr/local/bin COPY --from=build --chown=neon:neon /home/nonroot/target/release/draw_timeline_dir /usr/local/bin From e3ae2661ee289dc9f9a94215bfef4315f4b2eb50 Mon Sep 17 00:00:00 2001 From: Shany Pozin Date: Mon, 1 May 2023 14:22:59 +0300 Subject: [PATCH 25/26] Add 2 new sets of safekeepers to us-west2 (#4130) ## Describe your changes TF output: module.safekeeper-us-west-2.aws_instance.this["3"]: Creation complete after 13s [id=i-089f6b9ef426dff76] module.safekeeper-us-west-2.aws_instance.this["4"]: Creation complete after 13s [id=i-0fe6bf912c4710c82] module.safekeeper-us-west-2.aws_instance.this["5"]: Creation complete after 13s [id=i-0a83c1c46d2b4e409] module.safekeeper-us-west-2.aws_instance.this["6"]: Creation complete after 13s [id=i-0fef5317b8fdc9f8d] module.safekeeper-us-west-2.aws_instance.this["7"]: Creation complete after 13s [id=i-0be739190d4289bf9] module.safekeeper-us-west-2.aws_instance.this["8"]: Creation complete after 13s [id=i-00e851803669e5cfe] --- .github/ansible/prod.us-west-2.hosts.yaml | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/.github/ansible/prod.us-west-2.hosts.yaml b/.github/ansible/prod.us-west-2.hosts.yaml index 1fde83520e..be65d8e63c 100644 --- 
a/.github/ansible/prod.us-west-2.hosts.yaml +++ b/.github/ansible/prod.us-west-2.hosts.yaml @@ -58,3 +58,15 @@ storage: ansible_host: i-074682f9d3c712e7c safekeeper-2.us-west-2.aws.neon.tech: ansible_host: i-042b7efb1729d7966 + safekeeper-3.us-west-2.aws.neon.tech: + ansible_host: i-089f6b9ef426dff76 + safekeeper-4.us-west-2.aws.neon.tech: + ansible_host: i-0fe6bf912c4710c82 + safekeeper-5.us-west-2.aws.neon.tech: + ansible_host: i-0a83c1c46d2b4e409 + safekeeper-6.us-west-2.aws.neon.tech: + ansible_host: i-0fef5317b8fdc9f8d + safekeeper-7.us-west-2.aws.neon.tech: + ansible_host: i-0be739190d4289bf9 + safekeeper-8.us-west-2.aws.neon.tech: + ansible_host: i-00e851803669e5cfe From 093fafd6bd5c53722227cea33dfbf3df5230f09d Mon Sep 17 00:00:00 2001 From: Sergey Melnikov Date: Mon, 1 May 2023 17:18:45 +0200 Subject: [PATCH 26/26] Deploy pg-sni-router (#4132) --- .../dev-eu-central-1-alpha.pg-sni-router.yaml | 19 +++++++ .../dev-eu-west-1-zeta.pg-sni-router.yaml | 19 +++++++ .../dev-us-east-2-beta.pg-sni-router.yaml | 19 +++++++ ...-ap-southeast-1-epsilon.pg-sni-router.yaml | 19 +++++++ ...prod-eu-central-1-gamma.pg-sni-router.yaml | 19 +++++++ .../prod-us-east-1-theta.pg-sni-router.yaml | 19 +++++++ .../prod-us-east-2-delta.pg-sni-router.yaml | 19 +++++++ .../prod-us-west-2-eta.pg-sni-router.yaml | 19 +++++++ .github/workflows/deploy-dev.yml | 51 +++++++++++++++++++ .github/workflows/deploy-prod.yml | 44 ++++++++++++++++ 10 files changed, 247 insertions(+) create mode 100644 .github/helm-values/dev-eu-central-1-alpha.pg-sni-router.yaml create mode 100644 .github/helm-values/dev-eu-west-1-zeta.pg-sni-router.yaml create mode 100644 .github/helm-values/dev-us-east-2-beta.pg-sni-router.yaml create mode 100644 .github/helm-values/prod-ap-southeast-1-epsilon.pg-sni-router.yaml create mode 100644 .github/helm-values/prod-eu-central-1-gamma.pg-sni-router.yaml create mode 100644 .github/helm-values/prod-us-east-1-theta.pg-sni-router.yaml create mode 100644 
.github/helm-values/prod-us-east-2-delta.pg-sni-router.yaml create mode 100644 .github/helm-values/prod-us-west-2-eta.pg-sni-router.yaml diff --git a/.github/helm-values/dev-eu-central-1-alpha.pg-sni-router.yaml b/.github/helm-values/dev-eu-central-1-alpha.pg-sni-router.yaml new file mode 100644 index 0000000000..a80423b12d --- /dev/null +++ b/.github/helm-values/dev-eu-central-1-alpha.pg-sni-router.yaml @@ -0,0 +1,19 @@ +useCertManager: true + +replicaCount: 3 + +exposedService: + # exposedService.port -- Exposed Service proxy port + port: 4432 + annotations: + external-dns.alpha.kubernetes.io/hostname: "*.snirouter.alpha.eu-central-1.internal.aws.neon.build" + +settings: + domain: "*.snirouter.alpha.eu-central-1.internal.aws.neon.build" + sentryEnvironment: "staging" + +imagePullSecrets: + - name: docker-hub-neon + +metrics: + enabled: false diff --git a/.github/helm-values/dev-eu-west-1-zeta.pg-sni-router.yaml b/.github/helm-values/dev-eu-west-1-zeta.pg-sni-router.yaml new file mode 100644 index 0000000000..c9c628af0c --- /dev/null +++ b/.github/helm-values/dev-eu-west-1-zeta.pg-sni-router.yaml @@ -0,0 +1,19 @@ +useCertManager: true + +replicaCount: 3 + +exposedService: + # exposedService.port -- Exposed Service proxy port + port: 4432 + annotations: + external-dns.alpha.kubernetes.io/hostname: "*.snirouter.zeta.eu-west-1.internal.aws.neon.build" + +settings: + domain: "*.snirouter.zeta.eu-west-1.internal.aws.neon.build" + sentryEnvironment: "staging" + +imagePullSecrets: + - name: docker-hub-neon + +metrics: + enabled: false diff --git a/.github/helm-values/dev-us-east-2-beta.pg-sni-router.yaml b/.github/helm-values/dev-us-east-2-beta.pg-sni-router.yaml new file mode 100644 index 0000000000..68ad096df7 --- /dev/null +++ b/.github/helm-values/dev-us-east-2-beta.pg-sni-router.yaml @@ -0,0 +1,19 @@ +useCertManager: true + +replicaCount: 3 + +exposedService: + # exposedService.port -- Exposed Service proxy port + port: 4432 + annotations: + 
external-dns.alpha.kubernetes.io/hostname: "*.snirouter.beta.us-east-2.internal.aws.neon.build" + +settings: + domain: "*.snirouter.beta.us-east-2.internal.aws.neon.build" + sentryEnvironment: "staging" + +imagePullSecrets: + - name: docker-hub-neon + +metrics: + enabled: false diff --git a/.github/helm-values/prod-ap-southeast-1-epsilon.pg-sni-router.yaml b/.github/helm-values/prod-ap-southeast-1-epsilon.pg-sni-router.yaml new file mode 100644 index 0000000000..478ad5631c --- /dev/null +++ b/.github/helm-values/prod-ap-southeast-1-epsilon.pg-sni-router.yaml @@ -0,0 +1,19 @@ +useCertManager: true + +replicaCount: 3 + +exposedService: + # exposedService.port -- Exposed Service proxy port + port: 4432 + annotations: + external-dns.alpha.kubernetes.io/hostname: "*.snirouter.epsilon.ap-southeast-1.internal.aws.neon.tech" + +settings: + domain: "*.snirouter.epsilon.ap-southeast-1.internal.aws.neon.tech" + sentryEnvironment: "production" + +imagePullSecrets: + - name: docker-hub-neon + +metrics: + enabled: false diff --git a/.github/helm-values/prod-eu-central-1-gamma.pg-sni-router.yaml b/.github/helm-values/prod-eu-central-1-gamma.pg-sni-router.yaml new file mode 100644 index 0000000000..08a0a163bc --- /dev/null +++ b/.github/helm-values/prod-eu-central-1-gamma.pg-sni-router.yaml @@ -0,0 +1,19 @@ +useCertManager: true + +replicaCount: 3 + +exposedService: + # exposedService.port -- Exposed Service proxy port + port: 4432 + annotations: + external-dns.alpha.kubernetes.io/hostname: "*.snirouter.gamma.eu-central-1.internal.aws.neon.tech" + +settings: + domain: "*.snirouter.gamma.eu-central-1.internal.aws.neon.tech" + sentryEnvironment: "production" + +imagePullSecrets: + - name: docker-hub-neon + +metrics: + enabled: false diff --git a/.github/helm-values/prod-us-east-1-theta.pg-sni-router.yaml b/.github/helm-values/prod-us-east-1-theta.pg-sni-router.yaml new file mode 100644 index 0000000000..ab308131bc --- /dev/null +++ 
b/.github/helm-values/prod-us-east-1-theta.pg-sni-router.yaml @@ -0,0 +1,19 @@ +useCertManager: true + +replicaCount: 3 + +exposedService: + # exposedService.port -- Exposed Service proxy port + port: 4432 + annotations: + external-dns.alpha.kubernetes.io/hostname: "*.snirouter.theta.us-east-1.internal.aws.neon.tech" + +settings: + domain: "*.snirouter.theta.us-east-1.internal.aws.neon.tech" + sentryEnvironment: "production" + +imagePullSecrets: + - name: docker-hub-neon + +metrics: + enabled: false diff --git a/.github/helm-values/prod-us-east-2-delta.pg-sni-router.yaml b/.github/helm-values/prod-us-east-2-delta.pg-sni-router.yaml new file mode 100644 index 0000000000..ecb3f156ec --- /dev/null +++ b/.github/helm-values/prod-us-east-2-delta.pg-sni-router.yaml @@ -0,0 +1,19 @@ +useCertManager: true + +replicaCount: 3 + +exposedService: + # exposedService.port -- Exposed Service proxy port + port: 4432 + annotations: + external-dns.alpha.kubernetes.io/hostname: "*.snirouter.delta.us-east-2.internal.aws.neon.tech" + +settings: + domain: "*.snirouter.delta.us-east-2.internal.aws.neon.tech" + sentryEnvironment: "production" + +imagePullSecrets: + - name: docker-hub-neon + +metrics: + enabled: false diff --git a/.github/helm-values/prod-us-west-2-eta.pg-sni-router.yaml b/.github/helm-values/prod-us-west-2-eta.pg-sni-router.yaml new file mode 100644 index 0000000000..942250c419 --- /dev/null +++ b/.github/helm-values/prod-us-west-2-eta.pg-sni-router.yaml @@ -0,0 +1,19 @@ +useCertManager: true + +replicaCount: 3 + +exposedService: + # exposedService.port -- Exposed Service proxy port + port: 4432 + annotations: + external-dns.alpha.kubernetes.io/hostname: "*.snirouter.eta.us-west-2.internal.aws.neon.tech" + +settings: + domain: "*.snirouter.eta.us-west-2.internal.aws.neon.tech" + sentryEnvironment: "production" + +imagePullSecrets: + - name: docker-hub-neon + +metrics: + enabled: false diff --git a/.github/workflows/deploy-dev.yml b/.github/workflows/deploy-dev.yml index 
5d1c6e0e16..f37e1b344d 100644 --- a/.github/workflows/deploy-dev.yml +++ b/.github/workflows/deploy-dev.yml @@ -27,6 +27,11 @@ on: required: true type: boolean default: true + deployPgSniRouter: + description: 'Deploy pg-sni-router' + required: true + type: boolean + default: true env: AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_DEV }} @@ -227,3 +232,49 @@ jobs: - name: Cleanup helm folder run: rm -rf ~/.cache + + deploy-pg-sni-router: + runs-on: [ self-hosted, gen3, small ] + container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/ansible:pinned + if: inputs.deployPgSniRouter + defaults: + run: + shell: bash + strategy: + matrix: + include: + - target_region: us-east-2 + target_cluster: dev-us-east-2-beta + - target_region: eu-west-1 + target_cluster: dev-eu-west-1-zeta + - target_region: eu-central-1 + target_cluster: dev-eu-central-1-alpha + environment: + name: dev-${{ matrix.target_region }} + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + submodules: true + fetch-depth: 0 + ref: ${{ inputs.branch }} + + - name: Configure AWS Credentials + uses: aws-actions/configure-aws-credentials@v1-node16 + with: + role-to-assume: arn:aws:iam::369495373322:role/github-runner + aws-region: eu-central-1 + role-skip-session-tagging: true + role-duration-seconds: 1800 + + - name: Configure environment + run: | + helm repo add neondatabase https://neondatabase.github.io/helm-charts + aws --region ${{ matrix.target_region }} eks update-kubeconfig --name ${{ matrix.target_cluster }} + + - name: Deploy pg-sni-router + run: + helm upgrade neon-pg-sni-router neondatabase/neon-pg-sni-router --namespace neon-pg-sni-router --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.pg-sni-router.yaml --set image.tag=${{ inputs.dockerTag }} --set settings.sentryUrl=${{ secrets.SENTRY_URL_BROKER }} --wait --timeout 15m0s + + - name: Cleanup helm folder + run: rm -rf ~/.cache diff --git a/.github/workflows/deploy-prod.yml 
b/.github/workflows/deploy-prod.yml index 9fa31b3225..c5d690db3a 100644 --- a/.github/workflows/deploy-prod.yml +++ b/.github/workflows/deploy-prod.yml @@ -27,6 +27,11 @@ on: required: true type: boolean default: true + deployPgSniRouter: + description: 'Deploy pg-sni-router' + required: true + type: boolean + default: true disclamerAcknowledged: description: 'I confirm that there is an emergency and I can not use regular release workflow' required: true @@ -171,3 +176,42 @@ jobs: - name: Deploy storage-broker run: helm upgrade neon-storage-broker-lb neondatabase/neon-storage-broker --namespace neon-storage-broker-lb --create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.neon-storage-broker.yaml --set image.tag=${{ inputs.dockerTag }} --set settings.sentryUrl=${{ secrets.SENTRY_URL_BROKER }} --wait --timeout 5m0s + + deploy-pg-sni-router: + runs-on: prod + container: 093970136003.dkr.ecr.eu-central-1.amazonaws.com/ansible:latest + if: inputs.deployPgSniRouter && inputs.disclamerAcknowledged + defaults: + run: + shell: bash + strategy: + matrix: + include: + - target_region: us-east-2 + target_cluster: prod-us-east-2-delta + - target_region: us-west-2 + target_cluster: prod-us-west-2-eta + - target_region: eu-central-1 + target_cluster: prod-eu-central-1-gamma + - target_region: ap-southeast-1 + target_cluster: prod-ap-southeast-1-epsilon + - target_region: us-east-1 + target_cluster: prod-us-east-1-theta + environment: + name: prod-${{ matrix.target_region }} + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + submodules: true + fetch-depth: 0 + ref: ${{ inputs.branch }} + + - name: Configure environment + run: | + helm repo add neondatabase https://neondatabase.github.io/helm-charts + aws --region ${{ matrix.target_region }} eks update-kubeconfig --name ${{ matrix.target_cluster }} + + - name: Deploy pg-sni-router + run: + helm upgrade neon-pg-sni-router neondatabase/neon-pg-sni-router --namespace neon-pg-sni-router 
--create-namespace --install --atomic -f .github/helm-values/${{ matrix.target_cluster }}.pg-sni-router.yaml --set image.tag=${{ inputs.dockerTag }} --set settings.sentryUrl=${{ secrets.SENTRY_URL_BROKER }} --wait --timeout 15m0s