From 2b0248cd76b469deebdf862ba3ebea4a1802ef51 Mon Sep 17 00:00:00 2001 From: Em Sharnoff Date: Fri, 25 Apr 2025 04:09:56 -0700 Subject: [PATCH 1/7] fix(proxy): s/Console/Control plane/ in cplane error (#11716) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit I got bamboozled by the error message while debugging, seems no objections to updating it. ref https://neondb.slack.com/archives/C060N3SEF9D/p1745570961111509 ref https://neondb.slack.com/archives/C039YKBRZB4/p1745570811957019?thread_ts=1745393368.283599 --- proxy/src/control_plane/errors.rs | 2 +- test_runner/random_ops/test_random_ops.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/proxy/src/control_plane/errors.rs b/proxy/src/control_plane/errors.rs index 337ed665cc..850d061333 100644 --- a/proxy/src/control_plane/errors.rs +++ b/proxy/src/control_plane/errors.rs @@ -8,7 +8,7 @@ use crate::error::{ErrorKind, ReportableError, UserFacingError}; use crate::proxy::retry::CouldRetry; /// A go-to error message which doesn't leak any detail. -pub(crate) const REQUEST_FAILED: &str = "Console request failed"; +pub(crate) const REQUEST_FAILED: &str = "Control plane request failed"; /// Common console API error. #[derive(Debug, Error)] diff --git a/test_runner/random_ops/test_random_ops.py b/test_runner/random_ops/test_random_ops.py index b3078ecac1..643151fa11 100644 --- a/test_runner/random_ops/test_random_ops.py +++ b/test_runner/random_ops/test_random_ops.py @@ -323,6 +323,7 @@ class NeonProject: if self.restart_pgbench_on_console_errors and ( "ERROR: Couldn't connect to compute node" in err or "ERROR: Console request failed" in err + or "ERROR: Control plane request failed" in err ): log.info("Restarting benchmark for %s", target) self.benchmarks.pop(target) From 6f0046b688ad2b4df311dcf035884b21d715a08c Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Fri, 25 Apr 2025 12:46:15 +0100 Subject: [PATCH 2/7] storage_controller: ensure mutual exclusion for imports and shard splits (#11632) ## Problem Shard splits break timeline imports. ## Summary of Changes Ensure mutual exclusion for imports and shard splits. On the shard split code path: 1. Right before shard splitting, check the database to ensure that no-import is on-going for the tenant. Exclusion is guaranteed because this validation is done while holding the exclusive tenant lock. Timeline creation (and import creation implicitly) requires a shared tenant lock. 2. When selecting a shard to split, use the in-mem state to exclude shards with an on-going import. This is opportunistic since an import might start after the check, but allows shard splits to make progres instead of continously retrying to split the same shard. On the timeline creation code path: 1. Check the in-memory splitting flag on all shards of the tenant. If any of them are splitting, error out asking the client to retry. On the happy path this is not required, due to the tenant lock set-up described above, but it covers the case where we restart with a pending shard-split. Closes https://github.com/neondatabase/neon/issues/11567 --- libs/pageserver_api/src/controller_api.rs | 2 + pageserver/src/tenant.rs | 18 ++++ storage_controller/src/persistence.rs | 29 ++++-- storage_controller/src/service.rs | 108 +++++++++++++++++++--- storage_controller/src/tenant_shard.rs | 8 ++ storage_controller/src/timeline_import.rs | 6 ++ 6 files changed, 153 insertions(+), 18 deletions(-) diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs index 91f9c03ba4..c5b49edba0 100644 --- a/libs/pageserver_api/src/controller_api.rs +++ b/libs/pageserver_api/src/controller_api.rs @@ -169,6 +169,8 @@ pub struct TenantDescribeResponseShard { pub is_pending_compute_notification: bool, /// A shard split is currently underway pub is_splitting: bool, + /// A timeline is being imported into this tenant + pub is_importing: bool, pub scheduling_policy: ShardSchedulingPolicy, diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 997fc24052..698579e8fb 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3816,6 +3816,24 @@ impl TenantShard { MaybeDeletedIndexPart::IndexPart(p) => p, }; + // A shard split may not take place while a timeline import is on-going + // for the tenant. Timeline imports run as part of each tenant shard + // and rely on the sharding scheme to split the work among pageservers. + // If we were to split in the middle of this process, we would have to + // either ensure that it's driven to completion on the old shard set + // or transfer it to the new shard set. It's technically possible, but complex. + match index_part.import_pgdata { + Some(ref import) if !import.is_done() => { + anyhow::bail!( + "Cannot split due to import with idempotency key: {:?}", + import.idempotency_key() + ); + } + Some(_) | None => { + // fallthrough + } + } + for child_shard in child_shards { tracing::info!(%timeline_id, "Uploading index_part for child {}", child_shard.to_index()); upload_index_part( diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index 64a8846a9d..9ffcf9b9e6 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -134,6 +134,7 @@ pub(crate) enum DatabaseOperation { UpdateTimelineImport, DeleteTimelineImport, ListTimelineImports, + IsTenantImportingTimeline, } #[must_use] @@ -1641,9 +1642,7 @@ impl Persistence { .await } - pub(crate) async fn list_complete_timeline_imports( - &self, - ) -> DatabaseResult> { + pub(crate) async fn list_timeline_imports(&self) -> DatabaseResult> { use crate::schema::timeline_imports::dsl; let persistent = self .with_measured_conn(DatabaseOperation::ListTimelineImports, move |conn| { @@ -1660,10 +1659,7 @@ impl Persistence { .map(TimelineImport::from_persistent) .collect(); match imports { - Ok(ok) => Ok(ok - .into_iter() - .filter(|import| import.is_complete()) - .collect()), + Ok(ok) => Ok(ok.into_iter().collect()), Err(err) => Err(DatabaseError::Logical(format!( "failed to deserialize import: {err}" ))), @@ -1773,6 +1769,25 @@ impl Persistence { }) .await } + + pub(crate) async fn is_tenant_importing_timeline( + &self, + tenant_id: TenantId, + ) -> DatabaseResult { + use crate::schema::timeline_imports::dsl; + self.with_measured_conn(DatabaseOperation::IsTenantImportingTimeline, move |conn| { + Box::pin(async move { + let imports: i64 = dsl::timeline_imports + .filter(dsl::tenant_id.eq(tenant_id.to_string())) + .count() + .get_result(conn) + .await?; + + Ok(imports > 0) + }) + }) + .await + } } pub(crate) fn load_certs() -> anyhow::Result> { diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 7e5e3fd8f4..ca9b911c4d 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -97,7 +97,9 @@ use crate::tenant_shard::{ ReconcileNeeded, ReconcileResult, ReconcileWaitError, ReconcilerStatus, ReconcilerWaiter, ScheduleOptimization, ScheduleOptimizationAction, TenantShard, }; -use crate::timeline_import::{ShardImportStatuses, TimelineImport, UpcallClient}; +use crate::timeline_import::{ + ShardImportStatuses, TimelineImport, TimelineImportState, UpcallClient, +}; const WAITER_FILL_DRAIN_POLL_TIMEOUT: Duration = Duration::from_millis(500); @@ -878,15 +880,33 @@ impl Service { }); } - // Fetch the list of completed imports and attempt to finalize them in the background. - // This handles the case where the previous storage controller instance shut down - // whilst finalizing imports. - let complete_imports = self.persistence.list_complete_timeline_imports().await; - match complete_imports { - Ok(ok) => { + // Reconcile the timeline imports: + // 1. Mark each tenant shard of tenants with an importing timeline as importing. + // 2. Finalize the completed imports in the background. This handles the case where + // the previous storage controller instance shut down whilst finalizing imports. + let imports = self.persistence.list_timeline_imports().await; + match imports { + Ok(mut imports) => { + { + let mut locked = self.inner.write().unwrap(); + for import in &imports { + locked + .tenants + .range_mut(TenantShardId::tenant_range(import.tenant_id)) + .for_each(|(_id, shard)| { + shard.importing = TimelineImportState::Importing + }); + } + } + + imports.retain(|import| import.is_complete()); tokio::task::spawn({ let finalize_imports_self = self.clone(); - async move { finalize_imports_self.finalize_timeline_imports(ok).await } + async move { + finalize_imports_self + .finalize_timeline_imports(imports) + .await + } }); } Err(err) => { @@ -3772,6 +3792,22 @@ impl Service { failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock"); let is_import = create_req.is_import(); + if is_import { + // Ensure that there is no split on-going. + // [`Self::tenant_shard_split`] holds the exclusive tenant lock + // for the duration of the split, but here we handle the case + // where we restarted and the split is being aborted. + let locked = self.inner.read().unwrap(); + let splitting = locked + .tenants + .range(TenantShardId::tenant_range(tenant_id)) + .any(|(_id, shard)| shard.splitting != SplitState::Idle); + + if splitting { + return Err(ApiError::Conflict("Tenant is splitting shard".to_string())); + } + } + let timeline_info = self .tenant_timeline_create_pageservers(tenant_id, create_req) .await?; @@ -3809,6 +3845,14 @@ impl Service { .context("timeline import insert") .map_err(ApiError::InternalServerError)?; + // Set the importing flag on the tenant shards + self.inner + .write() + .unwrap() + .tenants + .range_mut(TenantShardId::tenant_range(tenant_id)) + .for_each(|(_id, shard)| shard.importing = TimelineImportState::Importing); + match inserted { true => { tracing::info!(%tenant_id, %timeline_id, "Inserted timeline import"); @@ -3931,6 +3975,13 @@ impl Service { tracing::warn!("Failed to delete timeline import entry from database: {err}"); } + self.inner + .write() + .unwrap() + .tenants + .range_mut(TenantShardId::tenant_range(import.tenant_id)) + .for_each(|(_id, shard)| shard.importing = TimelineImportState::Idle); + // TODO(vlad): Timeline creations in import mode do not return a correct initdb lsn, // so we can't create the timeline on the safekeepers. Fix by moving creation here. // https://github.com/neondatabase/neon/issues/11569 @@ -4914,6 +4965,7 @@ impl Service { is_reconciling: shard.reconciler.is_some(), is_pending_compute_notification: shard.pending_compute_notification, is_splitting: matches!(shard.splitting, SplitState::Splitting), + is_importing: shard.importing == TimelineImportState::Importing, scheduling_policy: shard.get_scheduling_policy(), preferred_az_id: shard.preferred_az().map(ToString::to_string), }) @@ -5404,6 +5456,27 @@ impl Service { .enter() .map_err(|_| ApiError::ShuttingDown)?; + // Timeline imports on the pageserver side can't handle shard-splits. + // If the tenant is importing a timeline, dont't shard split it. + match self + .persistence + .is_tenant_importing_timeline(tenant_id) + .await + { + Ok(importing) => { + if importing { + return Err(ApiError::Conflict( + "Cannot shard split during timeline import".to_string(), + )); + } + } + Err(err) => { + return Err(ApiError::InternalServerError(anyhow::anyhow!( + "Failed to check for running imports: {err}" + ))); + } + } + let new_shard_count = ShardCount::new(split_req.new_shard_count); let new_stripe_size = split_req.new_stripe_size; @@ -8076,12 +8149,25 @@ impl Service { candidates.extend(size_candidates); } - // Filter out tenants in a prohibiting scheduling mode. + // Filter out tenants in a prohibiting scheduling modes + // and tenants with an ongoing import. + // + // Note that the import check here is oportunistic. An import might start + // after the check before we actually update [`TenantShard::splitting`]. + // [`Self::tenant_shard_split`] checks the database whilst holding the exclusive + // tenant lock. Imports might take a long time, so the check here allows us + // to split something else instead of trying the same shard over and over. { let state = self.inner.read().unwrap(); candidates.retain(|i| { - let policy = state.tenants.get(&i.id).map(|s| s.get_scheduling_policy()); - policy == Some(ShardSchedulingPolicy::Active) + let shard = state.tenants.get(&i.id); + match shard { + Some(t) => { + t.get_scheduling_policy() == ShardSchedulingPolicy::Active + && t.importing == TimelineImportState::Idle + } + None => false, + } }); } diff --git a/storage_controller/src/tenant_shard.rs b/storage_controller/src/tenant_shard.rs index 3a75e96cb2..c7b2628ec4 100644 --- a/storage_controller/src/tenant_shard.rs +++ b/storage_controller/src/tenant_shard.rs @@ -33,6 +33,7 @@ use crate::scheduler::{ RefCountUpdate, ScheduleContext, ScheduleError, Scheduler, SecondaryShardTag, ShardTag, }; use crate::service::ReconcileResultRequest; +use crate::timeline_import::TimelineImportState; use crate::{Sequence, service}; /// Serialization helper @@ -100,6 +101,10 @@ pub(crate) struct TenantShard { /// reconciliation, and timeline creation. pub(crate) splitting: SplitState, + /// Flag indicating whether the tenant has an in-progress timeline import. + /// Used to disallow shard splits while an import is in progress. + pub(crate) importing: TimelineImportState, + /// If a tenant was enqueued for later reconcile due to hitting concurrency limit, this flag /// is set. This flag is cleared when the tenant is popped off the delay queue. pub(crate) delayed_reconcile: bool, @@ -583,6 +588,7 @@ impl TenantShard { config: TenantConfig::default(), reconciler: None, splitting: SplitState::Idle, + importing: TimelineImportState::Idle, sequence: Sequence(1), delayed_reconcile: false, waiter: Arc::new(SeqWait::new(Sequence(0))), @@ -1844,6 +1850,8 @@ impl TenantShard { config: serde_json::from_str(&tsp.config).unwrap(), reconciler: None, splitting: tsp.splitting, + // Filled in during [`Service::startup_reconcile`] + importing: TimelineImportState::Idle, waiter: Arc::new(SeqWait::new(Sequence::initial())), error_waiter: Arc::new(SeqWait::new(Sequence::initial())), last_error: Arc::default(), diff --git a/storage_controller/src/timeline_import.rs b/storage_controller/src/timeline_import.rs index b6dd4b252e..6dcc538c4b 100644 --- a/storage_controller/src/timeline_import.rs +++ b/storage_controller/src/timeline_import.rs @@ -14,6 +14,12 @@ use utils::{ use crate::{persistence::TimelineImportPersistence, service::Config}; +#[derive(Deserialize, Serialize, PartialEq, Eq)] +pub(crate) enum TimelineImportState { + Importing, + Idle, +} + #[derive(Serialize, Deserialize, Clone, Debug)] pub(crate) struct ShardImportStatuses(pub(crate) HashMap); From ef53a7643452cd44a60773239da9d786f1e30b35 Mon Sep 17 00:00:00 2001 From: Dmitrii Kovalkov <34828390+DimasKovas@users.noreply.github.com> Date: Fri, 25 Apr 2025 18:28:56 +0400 Subject: [PATCH 3/7] storage_broker: https handler (#11603) ## Problem Broker supports only HTTP, no HTTPS - Closes: https://github.com/neondatabase/cloud/issues/27492 ## Summary of changes - Add `listen_https_addr`, `ssl_key_file`, `ssl_cert_file`, `ssl_cert_reload_period` arguments to storage broker - Make `listen_addr` argument optional - Listen https in storage broker - Support https for storage broker request in neon_local - Add `use_https_storage_broker_api` option to NeonEnvBuilder --- Cargo.lock | 3 + control_plane/src/bin/neon_local.rs | 22 +++-- control_plane/src/broker.rs | 118 ++++++++++++++--------- control_plane/src/local_env.rs | 64 +++++++++--- control_plane/src/pageserver.rs | 16 +-- control_plane/src/safekeeper.rs | 2 +- control_plane/src/storage_controller.rs | 17 +--- storage_broker/Cargo.toml | 3 + storage_broker/src/bin/storage_broker.rs | 106 +++++++++++++++++--- storage_controller/src/main.rs | 2 +- test_runner/fixtures/neon_fixtures.py | 22 +++-- test_runner/regress/test_ssl.py | 22 +++++ 12 files changed, 278 insertions(+), 119 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2cf260c88c..4c464c62b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6616,12 +6616,14 @@ dependencies = [ "anyhow", "async-stream", "bytes", + "camino", "clap", "const_format", "futures", "futures-core", "futures-util", "http-body-util", + "http-utils", "humantime", "hyper 1.4.1", "hyper-util", @@ -6631,6 +6633,7 @@ dependencies = [ "prost 0.13.3", "rustls 0.23.18", "tokio", + "tokio-rustls 0.26.0", "tonic", "tonic-build", "tracing", diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 5cf6767361..6f55c0310f 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -17,8 +17,10 @@ use std::time::Duration; use anyhow::{Context, Result, anyhow, bail}; use clap::Parser; use compute_api::spec::ComputeMode; +use control_plane::broker::StorageBroker; use control_plane::endpoint::ComputeControlPlane; use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_PORT, EndpointStorage}; +use control_plane::local_env; use control_plane::local_env::{ EndpointStorageConf, InitForceMode, LocalEnv, NeonBroker, NeonLocalInitConf, NeonLocalInitPageserverConf, SafekeeperConf, @@ -28,7 +30,6 @@ use control_plane::safekeeper::SafekeeperNode; use control_plane::storage_controller::{ NeonStorageControllerStartArgs, NeonStorageControllerStopArgs, StorageController, }; -use control_plane::{broker, local_env}; use nix::fcntl::{FlockArg, flock}; use pageserver_api::config::{ DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT, @@ -988,7 +989,8 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result { NeonLocalInitConf { control_plane_api: Some(DEFAULT_PAGESERVER_CONTROL_PLANE_API.parse().unwrap()), broker: NeonBroker { - listen_addr: DEFAULT_BROKER_ADDR.parse().unwrap(), + listen_addr: Some(DEFAULT_BROKER_ADDR.parse().unwrap()), + listen_https_addr: None, }, safekeepers: vec![SafekeeperConf { id: DEFAULT_SAFEKEEPER_ID, @@ -1777,7 +1779,8 @@ async fn handle_endpoint_storage( async fn handle_storage_broker(subcmd: &StorageBrokerCmd, env: &local_env::LocalEnv) -> Result<()> { match subcmd { StorageBrokerCmd::Start(args) => { - if let Err(e) = broker::start_broker_process(env, &args.start_timeout).await { + let storage_broker = StorageBroker::from_env(env); + if let Err(e) = storage_broker.start(&args.start_timeout).await { eprintln!("broker start failed: {e}"); exit(1); } @@ -1785,7 +1788,8 @@ async fn handle_storage_broker(subcmd: &StorageBrokerCmd, env: &local_env::Local StorageBrokerCmd::Stop(_args) => { // FIXME: stop_mode unused - if let Err(e) = broker::stop_broker_process(env) { + let storage_broker = StorageBroker::from_env(env); + if let Err(e) = storage_broker.stop() { eprintln!("broker stop failed: {e}"); exit(1); } @@ -1835,8 +1839,11 @@ async fn handle_start_all_impl( #[allow(clippy::redundant_closure_call)] (|| { js.spawn(async move { - let retry_timeout = retry_timeout; - broker::start_broker_process(env, &retry_timeout).await + let storage_broker = StorageBroker::from_env(env); + storage_broker + .start(&retry_timeout) + .await + .map_err(|e| e.context("start storage_broker")) }); js.spawn(async move { @@ -1991,7 +1998,8 @@ async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) { } } - if let Err(e) = broker::stop_broker_process(env) { + let storage_broker = StorageBroker::from_env(env); + if let Err(e) = storage_broker.stop() { eprintln!("neon broker stop failed: {e:#}"); } diff --git a/control_plane/src/broker.rs b/control_plane/src/broker.rs index 1b507bb384..f43f459636 100644 --- a/control_plane/src/broker.rs +++ b/control_plane/src/broker.rs @@ -3,60 +3,86 @@ //! In the local test environment, the storage broker stores its data directly in //! //! ```text -//! .neon +//! .neon/storage_broker //! ``` use std::time::Duration; use anyhow::Context; use camino::Utf8PathBuf; -use crate::{background_process, local_env}; +use crate::{background_process, local_env::LocalEnv}; -pub async fn start_broker_process( - env: &local_env::LocalEnv, - retry_timeout: &Duration, -) -> anyhow::Result<()> { - let broker = &env.broker; - let listen_addr = &broker.listen_addr; - - print!("Starting neon broker at {}", listen_addr); - - let args = [format!("--listen-addr={listen_addr}")]; - - let client = reqwest::Client::new(); - background_process::start_process( - "storage_broker", - &env.base_data_dir, - &env.storage_broker_bin(), - args, - [], - background_process::InitialPidFile::Create(storage_broker_pid_file_path(env)), - retry_timeout, - || async { - let url = broker.client_url(); - let status_url = url.join("status").with_context(|| { - format!("Failed to append /status path to broker endpoint {url}") - })?; - let request = client - .get(status_url) - .build() - .with_context(|| format!("Failed to construct request to broker endpoint {url}"))?; - match client.execute(request).await { - Ok(resp) => Ok(resp.status().is_success()), - Err(_) => Ok(false), - } - }, - ) - .await - .context("Failed to spawn storage_broker subprocess")?; - Ok(()) +pub struct StorageBroker { + env: LocalEnv, } -pub fn stop_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> { - background_process::stop_process(true, "storage_broker", &storage_broker_pid_file_path(env)) -} +impl StorageBroker { + /// Create a new `StorageBroker` instance from the environment. + pub fn from_env(env: &LocalEnv) -> Self { + Self { env: env.clone() } + } -fn storage_broker_pid_file_path(env: &local_env::LocalEnv) -> Utf8PathBuf { - Utf8PathBuf::from_path_buf(env.base_data_dir.join("storage_broker.pid")) - .expect("non-Unicode path") + pub fn initialize(&self) -> anyhow::Result<()> { + if self.env.generate_local_ssl_certs { + self.env.generate_ssl_cert( + &self.env.storage_broker_data_dir().join("server.crt"), + &self.env.storage_broker_data_dir().join("server.key"), + )?; + } + Ok(()) + } + + /// Start the storage broker process. + pub async fn start(&self, retry_timeout: &Duration) -> anyhow::Result<()> { + let broker = &self.env.broker; + + print!("Starting neon broker at {}", broker.client_url()); + + let mut args = Vec::new(); + + if let Some(addr) = &broker.listen_addr { + args.push(format!("--listen-addr={addr}")); + } + if let Some(addr) = &broker.listen_https_addr { + args.push(format!("--listen-https-addr={addr}")); + } + + let client = self.env.create_http_client(); + background_process::start_process( + "storage_broker", + &self.env.storage_broker_data_dir(), + &self.env.storage_broker_bin(), + args, + [], + background_process::InitialPidFile::Create(self.pid_file_path()), + retry_timeout, + || async { + let url = broker.client_url(); + let status_url = url.join("status").with_context(|| { + format!("Failed to append /status path to broker endpoint {url}") + })?; + let request = client.get(status_url).build().with_context(|| { + format!("Failed to construct request to broker endpoint {url}") + })?; + match client.execute(request).await { + Ok(resp) => Ok(resp.status().is_success()), + Err(_) => Ok(false), + } + }, + ) + .await + .context("Failed to spawn storage_broker subprocess")?; + Ok(()) + } + + /// Stop the storage broker process. + pub fn stop(&self) -> anyhow::Result<()> { + background_process::stop_process(true, "storage_broker", &self.pid_file_path()) + } + + /// Get the path to the PID file for the storage broker. + fn pid_file_path(&self) -> Utf8PathBuf { + Utf8PathBuf::from_path_buf(self.env.base_data_dir.join("storage_broker.pid")) + .expect("non-Unicode path") + } } diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 77d5c1c922..a18b34daa4 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -4,7 +4,7 @@ //! script which will use local paths. use std::collections::HashMap; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::net::SocketAddr; use std::path::{Path, PathBuf}; use std::process::{Command, Stdio}; use std::time::Duration; @@ -14,11 +14,12 @@ use anyhow::{Context, bail}; use clap::ValueEnum; use pem::Pem; use postgres_backend::AuthType; -use reqwest::Url; +use reqwest::{Certificate, Url}; use serde::{Deserialize, Serialize}; use utils::auth::encode_from_key_file; use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId}; +use crate::broker::StorageBroker; use crate::endpoint_storage::{ENDPOINT_STORAGE_REMOTE_STORAGE_DIR, EndpointStorage}; use crate::pageserver::{PAGESERVER_REMOTE_STORAGE_DIR, PageServerNode}; use crate::safekeeper::SafekeeperNode; @@ -157,11 +158,16 @@ pub struct EndpointStorageConf { } /// Broker config for cluster internal communication. -#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)] +#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug, Default)] #[serde(default)] pub struct NeonBroker { - /// Broker listen address for storage nodes coordination, e.g. '127.0.0.1:50051'. - pub listen_addr: SocketAddr, + /// Broker listen HTTP address for storage nodes coordination, e.g. '127.0.0.1:50051'. + /// At least one of listen_addr or listen_https_addr must be set. + pub listen_addr: Option, + /// Broker listen HTTPS address for storage nodes coordination, e.g. '127.0.0.1:50051'. + /// At least one of listen_addr or listen_https_addr must be set. + /// listen_https_addr is preferred over listen_addr in neon_local. + pub listen_https_addr: Option, } /// A part of storage controller's config the neon_local knows about. @@ -235,18 +241,19 @@ impl Default for NeonStorageControllerConf { } } -// Dummy Default impl to satisfy Deserialize derive. -impl Default for NeonBroker { - fn default() -> Self { - NeonBroker { - listen_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0), - } - } -} - impl NeonBroker { pub fn client_url(&self) -> Url { - Url::parse(&format!("http://{}", self.listen_addr)).expect("failed to construct url") + let url = if let Some(addr) = self.listen_https_addr { + format!("https://{}", addr) + } else { + format!( + "http://{}", + self.listen_addr + .expect("at least one address should be set") + ) + }; + + Url::parse(&url).expect("failed to construct url") } } @@ -441,6 +448,10 @@ impl LocalEnv { self.base_data_dir.join("endpoints") } + pub fn storage_broker_data_dir(&self) -> PathBuf { + self.base_data_dir.join("storage_broker") + } + pub fn pageserver_data_dir(&self, pageserver_id: NodeId) -> PathBuf { self.base_data_dir .join(format!("pageserver_{pageserver_id}")) @@ -503,6 +514,23 @@ impl LocalEnv { ) } + /// Creates HTTP client with local SSL CA certificates. + pub fn create_http_client(&self) -> reqwest::Client { + let ssl_ca_certs = self.ssl_ca_cert_path().map(|ssl_ca_file| { + let buf = std::fs::read(ssl_ca_file).expect("SSL CA file should exist"); + Certificate::from_pem_bundle(&buf).expect("SSL CA file should be valid") + }); + + let mut http_client = reqwest::Client::builder(); + for ssl_ca_cert in ssl_ca_certs.unwrap_or_default() { + http_client = http_client.add_root_certificate(ssl_ca_cert); + } + + http_client + .build() + .expect("HTTP client should construct with no error") + } + /// Inspect the base data directory and extract the instance id and instance directory path /// for all storage controller instances pub async fn storage_controller_instances(&self) -> std::io::Result> { @@ -911,6 +939,12 @@ impl LocalEnv { // create endpoints dir fs::create_dir_all(env.endpoints_path())?; + // create storage broker dir + fs::create_dir_all(env.storage_broker_data_dir())?; + StorageBroker::from_env(&env) + .initialize() + .context("storage broker init failed")?; + // create safekeeper dirs for safekeeper in &env.safekeepers { fs::create_dir_all(SafekeeperNode::datadir_path_by_id(&env, safekeeper.id))?; diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index b9257a27bf..79e87eba9b 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -21,7 +21,6 @@ use pageserver_api::shard::TenantShardId; use pageserver_client::mgmt_api; use postgres_backend::AuthType; use postgres_connection::{PgConnectionConfig, parse_host_port}; -use reqwest::Certificate; use utils::auth::{Claims, Scope}; use utils::id::{NodeId, TenantId, TimelineId}; use utils::lsn::Lsn; @@ -51,19 +50,6 @@ impl PageServerNode { parse_host_port(&conf.listen_pg_addr).expect("Unable to parse listen_pg_addr"); let port = port.unwrap_or(5432); - let ssl_ca_certs = env.ssl_ca_cert_path().map(|ssl_ca_file| { - let buf = std::fs::read(ssl_ca_file).expect("SSL root CA file should exist"); - Certificate::from_pem_bundle(&buf).expect("SSL CA file should be valid") - }); - - let mut http_client = reqwest::Client::builder(); - for ssl_ca_cert in ssl_ca_certs.unwrap_or_default() { - http_client = http_client.add_root_certificate(ssl_ca_cert); - } - let http_client = http_client - .build() - .expect("Client constructs with no errors"); - let endpoint = if env.storage_controller.use_https_pageserver_api { format!( "https://{}", @@ -80,7 +66,7 @@ impl PageServerNode { conf: conf.clone(), env: env.clone(), http_client: mgmt_api::Client::new( - http_client, + env.create_http_client(), endpoint, { match conf.http_auth_type { diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index 231871852e..948e3c8c93 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -87,7 +87,7 @@ impl SafekeeperNode { conf: conf.clone(), pg_connection_config: Self::safekeeper_connection_config(&listen_addr, conf.pg_port), env: env.clone(), - http_client: reqwest::Client::new(), + http_client: env.create_http_client(), http_base_url: format!("http://{}:{}/v1", listen_addr, conf.http_port), listen_addr, } diff --git a/control_plane/src/storage_controller.rs b/control_plane/src/storage_controller.rs index 62ad5fa8d6..a36815d27e 100644 --- a/control_plane/src/storage_controller.rs +++ b/control_plane/src/storage_controller.rs @@ -20,7 +20,7 @@ use pageserver_api::shard::TenantShardId; use pageserver_client::mgmt_api::ResponseErrorMessageExt; use pem::Pem; use postgres_backend::AuthType; -use reqwest::{Certificate, Method}; +use reqwest::Method; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use tokio::process::Command; @@ -153,24 +153,11 @@ impl StorageController { } }; - let ssl_ca_certs = env.ssl_ca_cert_path().map(|ssl_ca_file| { - let buf = std::fs::read(ssl_ca_file).expect("SSL CA file should exist"); - Certificate::from_pem_bundle(&buf).expect("SSL CA file should be valid") - }); - - let mut http_client = reqwest::Client::builder(); - for ssl_ca_cert in ssl_ca_certs.unwrap_or_default() { - http_client = http_client.add_root_certificate(ssl_ca_cert); - } - let http_client = http_client - .build() - .expect("HTTP client should construct with no error"); - Self { env: env.clone(), private_key, public_key, - client: http_client, + client: env.create_http_client(), config: env.storage_controller.clone(), listen_port: OnceLock::default(), } diff --git a/storage_broker/Cargo.toml b/storage_broker/Cargo.toml index e4db9a317d..67b276c8fe 100644 --- a/storage_broker/Cargo.toml +++ b/storage_broker/Cargo.toml @@ -11,6 +11,7 @@ bench = [] anyhow.workspace = true async-stream.workspace = true bytes.workspace = true +camino.workspace = true clap = { workspace = true, features = ["derive"] } const_format.workspace = true futures.workspace = true @@ -19,12 +20,14 @@ futures-util.workspace = true humantime.workspace = true hyper = { workspace = true, features = ["full"] } http-body-util.workspace = true +http-utils.workspace = true hyper-util = "0.1" once_cell.workspace = true parking_lot.workspace = true prost.workspace = true tonic.workspace = true tokio = { workspace = true, features = ["rt-multi-thread"] } +tokio-rustls.workspace = true tracing.workspace = true metrics.workspace = true utils.workspace = true diff --git a/storage_broker/src/bin/storage_broker.rs b/storage_broker/src/bin/storage_broker.rs index a7e0c986e6..476d5f03ea 100644 --- a/storage_broker/src/bin/storage_broker.rs +++ b/storage_broker/src/bin/storage_broker.rs @@ -17,10 +17,13 @@ use std::pin::Pin; use std::sync::Arc; use std::time::Duration; +use camino::Utf8PathBuf; use clap::{Parser, command}; +use futures::future::OptionFuture; use futures_core::Stream; use futures_util::StreamExt; use http_body_util::Full; +use http_utils::tls_certs::ReloadingCertificateResolver; use hyper::body::Incoming; use hyper::header::CONTENT_TYPE; use hyper::service::service_fn; @@ -38,7 +41,7 @@ use storage_broker::proto::{ FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse, SafekeeperTimelineInfo, SubscribeByFilterRequest, SubscribeSafekeeperInfoRequest, TypedMessage, }; -use storage_broker::{DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_LISTEN_ADDR, parse_proto_ttid}; +use storage_broker::{DEFAULT_KEEPALIVE_INTERVAL, parse_proto_ttid}; use tokio::net::TcpListener; use tokio::sync::broadcast; use tokio::sync::broadcast::error::RecvError; @@ -59,12 +62,25 @@ project_build_tag!(BUILD_TAG); const DEFAULT_CHAN_SIZE: usize = 32; const DEFAULT_ALL_KEYS_CHAN_SIZE: usize = 16384; +const DEFAULT_SSL_KEY_FILE: &str = "server.key"; +const DEFAULT_SSL_CERT_FILE: &str = "server.crt"; +const DEFAULT_SSL_CERT_RELOAD_PERIOD: &str = "60s"; + #[derive(Parser, Debug)] #[command(version = GIT_VERSION, about = "Broker for neon storage nodes communication", long_about = None)] +#[clap(group( + clap::ArgGroup::new("listen-addresses") + .required(true) + .multiple(true) + .args(&["listen_addr", "listen_https_addr"]), +))] struct Args { - /// Endpoint to listen on. - #[arg(short, long, default_value = DEFAULT_LISTEN_ADDR)] - listen_addr: SocketAddr, + /// Endpoint to listen HTTP on. + #[arg(short, long)] + listen_addr: Option, + /// Endpoint to listen HTTPS on. + #[arg(long)] + listen_https_addr: Option, /// Size of the queue to the per timeline subscriber. #[arg(long, default_value_t = DEFAULT_CHAN_SIZE)] timeline_chan_size: usize, @@ -72,11 +88,20 @@ struct Args { #[arg(long, default_value_t = DEFAULT_ALL_KEYS_CHAN_SIZE)] all_keys_chan_size: usize, /// HTTP/2 keepalive interval. - #[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_KEEPALIVE_INTERVAL)] + #[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_KEEPALIVE_INTERVAL)] http2_keepalive_interval: Duration, /// Format for logging, either 'plain' or 'json'. #[arg(long, default_value = "plain")] log_format: String, + /// Path to a file with certificate's private key for https API. + #[arg(long, default_value = DEFAULT_SSL_KEY_FILE)] + ssl_key_file: Utf8PathBuf, + /// Path to a file with a X509 certificate for https API. + #[arg(long, default_value = DEFAULT_SSL_CERT_FILE)] + ssl_cert_file: Utf8PathBuf, + /// Period to reload certificate and private key from files. + #[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_SSL_CERT_RELOAD_PERIOD)] + ssl_cert_reload_period: Duration, } /// Id of publisher for registering in maps @@ -674,12 +699,50 @@ async fn main() -> Result<(), Box> { }; let storage_broker_server = BrokerServiceServer::new(storage_broker_impl); + let http_listener = match &args.listen_addr { + Some(addr) => { + info!("listening HTTP on {}", addr); + Some(TcpListener::bind(addr).await?) + } + None => None, + }; + + let (https_listener, tls_acceptor) = match &args.listen_https_addr { + Some(addr) => { + let listener = TcpListener::bind(addr).await?; + + let cert_resolver = ReloadingCertificateResolver::new( + "main", + &args.ssl_key_file, + &args.ssl_cert_file, + args.ssl_cert_reload_period, + ) + .await?; + + let mut tls_config = rustls::ServerConfig::builder() + .with_no_client_auth() + .with_cert_resolver(cert_resolver); + + // Tonic is HTTP/2 only and it negotiates it with ALPN. + tls_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; + + let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(tls_config)); + + info!("listening HTTPS on {}", addr); + (Some(listener), Some(acceptor)) + } + None => (None, None), + }; + // grpc is served along with http1 for metrics on a single port, hence we // don't use tonic's Server. - let tcp_listener = TcpListener::bind(&args.listen_addr).await?; - info!("listening on {}", &args.listen_addr); loop { - let (stream, addr) = match tcp_listener.accept().await { + let (conn, is_https) = tokio::select! { + Some(conn) = OptionFuture::from(http_listener.as_ref().map(|l| l.accept())) => (conn, false), + Some(conn) = OptionFuture::from(https_listener.as_ref().map(|l| l.accept())) => (conn, true), + }; + + let (tcp_stream, addr) = match conn { Ok(v) => v, Err(e) => { info!("couldn't accept connection: {e}"); @@ -734,13 +797,32 @@ async fn main() -> Result<(), Box> { } .await; + let tls_acceptor = tls_acceptor.clone(); + tokio::task::spawn(async move { - let res = builder - .serve_connection(TokioIo::new(stream), service_fn_) - .await; + let res = if is_https { + let tls_acceptor = + tls_acceptor.expect("tls_acceptor is set together with https_listener"); + + let tls_stream = match tls_acceptor.accept(tcp_stream).await { + Ok(tls_stream) => tls_stream, + Err(e) => { + info!("error accepting TLS connection from {addr}: {e}"); + return; + } + }; + + builder + .serve_connection(TokioIo::new(tls_stream), service_fn_) + .await + } else { + builder + .serve_connection(TokioIo::new(tcp_stream), service_fn_) + .await + }; if let Err(e) = res { - info!("error serving connection from {addr}: {e}"); + info!(%is_https, "error serving connection from {addr}: {e}"); } }); } diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs index a924e5b6c5..71dde9e126 100644 --- a/storage_controller/src/main.rs +++ b/storage_controller/src/main.rs @@ -196,7 +196,7 @@ struct Cli { ssl_cert_reload_period: humantime::Duration, /// Trusted root CA certificates to use in https APIs. #[arg(long)] - ssl_ca_file: Option, + ssl_ca_file: Option, /// Neon local specific flag. When set, ignore [`Cli::control_plane_url`] and deliver /// the compute notification directly (instead of via control plane). diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 48aa739ce4..1d668d4b2d 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -501,6 +501,9 @@ class NeonEnvBuilder: # Flag to use https listener in storage controller, generate local ssl certs, # and force pageservers and neon_local to use https for storage controller api. self.use_https_storage_controller_api: bool = False + # Flag to use https listener in storage broker, generate local ssl certs, + # and force pageservers and safekeepers to use https for storage broker api. + self.use_https_storage_broker_api: bool = False self.pageserver_virtual_file_io_engine: str | None = pageserver_virtual_file_io_engine self.pageserver_get_vectored_concurrent_io: str | None = ( @@ -1086,7 +1089,7 @@ class NeonEnv: self.safekeepers: list[Safekeeper] = [] self.pageservers: list[NeonPageserver] = [] self.num_azs = config.num_azs - self.broker = NeonBroker(self) + self.broker = NeonBroker(self, config.use_https_storage_broker_api) self.pageserver_remote_storage = config.pageserver_remote_storage self.safekeepers_remote_storage = config.safekeepers_remote_storage self.pg_version = config.pg_version @@ -1106,6 +1109,7 @@ class NeonEnv: config.use_https_pageserver_api or config.use_https_safekeeper_api or config.use_https_storage_controller_api + or config.use_https_storage_broker_api ) self.ssl_ca_file = ( self.repo_dir.joinpath("rootCA.crt") if self.generate_local_ssl_certs else None @@ -1178,15 +1182,18 @@ class NeonEnv: # Create the neon_local's `NeonLocalInitConf` cfg: dict[str, Any] = { "default_tenant_id": str(self.initial_tenant), - "broker": { - "listen_addr": self.broker.listen_addr(), - }, + "broker": {}, "safekeepers": [], "pageservers": [], "endpoint_storage": {"port": self.port_distributor.get_port()}, "generate_local_ssl_certs": self.generate_local_ssl_certs, } + if config.use_https_storage_broker_api: + cfg["broker"]["listen_https_addr"] = self.broker.listen_addr() + else: + cfg["broker"]["listen_addr"] = self.broker.listen_addr() + if self.control_plane_api is not None: cfg["control_plane_api"] = self.control_plane_api @@ -4933,9 +4940,10 @@ class Safekeeper(LogUtils): class NeonBroker(LogUtils): """An object managing storage_broker instance""" - def __init__(self, env: NeonEnv): - super().__init__(logfile=env.repo_dir / "storage_broker.log") + def __init__(self, env: NeonEnv, use_https: bool): + super().__init__(logfile=env.repo_dir / "storage_broker" / "storage_broker.log") self.env = env + self.scheme = "https" if use_https else "http" self.port: int = self.env.port_distributor.get_port() self.running = False @@ -4958,7 +4966,7 @@ class NeonBroker(LogUtils): return f"127.0.0.1:{self.port}" def client_url(self): - return f"http://{self.listen_addr()}" + return f"{self.scheme}://{self.listen_addr()}" def assert_no_errors(self): assert_no_errors(self.logfile, "storage_controller", []) diff --git a/test_runner/regress/test_ssl.py b/test_runner/regress/test_ssl.py index 39c94c05a9..62879834c3 100644 --- a/test_runner/regress/test_ssl.py +++ b/test_runner/regress/test_ssl.py @@ -6,6 +6,7 @@ import pytest import requests from fixtures.neon_fixtures import NeonEnvBuilder, StorageControllerApiException from fixtures.utils import wait_until +from fixtures.workload import Workload def test_pageserver_https_api(neon_env_builder: NeonEnvBuilder): @@ -212,3 +213,24 @@ def test_server_and_cert_metrics(neon_env_builder: NeonEnvBuilder): assert reload_error_cnt > 0 wait_until(reload_failed) + + +def test_storage_broker_https_api(neon_env_builder: NeonEnvBuilder): + """ + Test HTTPS storage broker API. + 1. Make /status request to HTTPS API to ensure it's appropriately configured. + 2. Generate simple workload to ensure that SK -> broker -> PS communication works well. + """ + neon_env_builder.use_https_storage_broker_api = True + env = neon_env_builder.init_start() + + # 1. Simple check that HTTPS is enabled and works. + url = env.broker.client_url() + "/status" + assert url.startswith("https://") + requests.get(url, verify=str(env.ssl_ca_file)).raise_for_status() + + # 2. Simple workload to check that SK -> broker -> PS communication works over HTTPS. + workload = Workload(env, env.initial_tenant, env.initial_timeline) + workload.init() + workload.write_rows(10) + workload.validate() From 902d36110719a0904fd73fe91e72161a7fd3895c Mon Sep 17 00:00:00 2001 From: StepSecurity Bot Date: Fri, 25 Apr 2025 07:36:45 -0700 Subject: [PATCH 4/7] CI/CD Hardening: Fixing StepSecurity Flagged Issues (#11724) ### Summary I'm fixing one or more of the following CI/CD misconfigurations to improve security. Please feel free to leave a comment if you think the current permissions for the GITHUB_TOKEN should not be restricted so I can take a note of it as accepted behaviour. - Restrict permissions for GITHUB_TOKEN - Add step-security/harden-runner - Pin Actions to a full length commit SHA ### Security Fixes will fix https://github.com/neondatabase/cloud/issues/26141 --- .github/workflows/check-permissions.yml | 2 +- .github/workflows/cleanup-caches-by-a-branch.yml | 2 +- .github/workflows/fast-forward.yml | 2 +- .github/workflows/label-for-external-users.yml | 4 ++-- .github/workflows/pin-build-tools-image.yml | 2 +- .github/workflows/trigger-e2e-tests.yml | 4 ++-- 6 files changed, 8 insertions(+), 8 deletions(-) diff --git a/.github/workflows/check-permissions.yml b/.github/workflows/check-permissions.yml index 407f612887..a61a37ea4c 100644 --- a/.github/workflows/check-permissions.yml +++ b/.github/workflows/check-permissions.yml @@ -19,7 +19,7 @@ jobs: runs-on: ubuntu-22.04 steps: - name: Harden the runner (Audit all outbound calls) - uses: step-security/harden-runner@v2 + uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0 with: egress-policy: audit diff --git a/.github/workflows/cleanup-caches-by-a-branch.yml b/.github/workflows/cleanup-caches-by-a-branch.yml index 3608d8b074..abac0d95e4 100644 --- a/.github/workflows/cleanup-caches-by-a-branch.yml +++ b/.github/workflows/cleanup-caches-by-a-branch.yml @@ -12,7 +12,7 @@ jobs: runs-on: ubuntu-22.04 steps: - name: Harden the runner (Audit all outbound calls) - uses: step-security/harden-runner@v2 + uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0 with: egress-policy: audit diff --git a/.github/workflows/fast-forward.yml b/.github/workflows/fast-forward.yml index f80596a7a6..22dacc429f 100644 --- a/.github/workflows/fast-forward.yml +++ b/.github/workflows/fast-forward.yml @@ -14,7 +14,7 @@ jobs: steps: - name: Harden the runner (Audit all outbound calls) - uses: step-security/harden-runner@v2 + uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0 with: egress-policy: audit diff --git a/.github/workflows/label-for-external-users.yml b/.github/workflows/label-for-external-users.yml index 02d128179d..f9daa19ad9 100644 --- a/.github/workflows/label-for-external-users.yml +++ b/.github/workflows/label-for-external-users.yml @@ -28,7 +28,7 @@ jobs: steps: - name: Harden the runner (Audit all outbound calls) - uses: step-security/harden-runner@v2 + uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0 with: egress-policy: audit @@ -75,7 +75,7 @@ jobs: steps: - name: Harden the runner (Audit all outbound calls) - uses: step-security/harden-runner@v2 + uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0 with: egress-policy: audit diff --git a/.github/workflows/pin-build-tools-image.yml b/.github/workflows/pin-build-tools-image.yml index f8d8172cb0..82bbc722a7 100644 --- a/.github/workflows/pin-build-tools-image.yml +++ b/.github/workflows/pin-build-tools-image.yml @@ -41,7 +41,7 @@ jobs: steps: - name: Harden the runner (Audit all outbound calls) - uses: step-security/harden-runner@v2 + uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0 with: egress-policy: audit diff --git a/.github/workflows/trigger-e2e-tests.yml b/.github/workflows/trigger-e2e-tests.yml index ca4c465931..a951b4b258 100644 --- a/.github/workflows/trigger-e2e-tests.yml +++ b/.github/workflows/trigger-e2e-tests.yml @@ -35,7 +35,7 @@ jobs: steps: - name: Harden the runner (Audit all outbound calls) - uses: step-security/harden-runner@v2 + uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0 with: egress-policy: audit @@ -73,7 +73,7 @@ jobs: }} steps: - name: Harden the runner (Audit all outbound calls) - uses: step-security/harden-runner@v2 + uses: step-security/harden-runner@0634a2670c59f64b4a01f0f96f84700a4088b9f0 # v2.12.0 with: egress-policy: audit From 459d51974c772a3eb7565e0a61e74251ae5c594d Mon Sep 17 00:00:00 2001 From: Lokesh Date: Fri, 25 Apr 2025 21:46:40 +0200 Subject: [PATCH 5/7] doc: minor updates to consumption-metrics document (#7153) ## Problem Proposed minor changes to the `consumption_metrics` document. ## Summary of changes - Fixed minor typos in the document. - Minor formatting in the description of metrics `timeline_logical_size` and `synthetic_storage_size`. Makes this consistent as with description of other metrics in the document. ## Checklist before requesting a review - [x] I have performed a self-review of my code. - [ ] If it is a core feature, I have added thorough tests. - [ ] Do we need to implement analytics? if so did you add the relevant metrics to the dashboard? - [ ] If this PR requires public announcement, mark it with /release-notes label and add several sentences in this section. ## Checklist before merging - [ ] Do not forget to reformat commit message to not include the above checklist Co-authored-by: Mikhail Kot --- docs/consumption_metrics.md | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/docs/consumption_metrics.md b/docs/consumption_metrics.md index dd364f8750..6bcd28ab10 100644 --- a/docs/consumption_metrics.md +++ b/docs/consumption_metrics.md @@ -13,7 +13,7 @@ For design details see [the RFC](./rfcs/021-metering.md) and [the discussion on batch format is ```json -{ "events" : [metric1, metric2, ...]]} +{ "events" : [metric1, metric2, ...] } ``` See metric format examples below. @@ -49,11 +49,13 @@ Size of the remote storage (S3) directory. This is an absolute, per-tenant metric. - `timeline_logical_size` -Logical size of the data in the timeline + +Logical size of the data in the timeline. This is an absolute, per-timeline metric. - `synthetic_storage_size` -Size of all tenant's branches including WAL + +Size of all tenant's branches including WAL. This is the same metric that `tenant/{tenant_id}/size` endpoint returns. This is an absolute, per-tenant metric. @@ -106,10 +108,10 @@ This is an incremental, per-endpoint metric. ``` The metric is incremental, so the value is the difference between the current and the previous value. -If there is no previous value, the value, the value is the current value and the `start_time` equals `stop_time`. +If there is no previous value, the value is the current value and the `start_time` equals `stop_time`. ### TODO - [ ] Handle errors better: currently if one tenant fails to gather metrics, the whole iteration fails and metrics are not sent for any tenant. - [ ] Add retries -- [ ] Tune the interval \ No newline at end of file +- [ ] Tune the interval From 97e01ae6fdd6c1b5ac0405b15e9f467076b6b2de Mon Sep 17 00:00:00 2001 From: Alexander Lakhin Date: Mon, 28 Apr 2025 07:04:37 +0300 Subject: [PATCH 6/7] Add workflow to run particular test(s) N times (#11050) ## Problem Provide an easy way to run particular test(s) N times on CI. ## Summary of changes * Allow for passing the test selection and the number of test runs to the existing "Build and Test Locally" workflow * Allow for running multiple selected tests by the "Pytest regression tests" step * Introduce a new workflow to run specified test(s) several times * Store results in a separate database to distinguish between testing tests for stability and usual testing --- .github/workflows/_build-and-test-locally.yml | 15 ++- .../workflows/build_and_run_selected_test.yml | 120 ++++++++++++++++++ 2 files changed, 133 insertions(+), 2 deletions(-) create mode 100644 .github/workflows/build_and_run_selected_test.yml diff --git a/.github/workflows/_build-and-test-locally.yml b/.github/workflows/_build-and-test-locally.yml index e31d3dec5b..4f7d6026f2 100644 --- a/.github/workflows/_build-and-test-locally.yml +++ b/.github/workflows/_build-and-test-locally.yml @@ -28,6 +28,16 @@ on: required: false default: 'disabled' type: string + test-selection: + description: 'specification of selected test(s) to run' + required: false + default: '' + type: string + test-run-count: + description: 'number of runs to perform for selected tests' + required: false + default: 1 + type: number defaults: run: @@ -381,14 +391,15 @@ jobs: run_with_real_s3: true real_s3_bucket: neon-github-ci-tests real_s3_region: eu-central-1 - rerun_failed: true + rerun_failed: ${{ inputs.test-run-count == 1 }} pg_version: ${{ matrix.pg_version }} sanitizers: ${{ inputs.sanitizers }} aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }} # `--session-timeout` is equal to (timeout-minutes - 10 minutes) * 60 seconds. # Attempt to stop tests gracefully to generate test reports # until they are forcibly stopped by the stricter `timeout-minutes` limit. - extra_params: --session-timeout=${{ inputs.sanitizers != 'enabled' && 3000 || 10200 }} + extra_params: --session-timeout=${{ inputs.sanitizers != 'enabled' && 3000 || 10200 }} --count=${{ inputs.test-run-count }} + ${{ inputs.test-selection != '' && format('-k "{0}"', inputs.test-selection) || '' }} env: TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }} CHECK_ONDISK_DATA_COMPATIBILITY: nonempty diff --git a/.github/workflows/build_and_run_selected_test.yml b/.github/workflows/build_and_run_selected_test.yml new file mode 100644 index 0000000000..f22fe310ab --- /dev/null +++ b/.github/workflows/build_and_run_selected_test.yml @@ -0,0 +1,120 @@ +name: Build and Run Selected Test + +on: + workflow_dispatch: + inputs: + test-selection: + description: 'Specification of selected test(s), as accepted by pytest -k' + required: true + type: string + run-count: + description: 'Number of test runs to perform' + required: true + type: number + archs: + description: 'Archs to run tests on, e. g.: ["x64", "arm64"]' + default: '["x64"]' + required: true + type: string + build-types: + description: 'Build types to run tests on, e. g.: ["debug", "release"]' + default: '["release"]' + required: true + type: string + pg-versions: + description: 'Postgres versions to use for testing, e.g,: [{"pg_version":"v16"}, {"pg_version":"v17"}])' + default: '[{"pg_version":"v17"}]' + required: true + type: string + +defaults: + run: + shell: bash -euxo pipefail {0} + +env: + RUST_BACKTRACE: 1 + COPT: '-Werror' + +jobs: + meta: + uses: ./.github/workflows/_meta.yml + with: + github-event-name: ${{ github.event_name }} + github-event-json: ${{ toJSON(github.event) }} + + build-and-test-locally: + needs: [ meta ] + strategy: + fail-fast: false + matrix: + arch: ${{ fromJson(inputs.archs) }} + build-type: ${{ fromJson(inputs.build-types) }} + uses: ./.github/workflows/_build-and-test-locally.yml + with: + arch: ${{ matrix.arch }} + build-tools-image: ghcr.io/neondatabase/build-tools:pinned-bookworm + build-tag: ${{ needs.meta.outputs.build-tag }} + build-type: ${{ matrix.build-type }} + test-cfg: ${{ inputs.pg-versions }} + test-selection: ${{ inputs.test-selection }} + test-run-count: ${{ fromJson(inputs.run-count) }} + secrets: inherit + + create-test-report: + needs: [ build-and-test-locally ] + if: ${{ !cancelled() }} + permissions: + id-token: write # aws-actions/configure-aws-credentials + statuses: write + contents: write + pull-requests: write + outputs: + report-url: ${{ steps.create-allure-report.outputs.report-url }} + + runs-on: [ self-hosted, small ] + container: + image: ghcr.io/neondatabase/build-tools:pinned-bookworm + credentials: + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + options: --init + + steps: + - name: Harden the runner (Audit all outbound calls) + uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0 + with: + egress-policy: audit + + - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + + - name: Create Allure report + if: ${{ !cancelled() }} + id: create-allure-report + uses: ./.github/actions/allure-report-generate + with: + store-test-results-into-db: true + aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }} + env: + REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_DEV }} + + - uses: actions/github-script@v7 + if: ${{ !cancelled() }} + with: + # Retry script for 5XX server errors: https://github.com/actions/github-script#retries + retries: 5 + script: | + const report = { + reportUrl: "${{ steps.create-allure-report.outputs.report-url }}", + reportJsonUrl: "${{ steps.create-allure-report.outputs.report-json-url }}", + } + + const coverage = {} + + const script = require("./scripts/comment-test-report.js") + await script({ + github, + context, + fetch, + report, + coverage, + }) From b8d47b5acf70f50f97e2f65e680582a8e661ceb5 Mon Sep 17 00:00:00 2001 From: a-masterov <72613290+a-masterov@users.noreply.github.com> Date: Mon, 28 Apr 2025 10:13:49 +0200 Subject: [PATCH 7/7] Run the extensions' tests on staging (#11164) ## Problem We currently don't run end-to-end tests for PostgreSQL extensions on our cloud infrastructure, which means we might miss problems that only occur in a real cloud environment. ## Summary of changes - Added a workflow to run extension tests against a cloud staging instance - Set up proper project configuration for extension testing - Implemented test execution with appropriate environment settings - Added error handling and reporting for test failures --------- Co-authored-by: Alexander Bayandin Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> --- .../actions/neon-project-create/action.yml | 34 +- .github/workflows/cloud-extensions.yml | 112 +++++++ compute/compute-node.Dockerfile | 4 +- docker-compose/docker_compose_test.sh | 2 +- docker-compose/ext-src/README.md | 99 ++++++ .../ext-src/hll-src/regular-test.sh | 7 + .../ext-src/hypopg-src/regular-test.sh | 7 + .../ext-src/ip4r-src/regular-test.sh | 7 + .../ext-src/pg_cron-src/regular-test.sh | 7 + .../ext-src/pg_graphql-src/regular-test.sh | 23 ++ .../ext-src/pg_hint_plan-src/regular-test.sh | 7 + .../ext-src/pg_ivm-src/regular-test.sh | 9 + .../ext-src/pg_ivm-src/regular.patch | 309 ++++++++++++++++++ .../ext-src/pg_jsonschema-src/Makefile | 9 +- .../pg_roaringbitmap-src/regular-test.sh | 7 + .../ext-src/pg_semver-src/regular-test.sh | 12 + .../ext-src/pg_session_jwt-src/Makefile | 8 +- .../ext-src/pg_tiktoken-src/Makefile | 4 +- .../ext-src/pg_uuidv7-src/regular-test.sh | 7 + docker-compose/ext-src/pgjwt-src/neon-test.sh | 4 +- .../ext-src/pgrag-src/regular-test.sh | 8 + .../ext-src/pgtap-src/regular-test.sh | 10 + .../ext-src/pgvector-src/regular-test.sh | 8 + docker-compose/ext-src/pgx_ulid-src/Makefile | 14 +- .../ext-src/plv8-src/regular-test.sh | 12 + .../postgresql-unit-src/regular-test.sh | 7 + .../ext-src/prefix-src/regular-test.sh | 7 + .../ext-src/rag_bge_small_en_v15-src/Makefile | 9 +- .../rag_jina_reranker_v1_tiny_en-src/Makefile | 9 +- .../expected/reranking_functions.out | 36 +- .../expected/reranking_functions_enhanced.out | 104 +++--- .../sql/reranking_functions.sql | 10 +- .../sql/reranking_functions_enhanced.sql | 32 +- .../ext-src/rum-src/regular-test.sh | 7 + docker-compose/run-tests.sh | 44 +++ 35 files changed, 889 insertions(+), 106 deletions(-) create mode 100644 .github/workflows/cloud-extensions.yml create mode 100644 docker-compose/ext-src/README.md create mode 100755 docker-compose/ext-src/hll-src/regular-test.sh create mode 100755 docker-compose/ext-src/hypopg-src/regular-test.sh create mode 100755 docker-compose/ext-src/ip4r-src/regular-test.sh create mode 100755 docker-compose/ext-src/pg_cron-src/regular-test.sh create mode 100755 docker-compose/ext-src/pg_graphql-src/regular-test.sh create mode 100755 docker-compose/ext-src/pg_hint_plan-src/regular-test.sh create mode 100755 docker-compose/ext-src/pg_ivm-src/regular-test.sh create mode 100644 docker-compose/ext-src/pg_ivm-src/regular.patch create mode 100755 docker-compose/ext-src/pg_roaringbitmap-src/regular-test.sh create mode 100755 docker-compose/ext-src/pg_semver-src/regular-test.sh create mode 100755 docker-compose/ext-src/pg_uuidv7-src/regular-test.sh create mode 100755 docker-compose/ext-src/pgrag-src/regular-test.sh create mode 100755 docker-compose/ext-src/pgtap-src/regular-test.sh create mode 100755 docker-compose/ext-src/pgvector-src/regular-test.sh create mode 100755 docker-compose/ext-src/plv8-src/regular-test.sh create mode 100755 docker-compose/ext-src/postgresql-unit-src/regular-test.sh create mode 100755 docker-compose/ext-src/prefix-src/regular-test.sh create mode 100755 docker-compose/ext-src/rum-src/regular-test.sh mode change 100644 => 100755 docker-compose/run-tests.sh diff --git a/.github/actions/neon-project-create/action.yml b/.github/actions/neon-project-create/action.yml index a393aa6106..a5b4104908 100644 --- a/.github/actions/neon-project-create/action.yml +++ b/.github/actions/neon-project-create/action.yml @@ -49,6 +49,10 @@ inputs: description: 'A JSON object with project settings' required: false default: '{}' + default_endpoint_settings: + description: 'A JSON object with the default endpoint settings' + required: false + default: '{}' outputs: dsn: @@ -66,9 +70,9 @@ runs: # A shell without `set -x` to not to expose password/dsn in logs shell: bash -euo pipefail {0} run: | - project=$(curl \ + res=$(curl \ "https://${API_HOST}/api/v2/projects" \ - --fail \ + -w "%{http_code}" \ --header "Accept: application/json" \ --header "Content-Type: application/json" \ --header "Authorization: Bearer ${API_KEY}" \ @@ -83,6 +87,15 @@ runs: \"settings\": ${PROJECT_SETTINGS} } }") + + code=${res: -3} + if [[ ${code} -ge 400 ]]; then + echo Request failed with error code ${code} + echo ${res::-3} + exit 1 + else + project=${res::-3} + fi # Mask password echo "::add-mask::$(echo $project | jq --raw-output '.roles[] | select(.name != "web_access") | .password')" @@ -126,6 +139,22 @@ runs: -H "Accept: application/json" -H "Content-Type: application/json" -H "Authorization: Bearer ${ADMIN_API_KEY}" \ -d "{\"scheduling\": \"Essential\"}" fi + # XXX + # This is a workaround for the default endpoint settings, which currently do not allow some settings in the public API. + # https://github.com/neondatabase/cloud/issues/27108 + if [[ -n ${DEFAULT_ENDPOINT_SETTINGS} && ${DEFAULT_ENDPOINT_SETTINGS} != "{}" ]] ; then + PROJECT_DATA=$(curl -X GET \ + "https://${API_HOST}/regions/${REGION_ID}/api/v1/admin/projects/${project_id}" \ + -H "Accept: application/json" -H "Content-Type: application/json" -H "Authorization: Bearer ${ADMIN_API_KEY}" \ + -d "{\"scheduling\": \"Essential\"}" + ) + NEW_DEFAULT_ENDPOINT_SETTINGS=$(echo ${PROJECT_DATA} | jq -rc ".project.default_endpoint_settings + ${DEFAULT_ENDPOINT_SETTINGS}") + curl -X POST --fail \ + "https://${API_HOST}/regions/${REGION_ID}/api/v1/admin/projects/${project_id}/default_endpoint_settings" \ + -H "Accept: application/json" -H "Content-Type: application/json" -H "Authorization: Bearer ${ADMIN_API_KEY}" \ + --data "${NEW_DEFAULT_ENDPOINT_SETTINGS}" + fi + env: API_HOST: ${{ inputs.api_host }} @@ -142,3 +171,4 @@ runs: PSQL: ${{ inputs.psql_path }} LD_LIBRARY_PATH: ${{ inputs.libpq_lib_path }} PROJECT_SETTINGS: ${{ inputs.project_settings }} + DEFAULT_ENDPOINT_SETTINGS: ${{ inputs.default_endpoint_settings }} diff --git a/.github/workflows/cloud-extensions.yml b/.github/workflows/cloud-extensions.yml new file mode 100644 index 0000000000..7d60469f92 --- /dev/null +++ b/.github/workflows/cloud-extensions.yml @@ -0,0 +1,112 @@ +name: Cloud Extensions Test +on: + schedule: + # * is a special character in YAML so you have to quote this string + # ┌───────────── minute (0 - 59) + # │ ┌───────────── hour (0 - 23) + # │ │ ┌───────────── day of the month (1 - 31) + # │ │ │ ┌───────────── month (1 - 12 or JAN-DEC) + # │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT) + - cron: '45 1 * * *' # run once a day, timezone is utc + workflow_dispatch: # adds ability to run this manually + inputs: + region_id: + description: 'Project region id. If not set, the default region will be used' + required: false + default: 'aws-us-east-2' + +defaults: + run: + shell: bash -euxo pipefail {0} + +permissions: + id-token: write # aws-actions/configure-aws-credentials + statuses: write + contents: write + +jobs: + regress: + env: + POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install + TEST_OUTPUT: /tmp/test_output + BUILD_TYPE: remote + strategy: + fail-fast: false + matrix: + pg-version: [16, 17] + + runs-on: [ self-hosted, small ] + container: + # We use the neon-test-extensions image here as it contains the source code for the extensions. + image: ghcr.io/neondatabase/neon-test-extensions-v${{ matrix.pg-version }}:latest + credentials: + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + options: --init + + steps: + - name: Harden the runner (Audit all outbound calls) + uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0 + with: + egress-policy: audit + + - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + + - name: Evaluate the settings + id: project-settings + run: | + if [[ $((${{ matrix.pg-version }})) -lt 17 ]]; then + ULID=ulid + else + ULID=pgx_ulid + fi + LIBS=timescaledb:rag_bge_small_en_v15,rag_jina_reranker_v1_tiny_en:$ULID + settings=$(jq -c -n --arg libs $LIBS '{preload_libraries:{use_defaults:false,enabled_libraries:($libs| split(":"))}}') + echo settings=$settings >> $GITHUB_OUTPUT + + - name: Create Neon Project + id: create-neon-project + uses: ./.github/actions/neon-project-create + with: + region_id: ${{ inputs.region_id }} + postgres_version: ${{ matrix.pg-version }} + project_settings: ${{ steps.project-settings.outputs.settings }} + # We need these settings to get the expected output results. + # We cannot use the environment variables e.g. PGTZ due to + # https://github.com/neondatabase/neon/issues/1287 + default_endpoint_settings: > + { + "pg_settings": { + "DateStyle": "Postgres,MDY", + "TimeZone": "America/Los_Angeles", + "compute_query_id": "off", + "neon.allow_unstable_extensions": "on" + } + } + api_key: ${{ secrets.NEON_STAGING_API_KEY }} + admin_api_key: ${{ secrets.NEON_STAGING_ADMIN_API_KEY }} + + - name: Run the regression tests + run: /run-tests.sh -r /ext-src + env: + BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }} + SKIP: "pg_hint_plan-src,pg_repack-src,pg_cron-src,plpgsql_check-src" + + - name: Delete Neon Project + if: ${{ always() }} + uses: ./.github/actions/neon-project-delete + with: + project_id: ${{ steps.create-neon-project.outputs.project_id }} + api_key: ${{ secrets.NEON_STAGING_API_KEY }} + + - name: Post to a Slack channel + if: ${{ github.event.schedule && failure() }} + uses: slackapi/slack-github-action@fcfb566f8b0aab22203f066d80ca1d7e4b5d05b3 # v1.27.1 + with: + channel-id: ${{ vars.SLACK_ON_CALL_QA_STAGING_STREAM }} + slack-message: | + Periodic extensions test on staging: ${{ job.status }} + <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run> + env: + SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }} + diff --git a/compute/compute-node.Dockerfile b/compute/compute-node.Dockerfile index d8db627521..b9299eee90 100644 --- a/compute/compute-node.Dockerfile +++ b/compute/compute-node.Dockerfile @@ -1800,8 +1800,8 @@ COPY compute/patches/pg_repack.patch /ext-src RUN cd /ext-src/pg_repack-src && patch -p1