diff --git a/.dockerignore b/.dockerignore index 8b378b5dab..f7a6232ba1 100644 --- a/.dockerignore +++ b/.dockerignore @@ -22,6 +22,7 @@ !s3_scrubber/ !safekeeper/ !storage_broker/ +!storage_controller/ !trace/ !vendor/postgres-*/ !workspace_hack/ diff --git a/CODEOWNERS b/CODEOWNERS index 9a23e8c958..af2fa6088e 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -1,5 +1,5 @@ /compute_tools/ @neondatabase/control-plane @neondatabase/compute -/control_plane/attachment_service @neondatabase/storage +/storage_controller @neondatabase/storage /libs/pageserver_api/ @neondatabase/storage /libs/postgres_ffi/ @neondatabase/compute @neondatabase/safekeepers /libs/remote_storage/ @neondatabase/storage diff --git a/Cargo.lock b/Cargo.lock index 7fef2ebf22..dae406e4ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -270,45 +270,6 @@ dependencies = [ "critical-section", ] -[[package]] -name = "attachment_service" -version = "0.1.0" -dependencies = [ - "anyhow", - "aws-config", - "bytes", - "camino", - "clap", - "control_plane", - "diesel", - "diesel_migrations", - "fail", - "futures", - "git-version", - "hex", - "humantime", - "hyper", - "itertools", - "lasso", - "measured", - "metrics", - "once_cell", - "pageserver_api", - "pageserver_client", - "postgres_connection", - "r2d2", - "reqwest", - "routerify", - "serde", - "serde_json", - "thiserror", - "tokio", - "tokio-util", - "tracing", - "utils", - "workspace_hack", -] - [[package]] name = "autocfg" version = "1.1.0" @@ -5623,6 +5584,45 @@ dependencies = [ "workspace_hack", ] +[[package]] +name = "storage_controller" +version = "0.1.0" +dependencies = [ + "anyhow", + "aws-config", + "bytes", + "camino", + "clap", + "control_plane", + "diesel", + "diesel_migrations", + "fail", + "futures", + "git-version", + "hex", + "humantime", + "hyper", + "itertools", + "lasso", + "measured", + "metrics", + "once_cell", + "pageserver_api", + "pageserver_client", + "postgres_connection", + "r2d2", + "reqwest", + "routerify", + "serde", + "serde_json", + "thiserror", + "tokio", + "tokio-util", + "tracing", + "utils", + "workspace_hack", +] + [[package]] name = "storcon_cli" version = "0.1.0" @@ -5799,23 +5799,23 @@ dependencies = [ [[package]] name = "test-context" -version = "0.1.4" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "055831a02a4f5aa28fede67f2902014273eb8c21b958ac5ebbd59b71ef30dbc3" +checksum = "6676ab8513edfd2601a108621103fdb45cac9098305ca25ec93f7023b06b05d9" dependencies = [ - "async-trait", "futures", "test-context-macros", ] [[package]] name = "test-context-macros" -version = "0.1.4" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8901a55b0a7a06ebc4a674dcca925170da8e613fa3b163a1df804ed10afb154d" +checksum = "78ea17a2dc368aeca6f554343ced1b1e31f76d63683fa8016e5844bd7a5144a1" dependencies = [ + "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.52", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 9f24176c65..3c6077648e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,6 @@ resolver = "2" members = [ "compute_tools", "control_plane", - "control_plane/attachment_service", "control_plane/storcon_cli", "pageserver", "pageserver/compaction", @@ -13,6 +12,7 @@ members = [ "proxy", "safekeeper", "storage_broker", + "storage_controller", "s3_scrubber", "workspace_hack", "trace", @@ -159,7 +159,7 @@ svg_fmt = "0.4.1" sync_wrapper = "0.1.2" tar = "0.4" task-local-extensions = "0.1.4" -test-context = "0.1" +test-context = "0.3" thiserror = "1.0" tikv-jemallocator = 
"0.5" tikv-jemalloc-ctl = "0.5" diff --git a/control_plane/storcon_cli/src/main.rs b/control_plane/storcon_cli/src/main.rs index f72bc9a2a9..2edd09eac1 100644 --- a/control_plane/storcon_cli/src/main.rs +++ b/control_plane/storcon_cli/src/main.rs @@ -223,7 +223,7 @@ impl Client { } } - /// Simple HTTP request wrapper for calling into attachment service + /// Simple HTTP request wrapper for calling into storage controller async fn dispatch( &self, method: hyper::Method, diff --git a/diesel.toml b/diesel.toml index 30ed4444d7..558c54a1e1 100644 --- a/diesel.toml +++ b/diesel.toml @@ -2,8 +2,8 @@ # see https://diesel.rs/guides/configuring-diesel-cli [print_schema] -file = "control_plane/attachment_service/src/schema.rs" +file = "storage_controller/src/schema.rs" custom_type_derives = ["diesel::query_builder::QueryId"] [migrations_directory] -dir = "control_plane/attachment_service/migrations" +dir = "storage_controller/migrations" diff --git a/docs/sourcetree.md b/docs/sourcetree.md index 12fa80349e..3732bfdab2 100644 --- a/docs/sourcetree.md +++ b/docs/sourcetree.md @@ -7,6 +7,11 @@ Below you will find a brief overview of each subdir in the source tree in alphab Neon storage broker, providing messaging between safekeepers and pageservers. [storage_broker.md](./storage_broker.md) +`storage_controller`: + +Neon storage controller, manages a cluster of pageservers and exposes an API that enables +managing a many-sharded tenant as a single entity. + `/control_plane`: Local control plane. diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs index be24d452b6..1278f17ad2 100644 --- a/libs/pageserver_api/src/controller_api.rs +++ b/libs/pageserver_api/src/controller_api.rs @@ -2,7 +2,7 @@ use std::str::FromStr; /// Request/response types for the storage controller /// API (`/control/v1` prefix). 
Implemented by the server -/// in [`attachment_service::http`] +/// in [`storage_controller::http`] use serde::{Deserialize, Serialize}; use utils::id::{NodeId, TenantId}; diff --git a/libs/remote_storage/tests/test_real_azure.rs b/libs/remote_storage/tests/test_real_azure.rs index 6adddf52a9..6aa02868e6 100644 --- a/libs/remote_storage/tests/test_real_azure.rs +++ b/libs/remote_storage/tests/test_real_azure.rs @@ -57,7 +57,6 @@ enum MaybeEnabledStorage { Disabled, } -#[async_trait::async_trait] impl AsyncTestContext for MaybeEnabledStorage { async fn setup() -> Self { ensure_logging_ready(); @@ -86,7 +85,6 @@ struct AzureWithTestBlobs { remote_blobs: HashSet, } -#[async_trait::async_trait] impl AsyncTestContext for MaybeEnabledStorageWithTestBlobs { async fn setup() -> Self { ensure_logging_ready(); @@ -148,7 +146,6 @@ struct AzureWithSimpleTestBlobs { remote_blobs: HashSet, } -#[async_trait::async_trait] impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs { async fn setup() -> Self { ensure_logging_ready(); diff --git a/libs/remote_storage/tests/test_real_s3.rs b/libs/remote_storage/tests/test_real_s3.rs index bc5e40e70f..c5d5216f00 100644 --- a/libs/remote_storage/tests/test_real_s3.rs +++ b/libs/remote_storage/tests/test_real_s3.rs @@ -219,7 +219,6 @@ enum MaybeEnabledStorage { Disabled, } -#[async_trait::async_trait] impl AsyncTestContext for MaybeEnabledStorage { async fn setup() -> Self { ensure_logging_ready(); @@ -248,7 +247,6 @@ struct S3WithTestBlobs { remote_blobs: HashSet, } -#[async_trait::async_trait] impl AsyncTestContext for MaybeEnabledStorageWithTestBlobs { async fn setup() -> Self { ensure_logging_ready(); @@ -310,7 +308,6 @@ struct S3WithSimpleTestBlobs { remote_blobs: HashSet, } -#[async_trait::async_trait] impl AsyncTestContext for MaybeEnabledStorageWithSimpleTestBlobs { async fn setup() -> Self { ensure_logging_ready(); diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 26cab9338b..0903b206ff 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -18,6 +18,7 @@ use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING}; use pageserver::task_mgr::WALRECEIVER_RUNTIME; use pageserver::tenant::{secondary, TenantSharedResources}; use remote_storage::GenericRemoteStorage; +use tokio::signal::unix::SignalKind; use tokio::time::Instant; use tracing::*; @@ -671,46 +672,37 @@ fn start_pageserver( let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard()); // All started up! Now just sit and wait for shutdown signal. - { - use signal_hook::consts::*; - let signal_handler = BACKGROUND_RUNTIME.spawn_blocking(move || { - let mut signals = - signal_hook::iterator::Signals::new([SIGINT, SIGTERM, SIGQUIT]).unwrap(); - return signals - .forever() - .next() - .expect("forever() never returns None unless explicitly closed"); - }); - let signal = BACKGROUND_RUNTIME - // NB: in `NEON_PAGESERVER_USE_ONE_RUNTIME=current_thread`, this - // is where the executor is actually driven. In multi-threaded runtime - // modes, the executor threads are spawned internally, so, async execution - // is driven even before we reach here. - .block_on(signal_handler) - .expect("join error"); - match signal { - SIGQUIT => { - info!("Got signal {signal}. Terminating in immediate shutdown mode",); - std::process::exit(111); - } - SIGINT | SIGTERM => { - info!("Got signal {signal}. Terminating gracefully in fast shutdown mode",); - // This cancels the `shutdown_pageserver` cancellation tree. 
- // Right now that tree doesn't reach very far, and `task_mgr` is used instead. - // The plan is to change that over time. - shutdown_pageserver.take(); - let bg_remote_storage = remote_storage.clone(); - let bg_deletion_queue = deletion_queue.clone(); - BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver( - &tenant_manager, - bg_remote_storage.map(|_| bg_deletion_queue), - 0, - )); - unreachable!() - } - _ => unreachable!(), - } + { + BACKGROUND_RUNTIME.block_on(async move { + let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt()).unwrap(); + let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate()).unwrap(); + let mut sigquit = tokio::signal::unix::signal(SignalKind::quit()).unwrap(); + let signal = tokio::select! { + _ = sigquit.recv() => { + info!("Got signal SIGQUIT. Terminating in immediate shutdown mode",); + std::process::exit(111); + } + _ = sigint.recv() => { "SIGINT" }, + _ = sigterm.recv() => { "SIGTERM" }, + }; + + info!("Got signal {signal}. Terminating gracefully in fast shutdown mode",); + + // This cancels the `shutdown_pageserver` cancellation tree. + // Right now that tree doesn't reach very far, and `task_mgr` is used instead. + // The plan is to change that over time. + shutdown_pageserver.take(); + let bg_remote_storage = remote_storage.clone(); + let bg_deletion_queue = deletion_queue.clone(); + pageserver::shutdown_pageserver( + &tenant_manager, + bg_remote_storage.map(|_| bg_deletion_queue), + 0, + ) + .await; + unreachable!() + }) } } diff --git a/pageserver/src/control_plane_client.rs b/pageserver/src/control_plane_client.rs index 42c800822b..f0ed46ce23 100644 --- a/pageserver/src/control_plane_client.rs +++ b/pageserver/src/control_plane_client.rs @@ -12,7 +12,7 @@ use pageserver_api::{ use serde::{de::DeserializeOwned, Serialize}; use tokio_util::sync::CancellationToken; use url::Url; -use utils::{backoff, generation::Generation, id::NodeId}; +use utils::{backoff, failpoint_support, generation::Generation, id::NodeId}; use crate::{ config::{NodeMetadata, PageServerConf}, @@ -210,7 +210,10 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient { .collect(), }; - fail::fail_point!("control-plane-client-validate"); + failpoint_support::sleep_millis_async!("control-plane-client-validate-sleep", &self.cancel); + if self.cancel.is_cancelled() { + return Err(RetryForeverError::ShuttingDown); + } let response: ValidateResponse = self.retry_http_forever(&re_attach_path, request).await?; diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index bb477f89c5..2713309824 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -1629,7 +1629,7 @@ components: type: integer format: int64 minimum: 0 - description: The amount of disk space currently utilized by layer files. + description: The amount of disk space currently used. 
free_space_bytes: type: integer format: int64 diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 343dec2ca1..ed409d3130 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -8,6 +8,7 @@ use anyhow::{bail, ensure, Context, Result}; use bytes::Bytes; use camino::Utf8Path; use futures::StreamExt; +use pageserver_api::key::rel_block_to_key; use tokio::io::{AsyncRead, AsyncReadExt}; use tokio_tar::Archive; use tracing::*; @@ -170,7 +171,10 @@ async fn import_rel( let r = reader.read_exact(&mut buf).await; match r { Ok(_) => { - modification.put_rel_page_image(rel, blknum, Bytes::copy_from_slice(&buf))?; + let key = rel_block_to_key(rel, blknum); + if modification.tline.get_shard_identity().is_key_local(&key) { + modification.put_rel_page_image(rel, blknum, Bytes::copy_from_slice(&buf))?; + } } // TODO: UnexpectedEof is expected diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 9b1b5e7ed5..3879135f26 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -593,14 +593,14 @@ impl RemoteTimelineClient { upload_queue: &mut UploadQueueInitialized, metadata: TimelineMetadata, ) { + let disk_consistent_lsn = upload_queue.latest_metadata.disk_consistent_lsn(); + info!( - "scheduling metadata upload with {} files ({} changed)", + "scheduling metadata upload up to consistent LSN {disk_consistent_lsn} with {} files ({} changed)", upload_queue.latest_files.len(), upload_queue.latest_files_changes_since_metadata_upload_scheduled, ); - let disk_consistent_lsn = upload_queue.latest_metadata.disk_consistent_lsn(); - let index_part = IndexPart::new( upload_queue.latest_files.clone(), disk_consistent_lsn, diff --git a/pageserver/src/utilization.rs b/pageserver/src/utilization.rs index 830c9897ca..5eccf185ac 100644 --- a/pageserver/src/utilization.rs +++ b/pageserver/src/utilization.rs @@ -15,11 +15,23 @@ pub(crate) fn regenerate(tenants_path: &Path) -> anyhow::Result 0 { + statvfs.fragment_size() + } else { + statvfs.block_size() + }; #[cfg_attr(not(target_os = "macos"), allow(clippy::unnecessary_cast))] let free = statvfs.blocks_available() as u64 * blocksz; - let used = crate::metrics::RESIDENT_PHYSICAL_SIZE_GLOBAL.get(); + + #[cfg_attr(not(target_os = "macos"), allow(clippy::unnecessary_cast))] + let used = statvfs + .blocks() + // use blocks_free instead of available here to match df in case someone compares + .saturating_sub(statvfs.blocks_free()) as u64 + * blocksz; + let captured_at = std::time::SystemTime::now(); let doc = PageserverUtilization { diff --git a/proxy/src/auth/backend/link.rs b/proxy/src/auth/backend/link.rs index 7db76f3d9e..415a4b7d85 100644 --- a/proxy/src/auth/backend/link.rs +++ b/proxy/src/auth/backend/link.rs @@ -102,8 +102,7 @@ pub(super) async fn authenticate( ctx.set_user(db_info.user.into()); ctx.set_project(db_info.aux.clone()); - let cold_start_info = db_info.aux.cold_start_info.clone().unwrap_or_default(); - info!(?cold_start_info, "woken up a compute node"); + info!("woken up a compute node"); // Backwards compatibility. pg_sni_proxy uses "--" in domain names // while direct connections do not. 
Once we migrate to pg_sni_proxy diff --git a/proxy/src/bin/pg_sni_router.rs b/proxy/src/bin/pg_sni_router.rs index 385f7820cb..c28814b1c8 100644 --- a/proxy/src/bin/pg_sni_router.rs +++ b/proxy/src/bin/pg_sni_router.rs @@ -10,6 +10,7 @@ use itertools::Itertools; use proxy::config::TlsServerEndPoint; use proxy::context::RequestMonitoring; use proxy::proxy::run_until_cancelled; +use proxy::{BranchId, EndpointId, ProjectId}; use rustls::pki_types::PrivateKeyDer; use tokio::net::TcpListener; @@ -269,7 +270,12 @@ async fn handle_client( let client = tokio::net::TcpStream::connect(destination).await?; - let metrics_aux: MetricsAuxInfo = Default::default(); + let metrics_aux: MetricsAuxInfo = MetricsAuxInfo { + endpoint_id: (&EndpointId::from("")).into(), + project_id: (&ProjectId::from("")).into(), + branch_id: (&BranchId::from("")).into(), + cold_start_info: proxy::console::messages::ColdStartInfo::Unknown, + }; // doesn't yet matter as pg-sni-router doesn't report analytics logs ctx.set_success(); diff --git a/proxy/src/cache/project_info.rs b/proxy/src/cache/project_info.rs index 5a3660520b..d8a1d261ce 100644 --- a/proxy/src/cache/project_info.rs +++ b/proxy/src/cache/project_info.rs @@ -16,7 +16,7 @@ use crate::{ config::ProjectInfoCacheOptions, console::AuthSecret, intern::{EndpointIdInt, ProjectIdInt, RoleNameInt}, - EndpointId, ProjectId, RoleName, + EndpointId, RoleName, }; use super::{Cache, Cached}; @@ -214,14 +214,11 @@ impl ProjectInfoCacheImpl { } pub fn insert_role_secret( &self, - project_id: &ProjectId, - endpoint_id: &EndpointId, - role_name: &RoleName, + project_id: ProjectIdInt, + endpoint_id: EndpointIdInt, + role_name: RoleNameInt, secret: Option, ) { - let project_id = ProjectIdInt::from(project_id); - let endpoint_id = EndpointIdInt::from(endpoint_id); - let role_name = RoleNameInt::from(role_name); if self.cache.len() >= self.config.size { // If there are too many entries, wait until the next gc cycle. return; @@ -234,12 +231,10 @@ impl ProjectInfoCacheImpl { } pub fn insert_allowed_ips( &self, - project_id: &ProjectId, - endpoint_id: &EndpointId, + project_id: ProjectIdInt, + endpoint_id: EndpointIdInt, allowed_ips: Arc>, ) { - let project_id = ProjectIdInt::from(project_id); - let endpoint_id = EndpointIdInt::from(endpoint_id); if self.cache.len() >= self.config.size { // If there are too many entries, wait until the next gc cycle. 
return; @@ -358,7 +353,7 @@ impl Cache for ProjectInfoCacheImpl { #[cfg(test)] mod tests { use super::*; - use crate::scram::ServerSecret; + use crate::{scram::ServerSecret, ProjectId}; #[tokio::test] async fn test_project_info_cache_settings() { @@ -369,8 +364,8 @@ mod tests { ttl: Duration::from_secs(1), gc_interval: Duration::from_secs(600), }); - let project_id = "project".into(); - let endpoint_id = "endpoint".into(); + let project_id: ProjectId = "project".into(); + let endpoint_id: EndpointId = "endpoint".into(); let user1: RoleName = "user1".into(); let user2: RoleName = "user2".into(); let secret1 = Some(AuthSecret::Scram(ServerSecret::mock([1; 32]))); @@ -379,9 +374,23 @@ mod tests { "127.0.0.1".parse().unwrap(), "127.0.0.2".parse().unwrap(), ]); - cache.insert_role_secret(&project_id, &endpoint_id, &user1, secret1.clone()); - cache.insert_role_secret(&project_id, &endpoint_id, &user2, secret2.clone()); - cache.insert_allowed_ips(&project_id, &endpoint_id, allowed_ips.clone()); + cache.insert_role_secret( + (&project_id).into(), + (&endpoint_id).into(), + (&user1).into(), + secret1.clone(), + ); + cache.insert_role_secret( + (&project_id).into(), + (&endpoint_id).into(), + (&user2).into(), + secret2.clone(), + ); + cache.insert_allowed_ips( + (&project_id).into(), + (&endpoint_id).into(), + allowed_ips.clone(), + ); let cached = cache.get_role_secret(&endpoint_id, &user1).unwrap(); assert!(cached.cached()); @@ -393,7 +402,12 @@ mod tests { // Shouldn't add more than 2 roles. let user3: RoleName = "user3".into(); let secret3 = Some(AuthSecret::Scram(ServerSecret::mock([3; 32]))); - cache.insert_role_secret(&project_id, &endpoint_id, &user3, secret3.clone()); + cache.insert_role_secret( + (&project_id).into(), + (&endpoint_id).into(), + (&user3).into(), + secret3.clone(), + ); assert!(cache.get_role_secret(&endpoint_id, &user3).is_none()); let cached = cache.get_allowed_ips(&endpoint_id).unwrap(); @@ -421,8 +435,8 @@ mod tests { cache.clone().disable_ttl(); tokio::time::advance(Duration::from_secs(2)).await; - let project_id = "project".into(); - let endpoint_id = "endpoint".into(); + let project_id: ProjectId = "project".into(); + let endpoint_id: EndpointId = "endpoint".into(); let user1: RoleName = "user1".into(); let user2: RoleName = "user2".into(); let secret1 = Some(AuthSecret::Scram(ServerSecret::mock([1; 32]))); @@ -431,9 +445,23 @@ mod tests { "127.0.0.1".parse().unwrap(), "127.0.0.2".parse().unwrap(), ]); - cache.insert_role_secret(&project_id, &endpoint_id, &user1, secret1.clone()); - cache.insert_role_secret(&project_id, &endpoint_id, &user2, secret2.clone()); - cache.insert_allowed_ips(&project_id, &endpoint_id, allowed_ips.clone()); + cache.insert_role_secret( + (&project_id).into(), + (&endpoint_id).into(), + (&user1).into(), + secret1.clone(), + ); + cache.insert_role_secret( + (&project_id).into(), + (&endpoint_id).into(), + (&user2).into(), + secret2.clone(), + ); + cache.insert_allowed_ips( + (&project_id).into(), + (&endpoint_id).into(), + allowed_ips.clone(), + ); tokio::time::advance(Duration::from_secs(2)).await; // Nothing should be invalidated. 
@@ -470,8 +498,8 @@ mod tests { gc_interval: Duration::from_secs(600), })); - let project_id = "project".into(); - let endpoint_id = "endpoint".into(); + let project_id: ProjectId = "project".into(); + let endpoint_id: EndpointId = "endpoint".into(); let user1: RoleName = "user1".into(); let user2: RoleName = "user2".into(); let secret1 = Some(AuthSecret::Scram(ServerSecret::mock([1; 32]))); @@ -480,10 +508,20 @@ mod tests { "127.0.0.1".parse().unwrap(), "127.0.0.2".parse().unwrap(), ]); - cache.insert_role_secret(&project_id, &endpoint_id, &user1, secret1.clone()); + cache.insert_role_secret( + (&project_id).into(), + (&endpoint_id).into(), + (&user1).into(), + secret1.clone(), + ); cache.clone().disable_ttl(); tokio::time::advance(Duration::from_millis(100)).await; - cache.insert_role_secret(&project_id, &endpoint_id, &user2, secret2.clone()); + cache.insert_role_secret( + (&project_id).into(), + (&endpoint_id).into(), + (&user2).into(), + secret2.clone(), + ); // Added before ttl was disabled + ttl should be still cached. let cached = cache.get_role_secret(&endpoint_id, &user1).unwrap(); @@ -497,7 +535,11 @@ mod tests { assert!(cache.get_role_secret(&endpoint_id, &user2).is_none()); // Added after ttl was disabled + ttl should not be cached. - cache.insert_allowed_ips(&project_id, &endpoint_id, allowed_ips.clone()); + cache.insert_allowed_ips( + (&project_id).into(), + (&endpoint_id).into(), + allowed_ips.clone(), + ); let cached = cache.get_allowed_ips(&endpoint_id).unwrap(); assert!(!cached.cached()); diff --git a/proxy/src/compute.rs b/proxy/src/compute.rs index 65153babcb..ee33b97fbd 100644 --- a/proxy/src/compute.rs +++ b/proxy/src/compute.rs @@ -276,6 +276,7 @@ impl ConnCfg { let stream = connection.stream.into_inner(); info!( + cold_start_info = ctx.cold_start_info.as_str(), "connected to compute node at {host} ({socket_addr}) sslmode={:?}", self.0.get_ssl_mode() ); diff --git a/proxy/src/console/messages.rs b/proxy/src/console/messages.rs index 102076f2c6..45161f5ac8 100644 --- a/proxy/src/console/messages.rs +++ b/proxy/src/console/messages.rs @@ -3,7 +3,7 @@ use std::fmt; use crate::auth::IpPattern; -use crate::{BranchId, EndpointId, ProjectId}; +use crate::intern::{BranchIdInt, EndpointIdInt, ProjectIdInt}; /// Generic error response with human-readable description. /// Note that we can't always present it to user as is. @@ -18,7 +18,7 @@ pub struct ConsoleError { pub struct GetRoleSecret { pub role_secret: Box, pub allowed_ips: Option>, - pub project_id: Option, + pub project_id: Option, } // Manually implement debug to omit sensitive info. @@ -93,22 +93,47 @@ impl fmt::Debug for DatabaseInfo { /// Various labels for prometheus metrics. /// Also known as `ProxyMetricsAuxInfo` in the console. 
-#[derive(Debug, Deserialize, Clone, Default)] +#[derive(Debug, Deserialize, Clone)] pub struct MetricsAuxInfo { - pub endpoint_id: EndpointId, - pub project_id: ProjectId, - pub branch_id: BranchId, - pub cold_start_info: Option, + pub endpoint_id: EndpointIdInt, + pub project_id: ProjectIdInt, + pub branch_id: BranchIdInt, + #[serde(default)] + pub cold_start_info: ColdStartInfo, } -#[derive(Debug, Default, Serialize, Deserialize, Clone)] +#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy)] #[serde(rename_all = "snake_case")] pub enum ColdStartInfo { #[default] - Unknown = 0, - Warm = 1, - PoolHit = 2, - PoolMiss = 3, + Unknown, + /// Compute was already running + Warm, + #[serde(rename = "pool_hit")] + /// Compute was not running but there was an available VM + VmPoolHit, + #[serde(rename = "pool_miss")] + /// Compute was not running and there were no VMs available + VmPoolMiss, + + // not provided by control plane + /// Connection available from HTTP pool + HttpPoolHit, + /// Cached connection info + WarmCached, +} + +impl ColdStartInfo { + pub fn as_str(&self) -> &'static str { + match self { + ColdStartInfo::Unknown => "unknown", + ColdStartInfo::Warm => "warm", + ColdStartInfo::VmPoolHit => "pool_hit", + ColdStartInfo::VmPoolMiss => "pool_miss", + ColdStartInfo::HttpPoolHit => "http_pool_hit", + ColdStartInfo::WarmCached => "warm_cached", + } + } } #[cfg(test)] diff --git a/proxy/src/console/provider.rs b/proxy/src/console/provider.rs index 69bfd6b045..f7d621fb12 100644 --- a/proxy/src/console/provider.rs +++ b/proxy/src/console/provider.rs @@ -12,7 +12,8 @@ use crate::{ compute, config::{CacheOptions, ProjectInfoCacheOptions}, context::RequestMonitoring, - scram, EndpointCacheKey, ProjectId, + intern::ProjectIdInt, + scram, EndpointCacheKey, }; use dashmap::DashMap; use std::{sync::Arc, time::Duration}; @@ -271,7 +272,7 @@ pub struct AuthInfo { /// List of IP addresses allowed for the autorization. pub allowed_ips: Vec, /// Project ID. This is used for cache invalidation. - pub project_id: Option, + pub project_id: Option, } /// Info for establishing a connection to a compute node. 
diff --git a/proxy/src/console/provider/mock.rs b/proxy/src/console/provider/mock.rs index b759c81373..cfe491f2aa 100644 --- a/proxy/src/console/provider/mock.rs +++ b/proxy/src/console/provider/mock.rs @@ -4,10 +4,16 @@ use super::{ errors::{ApiError, GetAuthInfoError, WakeComputeError}, AuthInfo, AuthSecret, CachedNodeInfo, NodeInfo, }; -use crate::console::provider::{CachedAllowedIps, CachedRoleSecret}; use crate::context::RequestMonitoring; use crate::{auth::backend::ComputeUserInfo, compute, error::io_error, scram, url::ApiUrl}; use crate::{auth::IpPattern, cache::Cached}; +use crate::{ + console::{ + messages::MetricsAuxInfo, + provider::{CachedAllowedIps, CachedRoleSecret}, + }, + BranchId, EndpointId, ProjectId, +}; use futures::TryFutureExt; use std::{str::FromStr, sync::Arc}; use thiserror::Error; @@ -114,7 +120,12 @@ impl Api { let node = NodeInfo { config, - aux: Default::default(), + aux: MetricsAuxInfo { + endpoint_id: (&EndpointId::from("endpoint")).into(), + project_id: (&ProjectId::from("project")).into(), + branch_id: (&BranchId::from("branch")).into(), + cold_start_info: crate::console::messages::ColdStartInfo::Warm, + }, allow_self_signed_compute: false, }; diff --git a/proxy/src/console/provider/neon.rs b/proxy/src/console/provider/neon.rs index 289b0c08f7..1a3e2ca795 100644 --- a/proxy/src/console/provider/neon.rs +++ b/proxy/src/console/provider/neon.rs @@ -181,15 +181,16 @@ impl super::Api for Api { } let auth_info = self.do_get_auth_info(ctx, user_info).await?; if let Some(project_id) = auth_info.project_id { + let ep_int = ep.into(); self.caches.project_info.insert_role_secret( - &project_id, - ep, - user, + project_id, + ep_int, + user.into(), auth_info.secret.clone(), ); self.caches.project_info.insert_allowed_ips( - &project_id, - ep, + project_id, + ep_int, Arc::new(auth_info.allowed_ips), ); ctx.set_project_id(project_id); @@ -217,15 +218,16 @@ impl super::Api for Api { let allowed_ips = Arc::new(auth_info.allowed_ips); let user = &user_info.user; if let Some(project_id) = auth_info.project_id { + let ep_int = ep.into(); self.caches.project_info.insert_role_secret( - &project_id, - ep, - user, + project_id, + ep_int, + user.into(), auth_info.secret.clone(), ); self.caches .project_info - .insert_allowed_ips(&project_id, ep, allowed_ips.clone()); + .insert_allowed_ips(project_id, ep_int, allowed_ips.clone()); ctx.set_project_id(project_id); } Ok(( @@ -248,8 +250,7 @@ impl super::Api for Api { // which means that we might cache it to reduce the load and latency. 
if let Some(cached) = self.caches.node_info.get(&key) { info!(key = &*key, "found cached compute node info"); - info!("cold_start_info=warm"); - ctx.set_cold_start_info(ColdStartInfo::Warm); + ctx.set_project(cached.aux.clone()); return Ok(cached); } @@ -260,17 +261,21 @@ impl super::Api for Api { if permit.should_check_cache() { if let Some(cached) = self.caches.node_info.get(&key) { info!(key = &*key, "found cached compute node info"); - info!("cold_start_info=warm"); - ctx.set_cold_start_info(ColdStartInfo::Warm); + ctx.set_project(cached.aux.clone()); return Ok(cached); } } - let node = self.do_wake_compute(ctx, user_info).await?; + let mut node = self.do_wake_compute(ctx, user_info).await?; ctx.set_project(node.aux.clone()); - let cold_start_info = node.aux.cold_start_info.clone().unwrap_or_default(); - info!(?cold_start_info, "woken up a compute node"); - let (_, cached) = self.caches.node_info.insert(key.clone(), node); + let cold_start_info = node.aux.cold_start_info; + info!("woken up a compute node"); + + // store the cached node as 'warm' + node.aux.cold_start_info = ColdStartInfo::WarmCached; + let (_, mut cached) = self.caches.node_info.insert(key.clone(), node); + cached.aux.cold_start_info = cold_start_info; + info!(key = &*key, "created a cache entry for compute node info"); Ok(cached) diff --git a/proxy/src/context.rs b/proxy/src/context.rs index 7ca830cdb4..fec95f4722 100644 --- a/proxy/src/context.rs +++ b/proxy/src/context.rs @@ -11,8 +11,9 @@ use uuid::Uuid; use crate::{ console::messages::{ColdStartInfo, MetricsAuxInfo}, error::ErrorKind, + intern::{BranchIdInt, ProjectIdInt}, metrics::{LatencyTimer, ENDPOINT_ERRORS_BY_KIND, ERROR_BY_KIND}, - BranchId, DbName, EndpointId, ProjectId, RoleName, + DbName, EndpointId, RoleName, }; use self::parquet::RequestData; @@ -34,8 +35,8 @@ pub struct RequestMonitoring { pub span: Span, // filled in as they are discovered - project: Option, - branch: Option, + project: Option, + branch: Option, endpoint_id: Option, dbname: Option, user: Option, @@ -43,7 +44,7 @@ pub struct RequestMonitoring { error_kind: Option, pub(crate) auth_method: Option, success: bool, - cold_start_info: Option, + pub(crate) cold_start_info: ColdStartInfo, // extra // This sender is here to keep the request monitoring channel open while requests are taking place. 
@@ -92,7 +93,7 @@ impl RequestMonitoring { error_kind: None, auth_method: None, success: false, - cold_start_info: None, + cold_start_info: ColdStartInfo::Unknown, sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()), latency_timer: LatencyTimer::new(protocol), @@ -113,26 +114,31 @@ impl RequestMonitoring { } pub fn set_cold_start_info(&mut self, info: ColdStartInfo) { - self.cold_start_info = Some(info); + self.cold_start_info = info; + self.latency_timer.cold_start_info(info); } pub fn set_project(&mut self, x: MetricsAuxInfo) { - self.set_endpoint_id(x.endpoint_id); + if self.endpoint_id.is_none() { + self.set_endpoint_id(x.endpoint_id.as_str().into()) + } self.branch = Some(x.branch_id); self.project = Some(x.project_id); - self.cold_start_info = x.cold_start_info; + self.set_cold_start_info(x.cold_start_info); } - pub fn set_project_id(&mut self, project_id: ProjectId) { + pub fn set_project_id(&mut self, project_id: ProjectIdInt) { self.project = Some(project_id); } pub fn set_endpoint_id(&mut self, endpoint_id: EndpointId) { - self.span.record("ep", display(&endpoint_id)); - crate::metrics::CONNECTING_ENDPOINTS - .with_label_values(&[self.protocol]) - .measure(&endpoint_id); - self.endpoint_id = Some(endpoint_id); + if self.endpoint_id.is_none() { + self.span.record("ep", display(&endpoint_id)); + crate::metrics::CONNECTING_ENDPOINTS + .with_label_values(&[self.protocol]) + .measure(&endpoint_id); + self.endpoint_id = Some(endpoint_id); + } } pub fn set_application(&mut self, app: Option) { diff --git a/proxy/src/context/parquet.rs b/proxy/src/context/parquet.rs index 04e5695255..eb77409429 100644 --- a/proxy/src/context/parquet.rs +++ b/proxy/src/context/parquet.rs @@ -87,7 +87,7 @@ pub struct RequestData { /// Or if we make it to proxy_pass success: bool, /// Indicates if the cplane started the new compute node for this request. 
- cold_start_info: Option<&'static str>, + cold_start_info: &'static str, /// Tracks time from session start (HTTP request/libpq TCP handshake) /// Through to success/failure duration_us: u64, @@ -115,12 +115,7 @@ impl From<&RequestMonitoring> for RequestData { region: value.region, error: value.error_kind.as_ref().map(|e| e.to_metric_label()), success: value.success, - cold_start_info: value.cold_start_info.as_ref().map(|x| match x { - crate::console::messages::ColdStartInfo::Unknown => "unknown", - crate::console::messages::ColdStartInfo::Warm => "warm", - crate::console::messages::ColdStartInfo::PoolHit => "pool_hit", - crate::console::messages::ColdStartInfo::PoolMiss => "pool_miss", - }), + cold_start_info: value.cold_start_info.as_str(), duration_us: SystemTime::from(value.first_packet) .elapsed() .unwrap_or_default() @@ -454,7 +449,7 @@ mod tests { region: "us-east-1", error: None, success: rng.gen(), - cold_start_info: Some("no"), + cold_start_info: "no", duration_us: rng.gen_range(0..30_000_000), } } @@ -524,15 +519,15 @@ mod tests { assert_eq!( file_stats, [ - (1314406, 3, 6000), - (1314399, 3, 6000), - (1314459, 3, 6000), - (1314416, 3, 6000), - (1314546, 3, 6000), - (1314388, 3, 6000), - (1314180, 3, 6000), - (1314416, 3, 6000), - (438359, 1, 2000) + (1314385, 3, 6000), + (1314378, 3, 6000), + (1314438, 3, 6000), + (1314395, 3, 6000), + (1314525, 3, 6000), + (1314367, 3, 6000), + (1314159, 3, 6000), + (1314395, 3, 6000), + (438352, 1, 2000) ] ); @@ -562,11 +557,11 @@ mod tests { assert_eq!( file_stats, [ - (1220668, 5, 10000), - (1226818, 5, 10000), - (1228612, 5, 10000), - (1227974, 5, 10000), - (1219252, 5, 10000) + (1220633, 5, 10000), + (1226783, 5, 10000), + (1228577, 5, 10000), + (1227939, 5, 10000), + (1219217, 5, 10000) ] ); @@ -598,11 +593,11 @@ mod tests { assert_eq!( file_stats, [ - (1206315, 5, 10000), - (1206046, 5, 10000), - (1206339, 5, 10000), - (1206327, 5, 10000), - (1206582, 5, 10000) + (1206280, 5, 10000), + (1206011, 5, 10000), + (1206304, 5, 10000), + (1206292, 5, 10000), + (1206547, 5, 10000) ] ); @@ -627,15 +622,15 @@ mod tests { assert_eq!( file_stats, [ - (1314406, 3, 6000), - (1314399, 3, 6000), - (1314459, 3, 6000), - (1314416, 3, 6000), - (1314546, 3, 6000), - (1314388, 3, 6000), - (1314180, 3, 6000), - (1314416, 3, 6000), - (438359, 1, 2000) + (1314385, 3, 6000), + (1314378, 3, 6000), + (1314438, 3, 6000), + (1314395, 3, 6000), + (1314525, 3, 6000), + (1314367, 3, 6000), + (1314159, 3, 6000), + (1314395, 3, 6000), + (438352, 1, 2000) ] ); @@ -672,7 +667,7 @@ mod tests { // files are smaller than the size threshold, but they took too long to fill so were flushed early assert_eq!( file_stats, - [(658837, 2, 3001), (658551, 2, 3000), (658347, 2, 2999)] + [(658823, 2, 3001), (658537, 2, 3000), (658333, 2, 2999)] ); tmpdir.close().unwrap(); diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs index 9da1fdc02f..59ee899c08 100644 --- a/proxy/src/metrics.rs +++ b/proxy/src/metrics.rs @@ -12,6 +12,8 @@ use metrics::{ use once_cell::sync::Lazy; use tokio::time::{self, Instant}; +use crate::console::messages::ColdStartInfo; + pub static NUM_DB_CONNECTIONS_GAUGE: Lazy = Lazy::new(|| { register_int_counter_pair_vec!( "proxy_opened_db_connections_total", @@ -50,8 +52,8 @@ pub static COMPUTE_CONNECTION_LATENCY: Lazy = Lazy::new(|| { "proxy_compute_connection_latency_seconds", "Time it took for proxy to establish a connection to the compute endpoint", // http/ws/tcp, true/false, true/false, success/failure, client/client_and_cplane - // 3 * 2 * 2 * 2 * 2 = 48 
counters - &["protocol", "cache_miss", "pool_miss", "outcome", "excluded"], + // 3 * 6 * 2 * 2 = 72 counters + &["protocol", "cold_start_info", "outcome", "excluded"], // largest bucket = 2^16 * 0.5ms = 32s exponential_buckets(0.0005, 2.0, 16).unwrap(), ) @@ -183,6 +185,20 @@ struct Accumulated { compute: time::Duration, } +enum Outcome { + Success, + Failed, +} + +impl Outcome { + fn as_str(&self) -> &'static str { + match self { + Outcome::Success => "success", + Outcome::Failed => "failed", + } + } +} + pub struct LatencyTimer { // time since the stopwatch was started start: time::Instant, @@ -192,9 +208,8 @@ pub struct LatencyTimer { accumulated: Accumulated, // label data protocol: &'static str, - cache_miss: bool, - pool_miss: bool, - outcome: &'static str, + cold_start_info: ColdStartInfo, + outcome: Outcome, } pub struct LatencyTimerPause<'a> { @@ -210,11 +225,9 @@ impl LatencyTimer { stop: None, accumulated: Accumulated::default(), protocol, - cache_miss: false, - // by default we don't do pooling - pool_miss: true, + cold_start_info: ColdStartInfo::Unknown, // assume failed unless otherwise specified - outcome: "failed", + outcome: Outcome::Failed, } } @@ -226,12 +239,8 @@ impl LatencyTimer { } } - pub fn cache_miss(&mut self) { - self.cache_miss = true; - } - - pub fn pool_hit(&mut self) { - self.pool_miss = false; + pub fn cold_start_info(&mut self, cold_start_info: ColdStartInfo) { + self.cold_start_info = cold_start_info; } pub fn success(&mut self) { @@ -239,7 +248,7 @@ impl LatencyTimer { self.stop = Some(time::Instant::now()); // success - self.outcome = "success"; + self.outcome = Outcome::Success; } } @@ -264,9 +273,8 @@ impl Drop for LatencyTimer { COMPUTE_CONNECTION_LATENCY .with_label_values(&[ self.protocol, - bool_to_str(self.cache_miss), - bool_to_str(self.pool_miss), - self.outcome, + self.cold_start_info.as_str(), + self.outcome.as_str(), "client", ]) .observe((duration.saturating_sub(self.accumulated.client)).as_secs_f64()); @@ -275,9 +283,8 @@ impl Drop for LatencyTimer { COMPUTE_CONNECTION_LATENCY .with_label_values(&[ self.protocol, - bool_to_str(self.cache_miss), - bool_to_str(self.pool_miss), - self.outcome, + self.cold_start_info.as_str(), + self.outcome.as_str(), "client_and_cplane", ]) .observe((duration.saturating_sub(accumulated_total)).as_secs_f64()); diff --git a/proxy/src/proxy/connect_compute.rs b/proxy/src/proxy/connect_compute.rs index c76e2ff6d9..4c0d68ce0b 100644 --- a/proxy/src/proxy/connect_compute.rs +++ b/proxy/src/proxy/connect_compute.rs @@ -87,7 +87,6 @@ impl ConnectMechanism for TcpMechanism<'_> { } /// Try to connect to the compute node, retrying if necessary. -/// This function might update `node_info`, so we take it by `&mut`. 
#[tracing::instrument(skip_all)] pub async fn connect_to_compute( ctx: &mut RequestMonitoring, @@ -132,7 +131,6 @@ where } else { // if we failed to connect, it's likely that the compute node was suspended, wake a new compute node info!("compute node's state has likely changed; requesting a wake-up"); - ctx.latency_timer.cache_miss(); let old_node_info = invalidate_cache(node_info); let mut node_info = wake_compute(&mut num_retries, ctx, user_info).await?; node_info.reuse_settings(old_node_info); diff --git a/proxy/src/proxy/passthrough.rs b/proxy/src/proxy/passthrough.rs index cf53c6e673..c81a1a8292 100644 --- a/proxy/src/proxy/passthrough.rs +++ b/proxy/src/proxy/passthrough.rs @@ -19,8 +19,8 @@ pub async fn proxy_pass( aux: MetricsAuxInfo, ) -> anyhow::Result<()> { let usage = USAGE_METRICS.register(Ids { - endpoint_id: aux.endpoint_id.clone(), - branch_id: aux.branch_id.clone(), + endpoint_id: aux.endpoint_id, + branch_id: aux.branch_id, }); let m_sent = NUM_BYTES_PROXIED_COUNTER.with_label_values(&["tx"]); diff --git a/proxy/src/proxy/tests.rs b/proxy/src/proxy/tests.rs index a4051447c1..71d85e106d 100644 --- a/proxy/src/proxy/tests.rs +++ b/proxy/src/proxy/tests.rs @@ -12,11 +12,12 @@ use crate::auth::backend::{ }; use crate::config::CertResolver; use crate::console::caches::NodeInfoCache; +use crate::console::messages::MetricsAuxInfo; use crate::console::provider::{CachedAllowedIps, CachedRoleSecret, ConsoleBackend}; use crate::console::{self, CachedNodeInfo, NodeInfo}; use crate::error::ErrorKind; use crate::proxy::retry::{retry_after, NUM_RETRIES_CONNECT}; -use crate::{http, sasl, scram}; +use crate::{http, sasl, scram, BranchId, EndpointId, ProjectId}; use anyhow::{bail, Context}; use async_trait::async_trait; use rstest::rstest; @@ -512,7 +513,12 @@ impl TestBackend for TestConnectMechanism { fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeInfo { let node = NodeInfo { config: compute::ConnCfg::new(), - aux: Default::default(), + aux: MetricsAuxInfo { + endpoint_id: (&EndpointId::from("endpoint")).into(), + project_id: (&ProjectId::from("project")).into(), + branch_id: (&BranchId::from("branch")).into(), + cold_start_info: crate::console::messages::ColdStartInfo::Warm, + }, allow_self_signed_compute: false, }; let (_, node) = cache.insert("key".into(), node); diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index f10779d7ba..8aa5ad4e8a 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -9,7 +9,6 @@ use crate::{ config::ProxyConfig, console::{ errors::{GetAuthInfoError, WakeComputeError}, - messages::ColdStartInfo, CachedNodeInfo, }, context::RequestMonitoring, @@ -57,7 +56,10 @@ impl PoolingBackend { let auth_outcome = crate::auth::validate_password_and_exchange(&conn_info.password, secret).await?; let res = match auth_outcome { - crate::sasl::Outcome::Success(key) => Ok(key), + crate::sasl::Outcome::Success(key) => { + info!("user successfully authenticated"); + Ok(key) + } crate::sasl::Outcome::Failure(reason) => { info!("auth backend failed with an error: {reason}"); Err(AuthError::auth_failed(&*conn_info.user_info.user)) @@ -89,8 +91,6 @@ impl PoolingBackend { }; if let Some(client) = maybe_client { - info!("cold_start_info=warm"); - ctx.set_cold_start_info(ColdStartInfo::Warm); return Ok(client); } let conn_id = uuid::Uuid::new_v4(); diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index c7e8eaef76..35311facb8 100644 --- 
a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -17,7 +17,7 @@ use tokio::time::Instant; use tokio_postgres::tls::NoTlsStream; use tokio_postgres::{AsyncMessage, ReadyForQueryStatus, Socket}; -use crate::console::messages::MetricsAuxInfo; +use crate::console::messages::{ColdStartInfo, MetricsAuxInfo}; use crate::metrics::{ENDPOINT_POOLS, GC_LATENCY, NUM_OPEN_CLIENTS_IN_HTTP_POOL}; use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS}; use crate::{ @@ -383,9 +383,12 @@ impl GlobalConnPool { "pid", &tracing::field::display(client.inner.get_process_id()), ); - info!("pool: reusing connection '{conn_info}'"); + info!( + cold_start_info = ColdStartInfo::HttpPoolHit.as_str(), + "pool: reusing connection '{conn_info}'" + ); client.session.send(ctx.session_id)?; - ctx.latency_timer.pool_hit(); + ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit); ctx.latency_timer.success(); return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool))); } @@ -454,8 +457,9 @@ pub fn poll_client( let (tx, mut rx) = tokio::sync::watch::channel(session_id); let span = info_span!(parent: None, "connection", %conn_id); + let cold_start_info = ctx.cold_start_info; span.in_scope(|| { - info!(%conn_info, %session_id, "new connection"); + info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection"); }); let pool = match conn_info.endpoint_cache_key() { Some(endpoint) => Arc::downgrade(&global_pool.get_or_create_endpoint_pool(&endpoint)), @@ -565,8 +569,8 @@ impl Client { pub fn metrics(&self) -> Arc { let aux = &self.inner.as_ref().unwrap().aux; USAGE_METRICS.register(Ids { - endpoint_id: aux.endpoint_id.clone(), - branch_id: aux.branch_id.clone(), + endpoint_id: aux.endpoint_id, + branch_id: aux.branch_id, }) } } @@ -666,6 +670,8 @@ impl Drop for Client { mod tests { use std::{mem, sync::atomic::AtomicBool}; + use crate::{BranchId, EndpointId, ProjectId}; + use super::*; struct MockClient(Arc); @@ -691,7 +697,12 @@ mod tests { ClientInner { inner: client, session: tokio::sync::watch::Sender::new(uuid::Uuid::new_v4()), - aux: Default::default(), + aux: MetricsAuxInfo { + endpoint_id: (&EndpointId::from("endpoint")).into(), + project_id: (&ProjectId::from("project")).into(), + branch_id: (&BranchId::from("branch")).into(), + cold_start_info: crate::console::messages::ColdStartInfo::Warm, + }, conn_id: uuid::Uuid::new_v4(), } } diff --git a/proxy/src/usage_metrics.rs b/proxy/src/usage_metrics.rs index b21056735d..5ffbf95c07 100644 --- a/proxy/src/usage_metrics.rs +++ b/proxy/src/usage_metrics.rs @@ -3,7 +3,8 @@ use crate::{ config::{MetricBackupCollectionConfig, MetricCollectionConfig}, context::parquet::{FAILED_UPLOAD_MAX_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD}, - http, BranchId, EndpointId, + http, + intern::{BranchIdInt, EndpointIdInt}, }; use anyhow::Context; use async_compression::tokio::write::GzipEncoder; @@ -43,8 +44,8 @@ const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60); /// because we enrich the event with project_id in the control-plane endpoint. 
#[derive(Eq, Hash, PartialEq, Serialize, Deserialize, Debug, Clone)] pub struct Ids { - pub endpoint_id: EndpointId, - pub branch_id: BranchId, + pub endpoint_id: EndpointIdInt, + pub branch_id: BranchIdInt, } pub trait MetricCounterRecorder { @@ -494,7 +495,7 @@ mod tests { use url::Url; use super::*; - use crate::{http, rate_limiter::RateLimiterConfig}; + use crate::{http, rate_limiter::RateLimiterConfig, BranchId, EndpointId}; #[tokio::test] async fn metrics() { @@ -536,8 +537,8 @@ mod tests { // register a new counter let counter = metrics.register(Ids { - endpoint_id: "e1".into(), - branch_id: "b1".into(), + endpoint_id: (&EndpointId::from("e1")).into(), + branch_id: (&BranchId::from("b1")).into(), }); // the counter should be observed despite 0 egress diff --git a/safekeeper/src/wal_backup_partial.rs b/safekeeper/src/wal_backup_partial.rs index a535c814ea..200096ac5c 100644 --- a/safekeeper/src/wal_backup_partial.rs +++ b/safekeeper/src/wal_backup_partial.rs @@ -337,6 +337,17 @@ pub async fn main_task(tli: Arc, conf: SafeKeeperConf) { } } + // if we don't have any data and zero LSNs, wait for something + while flush_lsn_rx.borrow().lsn == Lsn(0) { + tokio::select! { + _ = cancellation_rx.changed() => { + info!("timeline canceled"); + return; + } + _ = flush_lsn_rx.changed() => {} + } + } + // fixing the segno and waiting some time to prevent reuploading the same segment too often let pending_segno = backup.segno(flush_lsn_rx.borrow().lsn); let timeout = tokio::time::sleep(await_duration); diff --git a/control_plane/attachment_service/Cargo.toml b/storage_controller/Cargo.toml similarity index 83% rename from control_plane/attachment_service/Cargo.toml rename to storage_controller/Cargo.toml index 595b091df4..165cafaf4e 100644 --- a/control_plane/attachment_service/Cargo.toml +++ b/storage_controller/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "attachment_service" +name = "storage_controller" version = "0.1.0" edition.workspace = true license.workspace = true @@ -45,8 +45,8 @@ diesel = { version = "2.1.4", features = ["serde_json", "postgres", "r2d2"] } diesel_migrations = { version = "2.1.0" } r2d2 = { version = "0.8.10" } -utils = { path = "../../libs/utils/" } -metrics = { path = "../../libs/metrics/" } -control_plane = { path = ".." 
} -workspace_hack = { version = "0.1", path = "../../workspace_hack" } +utils = { path = "../libs/utils/" } +metrics = { path = "../libs/metrics/" } +control_plane = { path = "../control_plane" } +workspace_hack = { version = "0.1", path = "../workspace_hack" } diff --git a/control_plane/attachment_service/migrations/.keep b/storage_controller/migrations/.keep similarity index 100% rename from control_plane/attachment_service/migrations/.keep rename to storage_controller/migrations/.keep diff --git a/control_plane/attachment_service/migrations/00000000000000_diesel_initial_setup/down.sql b/storage_controller/migrations/00000000000000_diesel_initial_setup/down.sql similarity index 100% rename from control_plane/attachment_service/migrations/00000000000000_diesel_initial_setup/down.sql rename to storage_controller/migrations/00000000000000_diesel_initial_setup/down.sql diff --git a/control_plane/attachment_service/migrations/00000000000000_diesel_initial_setup/up.sql b/storage_controller/migrations/00000000000000_diesel_initial_setup/up.sql similarity index 100% rename from control_plane/attachment_service/migrations/00000000000000_diesel_initial_setup/up.sql rename to storage_controller/migrations/00000000000000_diesel_initial_setup/up.sql diff --git a/control_plane/attachment_service/migrations/2024-01-07-211257_create_tenant_shards/down.sql b/storage_controller/migrations/2024-01-07-211257_create_tenant_shards/down.sql similarity index 100% rename from control_plane/attachment_service/migrations/2024-01-07-211257_create_tenant_shards/down.sql rename to storage_controller/migrations/2024-01-07-211257_create_tenant_shards/down.sql diff --git a/control_plane/attachment_service/migrations/2024-01-07-211257_create_tenant_shards/up.sql b/storage_controller/migrations/2024-01-07-211257_create_tenant_shards/up.sql similarity index 100% rename from control_plane/attachment_service/migrations/2024-01-07-211257_create_tenant_shards/up.sql rename to storage_controller/migrations/2024-01-07-211257_create_tenant_shards/up.sql diff --git a/control_plane/attachment_service/migrations/2024-01-07-212945_create_nodes/down.sql b/storage_controller/migrations/2024-01-07-212945_create_nodes/down.sql similarity index 100% rename from control_plane/attachment_service/migrations/2024-01-07-212945_create_nodes/down.sql rename to storage_controller/migrations/2024-01-07-212945_create_nodes/down.sql diff --git a/control_plane/attachment_service/migrations/2024-01-07-212945_create_nodes/up.sql b/storage_controller/migrations/2024-01-07-212945_create_nodes/up.sql similarity index 100% rename from control_plane/attachment_service/migrations/2024-01-07-212945_create_nodes/up.sql rename to storage_controller/migrations/2024-01-07-212945_create_nodes/up.sql diff --git a/control_plane/attachment_service/migrations/2024-02-29-094122_generations_null/down.sql b/storage_controller/migrations/2024-02-29-094122_generations_null/down.sql similarity index 100% rename from control_plane/attachment_service/migrations/2024-02-29-094122_generations_null/down.sql rename to storage_controller/migrations/2024-02-29-094122_generations_null/down.sql diff --git a/control_plane/attachment_service/migrations/2024-02-29-094122_generations_null/up.sql b/storage_controller/migrations/2024-02-29-094122_generations_null/up.sql similarity index 100% rename from control_plane/attachment_service/migrations/2024-02-29-094122_generations_null/up.sql rename to storage_controller/migrations/2024-02-29-094122_generations_null/up.sql diff --git 
a/control_plane/attachment_service/migrations/2024-03-18-184429_rename_policy/down.sql b/storage_controller/migrations/2024-03-18-184429_rename_policy/down.sql similarity index 100% rename from control_plane/attachment_service/migrations/2024-03-18-184429_rename_policy/down.sql rename to storage_controller/migrations/2024-03-18-184429_rename_policy/down.sql diff --git a/control_plane/attachment_service/migrations/2024-03-18-184429_rename_policy/up.sql b/storage_controller/migrations/2024-03-18-184429_rename_policy/up.sql similarity index 100% rename from control_plane/attachment_service/migrations/2024-03-18-184429_rename_policy/up.sql rename to storage_controller/migrations/2024-03-18-184429_rename_policy/up.sql diff --git a/control_plane/attachment_service/migrations/2024-03-27-133204_tenant_policies/down.sql b/storage_controller/migrations/2024-03-27-133204_tenant_policies/down.sql similarity index 100% rename from control_plane/attachment_service/migrations/2024-03-27-133204_tenant_policies/down.sql rename to storage_controller/migrations/2024-03-27-133204_tenant_policies/down.sql diff --git a/control_plane/attachment_service/migrations/2024-03-27-133204_tenant_policies/up.sql b/storage_controller/migrations/2024-03-27-133204_tenant_policies/up.sql similarity index 100% rename from control_plane/attachment_service/migrations/2024-03-27-133204_tenant_policies/up.sql rename to storage_controller/migrations/2024-03-27-133204_tenant_policies/up.sql diff --git a/control_plane/attachment_service/src/auth.rs b/storage_controller/src/auth.rs similarity index 100% rename from control_plane/attachment_service/src/auth.rs rename to storage_controller/src/auth.rs diff --git a/control_plane/attachment_service/src/compute_hook.rs b/storage_controller/src/compute_hook.rs similarity index 61% rename from control_plane/attachment_service/src/compute_hook.rs rename to storage_controller/src/compute_hook.rs index 1a8dc6b86d..eb0c4472e4 100644 --- a/control_plane/attachment_service/src/compute_hook.rs +++ b/storage_controller/src/compute_hook.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::{collections::HashMap, time::Duration}; use control_plane::endpoint::{ComputeControlPlane, EndpointStatus}; @@ -18,14 +19,26 @@ const SLOWDOWN_DELAY: Duration = Duration::from_secs(5); pub(crate) const API_CONCURRENCY: usize = 32; +struct UnshardedComputeHookTenant { + // Which node is this tenant attached to + node_id: NodeId, + + // Must hold this lock to send a notification. + send_lock: Arc>>, +} struct ShardedComputeHookTenant { stripe_size: ShardStripeSize, shard_count: ShardCount, shards: Vec<(ShardNumber, NodeId)>, + + // Must hold this lock to send a notification. The contents represent + // the last successfully sent notification, and are used to coalesce multiple + // updates by only sending when there is a change since our last successful send. 
+ send_lock: Arc>>, } enum ComputeHookTenant { - Unsharded(NodeId), + Unsharded(UnshardedComputeHookTenant), Sharded(ShardedComputeHookTenant), } @@ -37,9 +50,20 @@ impl ComputeHookTenant { shards: vec![(tenant_shard_id.shard_number, node_id)], stripe_size, shard_count: tenant_shard_id.shard_count, + send_lock: Arc::default(), }) } else { - Self::Unsharded(node_id) + Self::Unsharded(UnshardedComputeHookTenant { + node_id, + send_lock: Arc::default(), + }) + } + } + + fn get_send_lock(&self) -> &Arc>> { + match self { + Self::Unsharded(unsharded_tenant) => &unsharded_tenant.send_lock, + Self::Sharded(sharded_tenant) => &sharded_tenant.send_lock, } } @@ -52,8 +76,8 @@ impl ComputeHookTenant { node_id: NodeId, ) { match self { - Self::Unsharded(existing_node_id) if tenant_shard_id.shard_count.count() == 1 => { - *existing_node_id = node_id + Self::Unsharded(unsharded_tenant) if tenant_shard_id.shard_count.count() == 1 => { + unsharded_tenant.node_id = node_id } Self::Sharded(sharded_tenant) if sharded_tenant.stripe_size == stripe_size @@ -80,14 +104,14 @@ impl ComputeHookTenant { } } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)] struct ComputeHookNotifyRequestShard { node_id: NodeId, shard_number: ShardNumber, } /// Request body that we send to the control plane to notify it of where a tenant is attached -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)] struct ComputeHookNotifyRequest { tenant_id: TenantId, stripe_size: Option, @@ -120,14 +144,44 @@ pub(crate) enum NotifyError { Fatal(StatusCode), } +enum MaybeSendResult { + // Please send this request while holding the lock, and if you succeed then write + // the request into the lock. + Transmit( + ( + ComputeHookNotifyRequest, + tokio::sync::OwnedMutexGuard>, + ), + ), + // Something requires sending, but you must wait for a current sender then call again + AwaitLock(Arc>>), + // Nothing requires sending + Noop, +} + impl ComputeHookTenant { - fn maybe_reconfigure(&self, tenant_id: TenantId) -> Option { - match self { - Self::Unsharded(node_id) => Some(ComputeHookNotifyRequest { + fn maybe_send( + &self, + tenant_id: TenantId, + lock: Option>>, + ) -> MaybeSendResult { + let locked = match lock { + Some(already_locked) => already_locked, + None => { + // Lock order: this _must_ be only a try_lock, because we are called inside of the [`ComputeHook::state`] lock. 
+ let Ok(locked) = self.get_send_lock().clone().try_lock_owned() else { + return MaybeSendResult::AwaitLock(self.get_send_lock().clone()); + }; + locked + } + }; + + let request = match self { + Self::Unsharded(unsharded_tenant) => Some(ComputeHookNotifyRequest { tenant_id, shards: vec![ComputeHookNotifyRequestShard { shard_number: ShardNumber(0), - node_id: *node_id, + node_id: unsharded_tenant.node_id, }], stripe_size: None, }), @@ -151,12 +205,25 @@ impl ComputeHookTenant { // Sharded tenant doesn't yet have information for all its shards tracing::info!( - "ComputeHookTenant::maybe_reconfigure: not enough shards ({}/{})", + "ComputeHookTenant::maybe_send: not enough shards ({}/{})", sharded_tenant.shards.len(), sharded_tenant.shard_count.count() ); None } + }; + + match request { + None => { + // Not yet ready to emit a notification + tracing::info!("Tenant isn't yet ready to emit a notification"); + MaybeSendResult::Noop + } + Some(request) if Some(&request) == locked.as_ref() => { + // No change from the last value successfully sent + MaybeSendResult::Noop + } + Some(request) => MaybeSendResult::Transmit((request, locked)), } } } @@ -166,8 +233,15 @@ impl ComputeHookTenant { /// the compute connection string. pub(super) struct ComputeHook { config: Config, - state: tokio::sync::Mutex>, + state: std::sync::Mutex>, authorization_header: Option, + + // Concurrency limiter, so that we do not overload the cloud control plane when updating + // large numbers of tenants (e.g. when failing over after a node failure) + api_concurrency: tokio::sync::Semaphore, + + // This lock is only used in testing environments, to serialize calls into neon_local + neon_local_lock: tokio::sync::Mutex<()>, } impl ComputeHook { @@ -181,14 +255,20 @@ impl ComputeHook { state: Default::default(), config, authorization_header, + neon_local_lock: Default::default(), + api_concurrency: tokio::sync::Semaphore::new(API_CONCURRENCY), } } /// For test environments: use neon_local's LocalEnv to update compute async fn do_notify_local( &self, - reconfigure_request: ComputeHookNotifyRequest, + reconfigure_request: &ComputeHookNotifyRequest, ) -> anyhow::Result<()> { + // neon_local updates are not safe to call concurrently, use a lock to serialize + // all calls to this function + let _locked = self.neon_local_lock.lock().await; + let env = match LocalEnv::load_config() { Ok(e) => e, Err(e) => { @@ -205,7 +285,7 @@ impl ComputeHook { } = reconfigure_request; let compute_pageservers = shards - .into_iter() + .iter() .map(|shard| { let ps_conf = env .get_pageserver_conf(shard.node_id) @@ -217,10 +297,10 @@ impl ComputeHook { .collect::>(); for (endpoint_name, endpoint) in &cplane.endpoints { - if endpoint.tenant_id == tenant_id && endpoint.status() == EndpointStatus::Running { + if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running { tracing::info!("Reconfiguring endpoint {}", endpoint_name,); endpoint - .reconfigure(compute_pageservers.clone(), stripe_size) + .reconfigure(compute_pageservers.clone(), *stripe_size) .await?; } } @@ -298,12 +378,23 @@ impl ComputeHook { async fn do_notify( &self, url: &String, - reconfigure_request: ComputeHookNotifyRequest, + reconfigure_request: &ComputeHookNotifyRequest, cancel: &CancellationToken, ) -> Result<(), NotifyError> { let client = reqwest::Client::new(); + + // We hold these semaphore units across all retries, rather than only across each + // HTTP request: this is to preserve fairness and avoid a situation where a retry might + // time out waiting for 
a semaphore. + let _units = self + .api_concurrency + .acquire() + .await + // Interpret closed semaphore as shutdown + .map_err(|_| NotifyError::ShuttingDown)?; + backoff::retry( - || self.do_notify_iteration(&client, url, &reconfigure_request, cancel), + || self.do_notify_iteration(&client, url, reconfigure_request, cancel), |e| { matches!( e, @@ -343,42 +434,70 @@ impl ComputeHook { stripe_size: ShardStripeSize, cancel: &CancellationToken, ) -> Result<(), NotifyError> { - let mut locked = self.state.lock().await; + let maybe_send_result = { + let mut state_locked = self.state.lock().unwrap(); - use std::collections::hash_map::Entry; - let tenant = match locked.entry(tenant_shard_id.tenant_id) { - Entry::Vacant(e) => e.insert(ComputeHookTenant::new( - tenant_shard_id, - stripe_size, - node_id, - )), - Entry::Occupied(e) => { - let tenant = e.into_mut(); - tenant.update(tenant_shard_id, stripe_size, node_id); - tenant + use std::collections::hash_map::Entry; + let tenant = match state_locked.entry(tenant_shard_id.tenant_id) { + Entry::Vacant(e) => e.insert(ComputeHookTenant::new( + tenant_shard_id, + stripe_size, + node_id, + )), + Entry::Occupied(e) => { + let tenant = e.into_mut(); + tenant.update(tenant_shard_id, stripe_size, node_id); + tenant + } + }; + tenant.maybe_send(tenant_shard_id.tenant_id, None) + }; + + // Process result: we may get an update to send, or we may have to wait for a lock + // before trying again. + let (request, mut send_lock_guard) = match maybe_send_result { + MaybeSendResult::Noop => { + return Ok(()); } + MaybeSendResult::AwaitLock(send_lock) => { + let send_locked = send_lock.lock_owned().await; + + // Lock order: maybe_send is called within the `[Self::state]` lock, and takes the send lock, but here + // we have acquired the send lock and take `[Self::state]` lock. This is safe because maybe_send only uses + // try_lock. + let state_locked = self.state.lock().unwrap(); + let Some(tenant) = state_locked.get(&tenant_shard_id.tenant_id) else { + return Ok(()); + }; + match tenant.maybe_send(tenant_shard_id.tenant_id, Some(send_locked)) { + MaybeSendResult::AwaitLock(_) => { + unreachable!("We supplied lock guard") + } + MaybeSendResult::Noop => { + return Ok(()); + } + MaybeSendResult::Transmit((request, lock)) => (request, lock), + } + } + MaybeSendResult::Transmit((request, lock)) => (request, lock), }; - let reconfigure_request = tenant.maybe_reconfigure(tenant_shard_id.tenant_id); - let Some(reconfigure_request) = reconfigure_request else { - // The tenant doesn't yet have pageservers for all its shards: we won't notify anything - // until it does. - tracing::info!("Tenant isn't yet ready to emit a notification"); - return Ok(()); - }; - - if let Some(notify_url) = &self.config.compute_hook_url { - self.do_notify(notify_url, reconfigure_request, cancel) - .await + let result = if let Some(notify_url) = &self.config.compute_hook_url { + self.do_notify(notify_url, &request, cancel).await } else { - self.do_notify_local(reconfigure_request) - .await - .map_err(|e| { - // This path is for testing only, so munge the error into our prod-style error type. - tracing::error!("Local notification hook failed: {e}"); - NotifyError::Fatal(StatusCode::INTERNAL_SERVER_ERROR) - }) + self.do_notify_local(&request).await.map_err(|e| { + // This path is for testing only, so munge the error into our prod-style error type. 
+ tracing::error!("Local notification hook failed: {e}"); + NotifyError::Fatal(StatusCode::INTERNAL_SERVER_ERROR) + }) + }; + + if result.is_ok() { + // Before dropping the send lock, stash the request we just sent so that + // subsequent callers can avoid redundantly re-sending the same thing. + *send_lock_guard = Some(request); } + result } } @@ -402,21 +521,22 @@ pub(crate) mod tests { NodeId(1), ); - // An unsharded tenant is always ready to emit a notification - assert!(tenant_state.maybe_reconfigure(tenant_id).is_some()); - assert_eq!( - tenant_state - .maybe_reconfigure(tenant_id) - .unwrap() - .shards - .len(), - 1 - ); - assert!(tenant_state - .maybe_reconfigure(tenant_id) - .unwrap() - .stripe_size - .is_none()); + // An unsharded tenant is always ready to emit a notification, but won't + // send the same one twice + let send_result = tenant_state.maybe_send(tenant_id, None); + let MaybeSendResult::Transmit((request, mut guard)) = send_result else { + anyhow::bail!("Wrong send result"); + }; + assert_eq!(request.shards.len(), 1); + assert!(request.stripe_size.is_none()); + + // Simulate successful send + *guard = Some(request); + drop(guard); + + // Try asking again: this should be a no-op + let send_result = tenant_state.maybe_send(tenant_id, None); + assert!(matches!(send_result, MaybeSendResult::Noop)); // Writing the first shard of a multi-sharded situation (i.e. in a split) // resets the tenant state and puts it in an non-notifying state (need to @@ -430,7 +550,10 @@ pub(crate) mod tests { ShardStripeSize(32768), NodeId(1), ); - assert!(tenant_state.maybe_reconfigure(tenant_id).is_none()); + assert!(matches!( + tenant_state.maybe_send(tenant_id, None), + MaybeSendResult::Noop + )); // Writing the second shard makes it ready to notify tenant_state.update( @@ -443,22 +566,16 @@ pub(crate) mod tests { NodeId(1), ); - assert!(tenant_state.maybe_reconfigure(tenant_id).is_some()); - assert_eq!( - tenant_state - .maybe_reconfigure(tenant_id) - .unwrap() - .shards - .len(), - 2 - ); - assert_eq!( - tenant_state - .maybe_reconfigure(tenant_id) - .unwrap() - .stripe_size, - Some(ShardStripeSize(32768)) - ); + let send_result = tenant_state.maybe_send(tenant_id, None); + let MaybeSendResult::Transmit((request, mut guard)) = send_result else { + anyhow::bail!("Wrong send result"); + }; + assert_eq!(request.shards.len(), 2); + assert_eq!(request.stripe_size, Some(ShardStripeSize(32768))); + + // Simulate successful send + *guard = Some(request); + drop(guard); Ok(()) } diff --git a/control_plane/attachment_service/src/heartbeater.rs b/storage_controller/src/heartbeater.rs similarity index 100% rename from control_plane/attachment_service/src/heartbeater.rs rename to storage_controller/src/heartbeater.rs diff --git a/control_plane/attachment_service/src/http.rs b/storage_controller/src/http.rs similarity index 100% rename from control_plane/attachment_service/src/http.rs rename to storage_controller/src/http.rs diff --git a/control_plane/attachment_service/src/id_lock_map.rs b/storage_controller/src/id_lock_map.rs similarity index 100% rename from control_plane/attachment_service/src/id_lock_map.rs rename to storage_controller/src/id_lock_map.rs diff --git a/control_plane/attachment_service/src/lib.rs b/storage_controller/src/lib.rs similarity index 98% rename from control_plane/attachment_service/src/lib.rs rename to storage_controller/src/lib.rs index 8bcd5c0ac4..2ea490a14b 100644 --- a/control_plane/attachment_service/src/lib.rs +++ b/storage_controller/src/lib.rs @@ -14,7 +14,7 @@ mod 
reconciler; mod scheduler; mod schema; pub mod service; -mod tenant_state; +mod tenant_shard; #[derive(Ord, PartialOrd, Eq, PartialEq, Copy, Clone, Serialize)] struct Sequence(u64); diff --git a/control_plane/attachment_service/src/main.rs b/storage_controller/src/main.rs similarity index 97% rename from control_plane/attachment_service/src/main.rs rename to storage_controller/src/main.rs index 5150468537..3c03d6efe8 100644 --- a/control_plane/attachment_service/src/main.rs +++ b/storage_controller/src/main.rs @@ -1,13 +1,13 @@ use anyhow::{anyhow, Context}; -use attachment_service::http::make_router; -use attachment_service::metrics::preinitialize_metrics; -use attachment_service::persistence::Persistence; -use attachment_service::service::{Config, Service, MAX_UNAVAILABLE_INTERVAL_DEFAULT}; use camino::Utf8PathBuf; use clap::Parser; use diesel::Connection; use metrics::launch_timestamp::LaunchTimestamp; use std::sync::Arc; +use storage_controller::http::make_router; +use storage_controller::metrics::preinitialize_metrics; +use storage_controller::persistence::Persistence; +use storage_controller::service::{Config, Service, MAX_UNAVAILABLE_INTERVAL_DEFAULT}; use tokio::signal::unix::SignalKind; use tokio_util::sync::CancellationToken; use utils::auth::{JwtAuth, SwappableJwtAuth}; @@ -51,7 +51,7 @@ struct Cli { #[arg(short, long)] path: Option, - /// URL to connect to postgres, like postgresql://localhost:1234/attachment_service + /// URL to connect to postgres, like postgresql://localhost:1234/storage_controller #[arg(long)] database_url: Option, diff --git a/control_plane/attachment_service/src/metrics.rs b/storage_controller/src/metrics.rs similarity index 100% rename from control_plane/attachment_service/src/metrics.rs rename to storage_controller/src/metrics.rs diff --git a/control_plane/attachment_service/src/node.rs b/storage_controller/src/node.rs similarity index 100% rename from control_plane/attachment_service/src/node.rs rename to storage_controller/src/node.rs diff --git a/control_plane/attachment_service/src/pageserver_client.rs b/storage_controller/src/pageserver_client.rs similarity index 100% rename from control_plane/attachment_service/src/pageserver_client.rs rename to storage_controller/src/pageserver_client.rs diff --git a/control_plane/attachment_service/src/persistence.rs b/storage_controller/src/persistence.rs similarity index 99% rename from control_plane/attachment_service/src/persistence.rs rename to storage_controller/src/persistence.rs index d60392bdbc..55fbfd10bc 100644 --- a/control_plane/attachment_service/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -696,7 +696,7 @@ impl Persistence { } } -/// Parts of [`crate::tenant_state::TenantState`] that are stored durably +/// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably #[derive(Queryable, Selectable, Insertable, Serialize, Deserialize, Clone, Eq, PartialEq)] #[diesel(table_name = crate::schema::tenant_shards)] pub(crate) struct TenantShardPersistence { diff --git a/control_plane/attachment_service/src/persistence/split_state.rs b/storage_controller/src/persistence/split_state.rs similarity index 100% rename from control_plane/attachment_service/src/persistence/split_state.rs rename to storage_controller/src/persistence/split_state.rs diff --git a/control_plane/attachment_service/src/reconciler.rs b/storage_controller/src/reconciler.rs similarity index 99% rename from control_plane/attachment_service/src/reconciler.rs rename to storage_controller/src/reconciler.rs index 
72eb8faccb..49cfaad569 100644 --- a/control_plane/attachment_service/src/reconciler.rs +++ b/storage_controller/src/reconciler.rs @@ -18,14 +18,14 @@ use utils::sync::gate::GateGuard; use crate::compute_hook::{ComputeHook, NotifyError}; use crate::node::Node; -use crate::tenant_state::{IntentState, ObservedState, ObservedStateLocation}; +use crate::tenant_shard::{IntentState, ObservedState, ObservedStateLocation}; const DEFAULT_HEATMAP_PERIOD: &str = "60s"; /// Object with the lifetime of the background reconcile task that is created /// for tenants which have a difference between their intent and observed states. pub(super) struct Reconciler { - /// See [`crate::tenant_state::TenantState`] for the meanings of these fields: they are a snapshot + /// See [`crate::tenant_shard::TenantShard`] for the meanings of these fields: they are a snapshot /// of a tenant's state from when we spawned a reconcile task. pub(super) tenant_shard_id: TenantShardId, pub(crate) shard: ShardIdentity, @@ -48,11 +48,11 @@ pub(super) struct Reconciler { /// To avoid stalling if the cloud control plane is unavailable, we may proceed /// past failures in [`ComputeHook::notify`], but we _must_ remember that we failed - /// so that we can set [`crate::tenant_state::TenantState::pending_compute_notification`] to ensure a later retry. + /// so that we can set [`crate::tenant_shard::TenantShard::pending_compute_notification`] to ensure a later retry. pub(crate) compute_notify_failure: bool, /// A means to abort background reconciliation: it is essential to - /// call this when something changes in the original TenantState that + /// call this when something changes in the original TenantShard that /// will make this reconciliation impossible or unnecessary, for /// example when a pageserver node goes offline, or the PlacementPolicy for /// the tenant is changed. @@ -66,7 +66,7 @@ pub(super) struct Reconciler { pub(crate) persistence: Arc, } -/// This is a snapshot of [`crate::tenant_state::IntentState`], but it does not do any +/// This is a snapshot of [`crate::tenant_shard::IntentState`], but it does not do any /// reference counting for Scheduler. The IntentState is what the scheduler works with, /// and the TargetState is just the instruction for a particular Reconciler run. #[derive(Debug)] diff --git a/control_plane/attachment_service/src/scheduler.rs b/storage_controller/src/scheduler.rs similarity index 98% rename from control_plane/attachment_service/src/scheduler.rs rename to storage_controller/src/scheduler.rs index 782189d11f..862ac0cbfe 100644 --- a/control_plane/attachment_service/src/scheduler.rs +++ b/storage_controller/src/scheduler.rs @@ -1,4 +1,4 @@ -use crate::{node::Node, tenant_state::TenantState}; +use crate::{node::Node, tenant_shard::TenantShard}; use pageserver_api::controller_api::UtilizationScore; use serde::Serialize; use std::collections::HashMap; @@ -27,7 +27,7 @@ pub enum MaySchedule { #[derive(Serialize)] struct SchedulerNode { - /// How many shards are currently scheduled on this node, via their [`crate::tenant_state::IntentState`]. + /// How many shards are currently scheduled on this node, via their [`crate::tenant_shard::IntentState`]. shard_count: usize, /// Whether this node is currently elegible to have new shards scheduled (this is derived @@ -84,7 +84,7 @@ impl std::ops::Add for AffinityScore { } } -// For carrying state between multiple calls to [`TenantState::schedule`], e.g. when calling +// For carrying state between multiple calls to [`TenantShard::schedule`], e.g. 
when calling // it for many shards in the same tenant. #[derive(Debug, Default)] pub(crate) struct ScheduleContext { @@ -147,7 +147,7 @@ impl Scheduler { pub(crate) fn consistency_check<'a>( &self, nodes: impl Iterator, - shards: impl Iterator, + shards: impl Iterator, ) -> anyhow::Result<()> { let mut expect_nodes: HashMap = HashMap::new(); for node in nodes { @@ -398,7 +398,7 @@ pub(crate) mod test_utils { mod tests { use super::*; - use crate::tenant_state::IntentState; + use crate::tenant_shard::IntentState; #[test] fn scheduler_basic() -> anyhow::Result<()> { let nodes = test_utils::make_test_nodes(2); diff --git a/control_plane/attachment_service/src/schema.rs b/storage_controller/src/schema.rs similarity index 100% rename from control_plane/attachment_service/src/schema.rs rename to storage_controller/src/schema.rs diff --git a/control_plane/attachment_service/src/service.rs b/storage_controller/src/service.rs similarity index 98% rename from control_plane/attachment_service/src/service.rs rename to storage_controller/src/service.rs index 0f87a8ab05..010558b797 100644 --- a/control_plane/attachment_service/src/service.rs +++ b/storage_controller/src/service.rs @@ -66,9 +66,9 @@ use crate::{ persistence::{split_state::SplitState, DatabaseError, Persistence, TenantShardPersistence}, reconciler::attached_location_conf, scheduler::Scheduler, - tenant_state::{ + tenant_shard::{ IntentState, ObservedState, ObservedStateLocation, ReconcileResult, ReconcileWaitError, - ReconcilerWaiter, TenantState, + ReconcilerWaiter, TenantShard, }, }; @@ -92,7 +92,7 @@ pub const MAX_UNAVAILABLE_INTERVAL_DEFAULT: Duration = Duration::from_secs(30); // Top level state available to all HTTP handlers struct ServiceState { - tenants: BTreeMap, + tenants: BTreeMap, nodes: Arc>, @@ -102,7 +102,7 @@ struct ServiceState { impl ServiceState { fn new( nodes: HashMap, - tenants: BTreeMap, + tenants: BTreeMap, scheduler: Scheduler, ) -> Self { Self { @@ -116,7 +116,7 @@ impl ServiceState { &mut self, ) -> ( &mut Arc>, - &mut BTreeMap, + &mut BTreeMap, &mut Scheduler, ) { (&mut self.nodes, &mut self.tenants, &mut self.scheduler) @@ -335,11 +335,11 @@ impl Service { for (tenant_shard_id, shard_observations) in observed { for (node_id, observed_loc) in shard_observations { - let Some(tenant_state) = tenants.get_mut(&tenant_shard_id) else { + let Some(tenant_shard) = tenants.get_mut(&tenant_shard_id) else { cleanup.push((tenant_shard_id, node_id)); continue; }; - tenant_state + tenant_shard .observed .locations .insert(node_id, ObservedStateLocation { conf: observed_loc }); @@ -348,14 +348,14 @@ impl Service { // Populate each tenant's intent state let mut schedule_context = ScheduleContext::default(); - for (tenant_shard_id, tenant_state) in tenants.iter_mut() { + for (tenant_shard_id, tenant_shard) in tenants.iter_mut() { if tenant_shard_id.shard_number == ShardNumber(0) { // Reset scheduling context each time we advance to the next Tenant schedule_context = ScheduleContext::default(); } - tenant_state.intent_from_observed(scheduler); - if let Err(e) = tenant_state.schedule(scheduler, &mut schedule_context) { + tenant_shard.intent_from_observed(scheduler); + if let Err(e) = tenant_shard.schedule(scheduler, &mut schedule_context) { // Non-fatal error: we are unable to properly schedule the tenant, perhaps because // not enough pageservers are available. The tenant may well still be available // to clients. 
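
Editor's aside, not part of the patch: the startup hunk above resets the ScheduleContext whenever iteration reaches shard number 0, so each tenant's shards are scheduled against affinity state accumulated only from that tenant (the BTreeMap keeps a tenant's shards contiguous). Below is a minimal, self-contained sketch of that reset-at-shard-zero pattern; the types here (Context, NodeId, the pick/note_placed helpers) are hypothetical simplifications, not the controller's real ScheduleContext or Scheduler.

use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct NodeId(u64);

#[derive(Default)]
struct Context {
    // How many shards of the *current* tenant have already been placed per node
    affinity: HashMap<NodeId, usize>,
}

impl Context {
    // Prefer the node carrying the fewest shards of this tenant so far
    fn pick(&self, nodes: &[NodeId]) -> Option<NodeId> {
        nodes
            .iter()
            .copied()
            .min_by_key(|n| self.affinity.get(n).copied().unwrap_or(0))
    }

    fn note_placed(&mut self, node: NodeId) {
        *self.affinity.entry(node).or_default() += 1;
    }
}

fn main() {
    let nodes = [NodeId(1), NodeId(2), NodeId(3)];
    // (tenant, shard_number) pairs in map order: a tenant's shards are contiguous
    let shards = [(1u32, 0u8), (1, 1), (1, 2), (2, 0), (2, 1)];

    let mut ctx = Context::default();
    for (tenant, shard_number) in shards {
        if shard_number == 0 {
            // First shard of the next tenant: start a fresh context so one tenant's
            // placements do not bias another tenant's scheduling.
            ctx = Context::default();
        }
        let node = ctx.pick(&nodes).expect("at least one node");
        ctx.note_placed(node);
        println!("tenant {tenant} shard {shard_number} -> node {node:?}");
    }
}
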
@@ -364,11 +364,11 @@ impl Service { // If we're both intending and observed to be attached at a particular node, we will // emit a compute notification for this. In the case where our observed state does not // yet match our intent, we will eventually reconcile, and that will emit a compute notification. - if let Some(attached_at) = tenant_state.stably_attached() { + if let Some(attached_at) = tenant_shard.stably_attached() { compute_notifications.push(( *tenant_shard_id, attached_at, - tenant_state.shard.stripe_size, + tenant_shard.shard.stripe_size, )); } } @@ -743,7 +743,7 @@ impl Service { /// Apply the contents of a [`ReconcileResult`] to our in-memory state: if the reconciliation /// was successful, this will update the observed state of the tenant such that subsequent - /// calls to [`TenantState::maybe_reconcile`] will do nothing. + /// calls to [`TenantShard::maybe_reconcile`] will do nothing. #[instrument(skip_all, fields( tenant_id=%result.tenant_shard_id.tenant_id, shard_id=%result.tenant_shard_id.shard_slug(), sequence=%result.sequence @@ -761,10 +761,10 @@ impl Service { tenant.generation = std::cmp::max(tenant.generation, result.generation); // If the reconciler signals that it failed to notify compute, set this state on - // the shard so that a future [`TenantState::maybe_reconcile`] will try again. + // the shard so that a future [`TenantShard::maybe_reconcile`] will try again. tenant.pending_compute_notification = result.pending_compute_notification; - // Let the TenantState know it is idle. + // Let the TenantShard know it is idle. tenant.reconcile_complete(result.sequence); match result.result { @@ -979,7 +979,7 @@ impl Service { if let Some(generation_pageserver) = tsp.generation_pageserver { intent.set_attached(&mut scheduler, Some(NodeId(generation_pageserver as u64))); } - let new_tenant = TenantState::from_persistent(tsp, intent)?; + let new_tenant = TenantShard::from_persistent(tsp, intent)?; tenants.insert(tenant_shard_id, new_tenant); } @@ -1126,7 +1126,7 @@ impl Service { let mut locked = self.inner.write().unwrap(); locked.tenants.insert( attach_req.tenant_shard_id, - TenantState::new( + TenantShard::new( attach_req.tenant_shard_id, ShardIdentity::unsharded(), PlacementPolicy::Attached(0), @@ -1178,32 +1178,32 @@ impl Service { let mut locked = self.inner.write().unwrap(); let (_nodes, tenants, scheduler) = locked.parts_mut(); - let tenant_state = tenants + let tenant_shard = tenants .get_mut(&attach_req.tenant_shard_id) .expect("Checked for existence above"); if let Some(new_generation) = new_generation { - tenant_state.generation = Some(new_generation); - tenant_state.policy = PlacementPolicy::Attached(0); + tenant_shard.generation = Some(new_generation); + tenant_shard.policy = PlacementPolicy::Attached(0); } else { // This is a detach notification. We must update placement policy to avoid re-attaching // during background scheduling/reconciliation, or during storage controller restart. 
assert!(attach_req.node_id.is_none()); - tenant_state.policy = PlacementPolicy::Detached; + tenant_shard.policy = PlacementPolicy::Detached; } if let Some(attaching_pageserver) = attach_req.node_id.as_ref() { tracing::info!( tenant_id = %attach_req.tenant_shard_id, ps_id = %attaching_pageserver, - generation = ?tenant_state.generation, + generation = ?tenant_shard.generation, "issuing", ); - } else if let Some(ps_id) = tenant_state.intent.get_attached() { + } else if let Some(ps_id) = tenant_shard.intent.get_attached() { tracing::info!( tenant_id = %attach_req.tenant_shard_id, %ps_id, - generation = ?tenant_state.generation, + generation = ?tenant_shard.generation, "dropping", ); } else { @@ -1211,14 +1211,14 @@ impl Service { tenant_id = %attach_req.tenant_shard_id, "no-op: tenant already has no pageserver"); } - tenant_state + tenant_shard .intent .set_attached(scheduler, attach_req.node_id); tracing::info!( "attach_hook: tenant {} set generation {:?}, pageserver {}", attach_req.tenant_shard_id, - tenant_state.generation, + tenant_shard.generation, // TODO: this is an odd number of 0xf's attach_req.node_id.unwrap_or(utils::id::NodeId(0xfffffff)) ); @@ -1230,36 +1230,36 @@ impl Service { #[cfg(feature = "testing")] { if let Some(node_id) = attach_req.node_id { - tenant_state.observed.locations = HashMap::from([( + tenant_shard.observed.locations = HashMap::from([( node_id, ObservedStateLocation { conf: Some(attached_location_conf( - tenant_state.generation.unwrap(), - &tenant_state.shard, - &tenant_state.config, + tenant_shard.generation.unwrap(), + &tenant_shard.shard, + &tenant_shard.config, false, )), }, )]); } else { - tenant_state.observed.locations.clear(); + tenant_shard.observed.locations.clear(); } } Ok(AttachHookResponse { gen: attach_req .node_id - .map(|_| tenant_state.generation.expect("Test hook, not used on tenants that are mid-onboarding with a NULL generation").into().unwrap()), + .map(|_| tenant_shard.generation.expect("Test hook, not used on tenants that are mid-onboarding with a NULL generation").into().unwrap()), }) } pub(crate) fn inspect(&self, inspect_req: InspectRequest) -> InspectResponse { let locked = self.inner.read().unwrap(); - let tenant_state = locked.tenants.get(&inspect_req.tenant_shard_id); + let tenant_shard = locked.tenants.get(&inspect_req.tenant_shard_id); InspectResponse { - attachment: tenant_state.and_then(|s| { + attachment: tenant_shard.and_then(|s| { s.intent .get_attached() .map(|ps| (s.generation.expect("Test hook, not used on tenants that are mid-onboarding with a NULL generation").into().unwrap(), ps)) @@ -1321,11 +1321,11 @@ impl Service { let mut locked = self.inner.write().unwrap(); for (tenant_shard_id, observed_loc) in configs.tenant_shards { - let Some(tenant_state) = locked.tenants.get_mut(&tenant_shard_id) else { + let Some(tenant_shard) = locked.tenants.get_mut(&tenant_shard_id) else { cleanup.push(tenant_shard_id); continue; }; - tenant_state + tenant_shard .observed .locations .insert(node.get_id(), ObservedStateLocation { conf: observed_loc }); @@ -1496,13 +1496,13 @@ impl Service { }; for req_tenant in validate_req.tenants { - if let Some(tenant_state) = locked.tenants.get(&req_tenant.id) { - let valid = tenant_state.generation == Some(Generation::new(req_tenant.gen)); + if let Some(tenant_shard) = locked.tenants.get(&req_tenant.id) { + let valid = tenant_shard.generation == Some(Generation::new(req_tenant.gen)); tracing::info!( "handle_validate: {}(gen {}): valid={valid} (latest {:?})", req_tenant.id, req_tenant.gen, - 
tenant_state.generation + tenant_shard.generation ); response.tenants.push(ValidateResponseTenant { id: req_tenant.id, @@ -1688,7 +1688,7 @@ impl Service { continue; } Entry::Vacant(entry) => { - let state = entry.insert(TenantState::new( + let state = entry.insert(TenantShard::new( tenant_shard_id, ShardIdentity::from_params( tenant_shard_id.shard_number, @@ -2738,7 +2738,7 @@ impl Service { /// Returns None if the input iterator of shards does not include a shard with number=0 fn tenant_describe_impl<'a>( &self, - shards: impl Iterator, + shards: impl Iterator, ) -> Option { let mut shard_zero = None; let mut describe_shards = Vec::new(); @@ -3038,7 +3038,7 @@ impl Service { }, ); - let mut child_state = TenantState::new(child, child_shard, policy.clone()); + let mut child_state = TenantShard::new(child, child_shard, policy.clone()); child_state.intent = IntentState::single(scheduler, Some(pageserver)); child_state.observed = ObservedState { locations: child_observed, @@ -3046,7 +3046,7 @@ impl Service { child_state.generation = Some(generation); child_state.config = config.clone(); - // The child's TenantState::splitting is intentionally left at the default value of Idle, + // The child's TenantShard::splitting is intentionally left at the default value of Idle, // as at this point in the split process we have succeeded and this part is infallible: // we will never need to do any special recovery from this state. @@ -3595,8 +3595,8 @@ impl Service { Ok(()) } - /// For debug/support: a full JSON dump of TenantStates. Returns a response so that - /// we don't have to make TenantState clonable in the return path. + /// For debug/support: a full JSON dump of TenantShards. Returns a response so that + /// we don't have to make TenantShard clonable in the return path. pub(crate) fn tenants_dump(&self) -> Result, ApiError> { let serialized = { let locked = self.inner.read().unwrap(); @@ -3700,7 +3700,7 @@ impl Service { } /// For debug/support: a JSON dump of the [`Scheduler`]. Returns a response so that - /// we don't have to make TenantState clonable in the return path. + /// we don't have to make TenantShard clonable in the return path. 
pub(crate) fn scheduler_dump(&self) -> Result, ApiError> { let serialized = { let locked = self.inner.read().unwrap(); @@ -3917,8 +3917,8 @@ impl Service { tracing::info!("Node {} transition to offline", node_id); let mut tenants_affected: usize = 0; - for (tenant_shard_id, tenant_state) in tenants { - if let Some(observed_loc) = tenant_state.observed.locations.get_mut(&node_id) { + for (tenant_shard_id, tenant_shard) in tenants { + if let Some(observed_loc) = tenant_shard.observed.locations.get_mut(&node_id) { // When a node goes offline, we set its observed configuration to None, indicating unknown: we will // not assume our knowledge of the node's configuration is accurate until it comes back online observed_loc.conf = None; @@ -3931,24 +3931,24 @@ impl Service { continue; } - if tenant_state.intent.demote_attached(node_id) { - tenant_state.sequence = tenant_state.sequence.next(); + if tenant_shard.intent.demote_attached(node_id) { + tenant_shard.sequence = tenant_shard.sequence.next(); // TODO: populate a ScheduleContext including all shards in the same tenant_id (only matters // for tenants without secondary locations: if they have a secondary location, then this // schedule() call is just promoting an existing secondary) let mut schedule_context = ScheduleContext::default(); - match tenant_state.schedule(scheduler, &mut schedule_context) { + match tenant_shard.schedule(scheduler, &mut schedule_context) { Err(e) => { // It is possible that some tenants will become unschedulable when too many pageservers // go offline: in this case there isn't much we can do other than make the issue observable. - // TODO: give TenantState a scheduling error attribute to be queried later. + // TODO: give TenantShard a scheduling error attribute to be queried later. tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", node_id); } Ok(()) => { if self - .maybe_reconcile_shard(tenant_state, &new_nodes) + .maybe_reconcile_shard(tenant_shard, &new_nodes) .is_some() { tenants_affected += 1; @@ -3967,10 +3967,10 @@ impl Service { tracing::info!("Node {} transition to active", node_id); // When a node comes back online, we must reconcile any tenant that has a None observed // location on the node. - for tenant_state in locked.tenants.values_mut() { - if let Some(observed_loc) = tenant_state.observed.locations.get_mut(&node_id) { + for tenant_shard in locked.tenants.values_mut() { + if let Some(observed_loc) = tenant_shard.observed.locations.get_mut(&node_id) { if observed_loc.conf.is_none() { - self.maybe_reconcile_shard(tenant_state, &new_nodes); + self.maybe_reconcile_shard(tenant_shard, &new_nodes); } } } @@ -4053,11 +4053,11 @@ impl Service { Ok(()) } - /// Convenience wrapper around [`TenantState::maybe_reconcile`] that provides + /// Convenience wrapper around [`TenantShard::maybe_reconcile`] that provides /// all the references to parts of Self that are needed fn maybe_reconcile_shard( &self, - shard: &mut TenantState, + shard: &mut TenantShard, nodes: &Arc>, ) -> Option { shard.maybe_reconcile( @@ -4123,7 +4123,7 @@ impl Service { let mut reconciles_spawned = 0; - let mut tenant_shards: Vec<&TenantState> = Vec::new(); + let mut tenant_shards: Vec<&TenantShard> = Vec::new(); // Limit on how many shards' optmizations each call to this function will execute. 
Combined // with the frequency of background calls, this acts as an implicit rate limit that runs a small @@ -4254,7 +4254,7 @@ impl Service { pub async fn shutdown(&self) { // Note that this already stops processing any results from reconciles: so - // we do not expect that our [`TenantState`] objects will reach a neat + // we do not expect that our [`TenantShard`] objects will reach a neat // final state. self.cancel.cancel(); diff --git a/control_plane/attachment_service/src/tenant_state.rs b/storage_controller/src/tenant_shard.rs similarity index 96% rename from control_plane/attachment_service/src/tenant_state.rs rename to storage_controller/src/tenant_shard.rs index 6717b8e178..58b8ef8d5d 100644 --- a/control_plane/attachment_service/src/tenant_state.rs +++ b/storage_controller/src/tenant_shard.rs @@ -50,7 +50,7 @@ where /// This struct implement Serialize for debugging purposes, but is _not_ persisted /// itself: see [`crate::persistence`] for the subset of tenant shard state that is persisted. #[derive(Serialize)] -pub(crate) struct TenantState { +pub(crate) struct TenantShard { pub(crate) tenant_shard_id: TenantShardId, pub(crate) shard: ShardIdentity, @@ -354,7 +354,7 @@ pub(crate) struct ReconcilerHandle { } /// When a reconcile task completes, it sends this result object -/// to be applied to the primary TenantState. +/// to be applied to the primary TenantShard. pub(crate) struct ReconcileResult { pub(crate) sequence: Sequence, /// On errors, `observed` should be treated as an incompleted description @@ -367,7 +367,7 @@ pub(crate) struct ReconcileResult { pub(crate) generation: Option, pub(crate) observed: ObservedState, - /// Set [`TenantState::pending_compute_notification`] from this flag + /// Set [`TenantShard::pending_compute_notification`] from this flag pub(crate) pending_compute_notification: bool, } @@ -379,7 +379,7 @@ impl ObservedState { } } -impl TenantState { +impl TenantShard { pub(crate) fn new( tenant_shard_id: TenantShardId, shard: ShardIdentity, @@ -1143,7 +1143,7 @@ pub(crate) mod tests { use super::*; - fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantState { + fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantShard { let tenant_id = TenantId::generate(); let shard_number = ShardNumber(0); let shard_count = ShardCount::new(1); @@ -1153,7 +1153,7 @@ pub(crate) mod tests { shard_number, shard_count, }; - TenantState::new( + TenantShard::new( tenant_shard_id, ShardIdentity::new( shard_number, @@ -1165,7 +1165,7 @@ pub(crate) mod tests { ) } - fn make_test_tenant(policy: PlacementPolicy, shard_count: ShardCount) -> Vec { + fn make_test_tenant(policy: PlacementPolicy, shard_count: ShardCount) -> Vec { let tenant_id = TenantId::generate(); (0..shard_count.count()) @@ -1177,7 +1177,7 @@ pub(crate) mod tests { shard_number, shard_count, }; - TenantState::new( + TenantShard::new( tenant_shard_id, ShardIdentity::new( shard_number, @@ -1202,24 +1202,24 @@ pub(crate) mod tests { let mut scheduler = Scheduler::new(nodes.values()); let mut context = ScheduleContext::default(); - let mut tenant_state = make_test_tenant_shard(PlacementPolicy::Attached(1)); - tenant_state + let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1)); + tenant_shard .schedule(&mut scheduler, &mut context) .expect("we have enough nodes, scheduling should work"); // Expect to initially be schedule on to different nodes - assert_eq!(tenant_state.intent.secondary.len(), 1); - assert!(tenant_state.intent.attached.is_some()); + 
assert_eq!(tenant_shard.intent.secondary.len(), 1); + assert!(tenant_shard.intent.attached.is_some()); - let attached_node_id = tenant_state.intent.attached.unwrap(); - let secondary_node_id = *tenant_state.intent.secondary.iter().last().unwrap(); + let attached_node_id = tenant_shard.intent.attached.unwrap(); + let secondary_node_id = *tenant_shard.intent.secondary.iter().last().unwrap(); assert_ne!(attached_node_id, secondary_node_id); // Notifying the attached node is offline should demote it to a secondary - let changed = tenant_state.intent.demote_attached(attached_node_id); + let changed = tenant_shard.intent.demote_attached(attached_node_id); assert!(changed); - assert!(tenant_state.intent.attached.is_none()); - assert_eq!(tenant_state.intent.secondary.len(), 2); + assert!(tenant_shard.intent.attached.is_none()); + assert_eq!(tenant_shard.intent.secondary.len(), 2); // Update the scheduler state to indicate the node is offline nodes @@ -1229,18 +1229,18 @@ pub(crate) mod tests { scheduler.node_upsert(nodes.get(&attached_node_id).unwrap()); // Scheduling the node should promote the still-available secondary node to attached - tenant_state + tenant_shard .schedule(&mut scheduler, &mut context) .expect("active nodes are available"); - assert_eq!(tenant_state.intent.attached.unwrap(), secondary_node_id); + assert_eq!(tenant_shard.intent.attached.unwrap(), secondary_node_id); // The original attached node should have been retained as a secondary assert_eq!( - *tenant_state.intent.secondary.iter().last().unwrap(), + *tenant_shard.intent.secondary.iter().last().unwrap(), attached_node_id ); - tenant_state.intent.clear(&mut scheduler); + tenant_shard.intent.clear(&mut scheduler); Ok(()) } @@ -1250,48 +1250,48 @@ pub(crate) mod tests { let nodes = make_test_nodes(3); let mut scheduler = Scheduler::new(nodes.values()); - let mut tenant_state = make_test_tenant_shard(PlacementPolicy::Attached(1)); + let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1)); - tenant_state.observed.locations.insert( + tenant_shard.observed.locations.insert( NodeId(3), ObservedStateLocation { conf: Some(LocationConfig { mode: LocationConfigMode::AttachedMulti, generation: Some(2), secondary_conf: None, - shard_number: tenant_state.shard.number.0, - shard_count: tenant_state.shard.count.literal(), - shard_stripe_size: tenant_state.shard.stripe_size.0, + shard_number: tenant_shard.shard.number.0, + shard_count: tenant_shard.shard.count.literal(), + shard_stripe_size: tenant_shard.shard.stripe_size.0, tenant_conf: TenantConfig::default(), }), }, ); - tenant_state.observed.locations.insert( + tenant_shard.observed.locations.insert( NodeId(2), ObservedStateLocation { conf: Some(LocationConfig { mode: LocationConfigMode::AttachedStale, generation: Some(1), secondary_conf: None, - shard_number: tenant_state.shard.number.0, - shard_count: tenant_state.shard.count.literal(), - shard_stripe_size: tenant_state.shard.stripe_size.0, + shard_number: tenant_shard.shard.number.0, + shard_count: tenant_shard.shard.count.literal(), + shard_stripe_size: tenant_shard.shard.stripe_size.0, tenant_conf: TenantConfig::default(), }), }, ); - tenant_state.intent_from_observed(&mut scheduler); + tenant_shard.intent_from_observed(&mut scheduler); // The highest generationed attached location gets used as attached - assert_eq!(tenant_state.intent.attached, Some(NodeId(3))); + assert_eq!(tenant_shard.intent.attached, Some(NodeId(3))); // Other locations get used as secondary - assert_eq!(tenant_state.intent.secondary, 
vec![NodeId(2)]); + assert_eq!(tenant_shard.intent.secondary, vec![NodeId(2)]); - scheduler.consistency_check(nodes.values(), [&tenant_state].into_iter())?; + scheduler.consistency_check(nodes.values(), [&tenant_shard].into_iter())?; - tenant_state.intent.clear(&mut scheduler); + tenant_shard.intent.clear(&mut scheduler); Ok(()) } @@ -1300,23 +1300,23 @@ pub(crate) mod tests { let nodes = make_test_nodes(3); let mut scheduler = Scheduler::new(nodes.values()); - let mut tenant_state = make_test_tenant_shard(PlacementPolicy::Attached(1)); + let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1)); // In pause mode, schedule() shouldn't do anything - tenant_state.scheduling_policy = ShardSchedulingPolicy::Pause; - assert!(tenant_state + tenant_shard.scheduling_policy = ShardSchedulingPolicy::Pause; + assert!(tenant_shard .schedule(&mut scheduler, &mut ScheduleContext::default()) .is_ok()); - assert!(tenant_state.intent.all_pageservers().is_empty()); + assert!(tenant_shard.intent.all_pageservers().is_empty()); // In active mode, schedule() works - tenant_state.scheduling_policy = ShardSchedulingPolicy::Active; - assert!(tenant_state + tenant_shard.scheduling_policy = ShardSchedulingPolicy::Active; + assert!(tenant_shard .schedule(&mut scheduler, &mut ScheduleContext::default()) .is_ok()); - assert!(!tenant_state.intent.all_pageservers().is_empty()); + assert!(!tenant_shard.intent.all_pageservers().is_empty()); - tenant_state.intent.clear(&mut scheduler); + tenant_shard.intent.clear(&mut scheduler); Ok(()) } @@ -1429,7 +1429,7 @@ pub(crate) mod tests { fn optimize_til_idle( nodes: &HashMap, scheduler: &mut Scheduler, - shards: &mut [TenantState], + shards: &mut [TenantShard], ) { let mut loop_n = 0; loop { diff --git a/test_runner/fixtures/workload.py b/test_runner/fixtures/workload.py index 4ebc02e6fd..c44628ce06 100644 --- a/test_runner/fixtures/workload.py +++ b/test_runner/fixtures/workload.py @@ -81,15 +81,14 @@ class Workload: return self._endpoint - def __del__(self): - if self._endpoint is not None: - self._endpoint.stop() - def stop(self): if self._endpoint is not None: self._endpoint.stop() self._endpoint = None + def __del__(self): + self.stop() + def init(self, pageserver_id: Optional[int] = None): endpoint = self.endpoint(pageserver_id) diff --git a/test_runner/regress/test_pageserver_generations.py b/test_runner/regress/test_pageserver_generations.py index 4767f2edb1..67f68a62af 100644 --- a/test_runner/regress/test_pageserver_generations.py +++ b/test_runner/regress/test_pageserver_generations.py @@ -22,6 +22,7 @@ from fixtures.neon_fixtures import ( NeonPageserver, PgBin, S3Scrubber, + flush_ep_to_pageserver, last_flush_lsn_upload, ) from fixtures.pageserver.http import PageserverApiException @@ -30,6 +31,7 @@ from fixtures.pageserver.utils import ( list_prefix, wait_for_last_record_lsn, wait_for_upload, + wait_for_upload_queue_empty, ) from fixtures.remote_storage import ( RemoteStorageKind, @@ -111,7 +113,6 @@ def generate_uploads_and_deletions( last_flush_lsn_upload( env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id ) - ps_http.timeline_checkpoint(tenant_id, timeline_id) # Compaction should generate some GC-elegible layers for i in range(0, 2): @@ -121,6 +122,17 @@ def generate_uploads_and_deletions( print_gc_result(gc_result) assert gc_result["layers_removed"] > 0 + # Stop endpoint and flush all data to pageserver, then checkpoint it: this + # ensures that the pageserver is in a fully idle state: there will be no more + # background 
ingest, no more uploads pending, and therefore no non-determinism + # in subsequent actions like pageserver restarts. + final_lsn = flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id, pageserver.id) + ps_http.timeline_checkpoint(tenant_id, timeline_id) + # Finish uploads + wait_for_upload(ps_http, tenant_id, timeline_id, final_lsn) + # Finish all remote writes (including deletions) + wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id) + def read_all( env: NeonEnv, tenant_id: Optional[TenantId] = None, timeline_id: Optional[TimelineId] = None @@ -385,9 +397,8 @@ def test_deletion_queue_recovery( if validate_before == ValidateBefore.NO_VALIDATE: failpoints.append( # Prevent deletion lists from being validated, we will test that they are - # dropped properly during recovery. 'pause' is okay here because we kill - # the pageserver with immediate=true - ("control-plane-client-validate", "pause") + # dropped properly during recovery. This is such a long sleep as to be equivalent to "never" + ("control-plane-client-validate", "return(3600000)") ) ps_http.configure_failpoints(failpoints) diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py index bca11bbbe7..bfaab9125f 100644 --- a/test_runner/regress/test_sharding.py +++ b/test_runner/regress/test_sharding.py @@ -10,11 +10,13 @@ from fixtures.log_helper import log from fixtures.neon_fixtures import ( NeonEnv, NeonEnvBuilder, + S3Scrubber, StorageControllerApiException, last_flush_lsn_upload, tenant_get_shards, wait_for_last_flush_lsn, ) +from fixtures.pageserver.utils import assert_prefix_empty, assert_prefix_not_empty from fixtures.remote_storage import s3_storage from fixtures.types import Lsn, TenantId, TenantShardId, TimelineId from fixtures.utils import wait_until @@ -69,6 +71,15 @@ def test_sharding_smoke( log.info(f"sizes = {sizes}") return sizes + # The imported initdb for timeline creation should + # not be fully imported on every shard. 
We use a 1MB strripe size so expect + # pretty good distribution: no one shard should have more than half the data + sizes = get_sizes() + physical_initdb_total = sum(sizes.values()) + expect_initdb_size = 20 * 1024 * 1024 + assert physical_initdb_total > expect_initdb_size + assert all(s < expect_initdb_size // 2 for s in sizes.values()) + # Test that timeline creation works on a sharded tenant timeline_b = env.neon_cli.create_branch("branch_b", tenant_id=tenant_id) @@ -101,6 +112,38 @@ def test_sharding_smoke( env.storage_controller.consistency_check() + # Validate that deleting a sharded tenant removes all files in the prefix + + # Before deleting, stop the client and check we have some objects to delete + workload.stop() + assert_prefix_not_empty( + neon_env_builder.pageserver_remote_storage, + prefix="/".join( + ( + "tenants", + str(tenant_id), + ) + ), + ) + + # Check the scrubber isn't confused by sharded content, then disable + # it during teardown because we'll have deleted by then + S3Scrubber(neon_env_builder).scan_metadata() + neon_env_builder.scrub_on_exit = False + + env.storage_controller.pageserver_api().tenant_delete(tenant_id) + assert_prefix_empty( + neon_env_builder.pageserver_remote_storage, + prefix="/".join( + ( + "tenants", + str(tenant_id), + ) + ), + ) + + env.storage_controller.consistency_check() + def test_sharding_split_unsharded( neon_env_builder: NeonEnvBuilder, diff --git a/test_runner/regress/test_sharding_service.py b/test_runner/regress/test_storage_controller.py similarity index 97% rename from test_runner/regress/test_sharding_service.py rename to test_runner/regress/test_storage_controller.py index 3248afae15..840f354142 100644 --- a/test_runner/regress/test_sharding_service.py +++ b/test_runner/regress/test_storage_controller.py @@ -42,11 +42,11 @@ def get_node_shard_counts(env: NeonEnv, tenant_ids): return counts -def test_sharding_service_smoke( +def test_storage_controller_smoke( neon_env_builder: NeonEnvBuilder, ): """ - Test the basic lifecycle of a sharding service: + Test the basic lifecycle of a storage controller: - Restarting - Restarting a pageserver - Creating and deleting tenants and timelines @@ -204,7 +204,7 @@ def test_node_status_after_restart( env.storage_controller.consistency_check() -def test_sharding_service_passthrough( +def test_storage_controller_passthrough( neon_env_builder: NeonEnvBuilder, ): """ @@ -231,7 +231,7 @@ def test_sharding_service_passthrough( env.storage_controller.consistency_check() -def test_sharding_service_restart(neon_env_builder: NeonEnvBuilder): +def test_storage_controller_restart(neon_env_builder: NeonEnvBuilder): env = neon_env_builder.init_start() tenant_a = env.initial_tenant tenant_b = TenantId.generate() @@ -266,7 +266,7 @@ def test_sharding_service_restart(neon_env_builder: NeonEnvBuilder): @pytest.mark.parametrize("warm_up", [True, False]) -def test_sharding_service_onboarding(neon_env_builder: NeonEnvBuilder, warm_up: bool): +def test_storage_controller_onboarding(neon_env_builder: NeonEnvBuilder, warm_up: bool): """ We onboard tenants to the sharding service by treating it as a 'virtual pageserver' which provides the /location_config API. 
This is similar to creating a tenant, @@ -420,7 +420,7 @@ def test_sharding_service_onboarding(neon_env_builder: NeonEnvBuilder, warm_up: env.storage_controller.consistency_check() -def test_sharding_service_compute_hook( +def test_storage_controller_compute_hook( httpserver: HTTPServer, neon_env_builder: NeonEnvBuilder, httpserver_listen_address, @@ -533,7 +533,7 @@ def test_sharding_service_compute_hook( env.storage_controller.consistency_check() -def test_sharding_service_debug_apis(neon_env_builder: NeonEnvBuilder): +def test_storage_controller_debug_apis(neon_env_builder: NeonEnvBuilder): """ Verify that occasional-use debug APIs work as expected. This is a lightweight test that just hits the endpoints to check that they don't bitrot. @@ -594,7 +594,7 @@ def test_sharding_service_debug_apis(neon_env_builder: NeonEnvBuilder): env.storage_controller.consistency_check() -def test_sharding_service_s3_time_travel_recovery( +def test_storage_controller_s3_time_travel_recovery( neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, ): @@ -704,7 +704,7 @@ def test_sharding_service_s3_time_travel_recovery( env.storage_controller.consistency_check() -def test_sharding_service_auth(neon_env_builder: NeonEnvBuilder): +def test_storage_controller_auth(neon_env_builder: NeonEnvBuilder): neon_env_builder.auth_enabled = True env = neon_env_builder.init_start() svc = env.storage_controller @@ -773,7 +773,7 @@ def test_sharding_service_auth(neon_env_builder: NeonEnvBuilder): ) -def test_sharding_service_tenant_conf(neon_env_builder: NeonEnvBuilder): +def test_storage_controller_tenant_conf(neon_env_builder: NeonEnvBuilder): """ Validate the pageserver-compatible API endpoints for setting and getting tenant conf, without supplying the whole LocationConf. @@ -876,7 +876,7 @@ def build_node_to_tenants_map(env: NeonEnv) -> dict[int, list[TenantId]]: PageserverFailpoint(pageserver_id=1, failpoint="get-utilization-http-handler"), ], ) -def test_sharding_service_heartbeats( +def test_storage_controller_heartbeats( neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, failure: Failure ): neon_env_builder.num_pageservers = 2 @@ -986,7 +986,7 @@ def test_sharding_service_heartbeats( wait_until(10, 1, storage_controller_consistent) -def test_sharding_service_re_attach(neon_env_builder: NeonEnvBuilder): +def test_storage_controller_re_attach(neon_env_builder: NeonEnvBuilder): """ Exercise the behavior of the /re-attach endpoint on pageserver startup when pageservers have a mixture of attached and secondary locations @@ -1187,7 +1187,14 @@ def test_storcon_cli(neon_env_builder: NeonEnvBuilder): storcon_cli(["node-configure", "--node-id", "1", "--scheduling", "pause"]) assert "Pause" in storcon_cli(["nodes"])[3] - # Make a node offline + # We will simulate a node death and then marking it offline + env.pageservers[0].stop(immediate=True) + # Sleep to make it unlikely that the controller's heartbeater will race handling + # a /utilization response internally, such that it marks the node back online. IRL + # there would always be a longer delay than this before a node failing and a human + # intervening. 
+ time.sleep(2) + storcon_cli(["node-configure", "--node-id", "1", "--availability", "offline"]) assert "Offline" in storcon_cli(["nodes"])[3] @@ -1196,7 +1203,10 @@ def test_storcon_cli(neon_env_builder: NeonEnvBuilder): assert len(tenant_lines) == 5 assert str(env.initial_tenant) in tenant_lines[3] - env.storage_controller.allowed_errors.append(".*Scheduling is disabled by policy.*") + # Setting scheduling policies intentionally result in warnings, they're for rare use. + env.storage_controller.allowed_errors.extend( + [".*Skipping reconcile for policy.*", ".*Scheduling is disabled by policy.*"] + ) # Describe a tenant tenant_lines = storcon_cli(["tenant-describe", "--tenant-id", str(env.initial_tenant)])
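
Editor's aside, not part of the patch: referring back to the compute_hook.rs changes earlier in this diff, the core idea is that the per-tenant send lock guards an Option holding the last successfully delivered notification. A newly computed request that equals the stored one becomes a no-op, a concurrent sender either grabs the lock with try_lock or is told to await it, and the request is stashed into the guard only after a successful send. The sketch below illustrates that dedup pattern under those assumptions only; the types are hypothetical simplifications, and std::sync stands in for the tokio owned async guards used in the real code.

use std::sync::{Arc, Mutex, MutexGuard, TryLockError};

#[derive(Clone, PartialEq, Eq, Debug)]
struct Request {
    tenant: u32,
    shards: Vec<u64>,
}

enum MaybeSend<'a> {
    Noop,
    AwaitLock, // a send is in flight; caller should retry once it finishes
    Transmit(Request, MutexGuard<'a, Option<Request>>),
}

fn maybe_send<'a>(last_sent: &'a Arc<Mutex<Option<Request>>>, next: Request) -> MaybeSend<'a> {
    // Only try_lock here: in the real controller this runs under another lock,
    // so blocking on the send lock at this point would risk a lock-order inversion.
    let guard = match last_sent.try_lock() {
        Ok(g) => g,
        Err(TryLockError::WouldBlock) => return MaybeSend::AwaitLock,
        Err(TryLockError::Poisoned(e)) => e.into_inner(),
    };
    if guard.as_ref() == Some(&next) {
        // Identical to what was last delivered successfully: nothing to do
        MaybeSend::Noop
    } else {
        MaybeSend::Transmit(next, guard)
    }
}

fn main() {
    let last_sent = Arc::new(Mutex::new(None));
    let req = Request { tenant: 1, shards: vec![1, 2] };

    match maybe_send(&last_sent, req.clone()) {
        MaybeSend::Transmit(request, mut guard) => {
            // ... deliver `request` here; only on success do we stash it ...
            *guard = Some(request);
        }
        _ => unreachable!("first call always transmits"),
    }

    // Asking again with the same content is now a no-op
    assert!(matches!(maybe_send(&last_sent, req), MaybeSend::Noop));
    println!("dedup ok");
}
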