From a917952b308a06cbc66b92a8c62c55975fea7c0b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Arpad=20M=C3=BCller?=
Date: Fri, 4 Apr 2025 02:17:40 +0200
Subject: [PATCH] Add test_storcon_create_delete_sk_down and make it work (#11400)

Adds a test `test_storcon_create_delete_sk_down` which tests the
reconciler and pending op persistence when faced with temporary
safekeeper downtime during timeline creation or deletion. This is in
contrast to `test_explicit_timeline_creation_storcon`, which tests the
happy path.

We also do some fixes:

* Timeline and tenant deletion HTTP requests didn't expect a body, but
  `()` sent one (see the sketch after this list for why).
* We got the tenant deletion HTTP request's return type wrong: it's
  supposed to be a hash map.
* We add some logging to improve observability.
* We fix `list_pending_ops`, which had broken code meant to make it
  possible to restrict the listing to a single safekeeper. Sadly,
  diesel doesn't support that, or at least I couldn't figure out a way
  to make it work. We don't need that functionality, so we remove it
  (a possible alternative is sketched after the `persistence.rs` hunk
  below).
* We add an info span with the node id to the heartbeater futures, so
  that there are no context-free messages like "Backoff: waiting 1.1
  seconds before processing with the task" in the storcon logs (the
  span pattern is sketched after the `safekeeper_reconciler.rs` hunk
  below). We could also add the node's full base URL, but we don't:
  most other log lines contain that information already, and any
  duplication should at least not be verbose. One can always find the
  base URL from the node id.
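Aside, not part of the patch itself: why `()` sent a body. reqwest's
`RequestBuilder::json()` unconditionally attaches a serialized body and
a `Content-Type: application/json` header, and serde_json serializes
the unit type `()` as JSON `null`. A minimal standalone sketch
(depends only on the serde_json crate):

    fn main() {
        // `()` does not serialize to "nothing" but to JSON `null`, so a
        // DELETE built via `.json(&())` carries a four-byte `null` body.
        let body = serde_json::to_string(&()).unwrap();
        assert_eq!(body, "null");
    }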
Successor of #11261
Part of #9011
---
 libs/safekeeper_api/src/models.rs              |  2 +
 safekeeper/client/src/mgmt_api.rs              | 27 +++++-
 safekeeper/src/http/routes.rs                  | 18 ++--
 storage_controller/src/heartbeater.rs          |  3 +
 storage_controller/src/persistence.rs          | 13 +--
 storage_controller/src/safekeeper_client.rs    |  2 +-
 .../src/service/safekeeper_reconciler.rs       | 10 +-
 .../regress/test_storage_controller.py         | 95 +++++++++++++++++++
 8 files changed, 140 insertions(+), 30 deletions(-)

diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs
index 33ff636a79..20f11edae7 100644
--- a/libs/safekeeper_api/src/models.rs
+++ b/libs/safekeeper_api/src/models.rs
@@ -227,6 +227,8 @@ pub struct TimelineDeleteResult {
     pub dir_existed: bool,
 }
 
+pub type TenantDeleteResult = std::collections::HashMap<String, TimelineDeleteResult>;
+
 fn lsn_invalid() -> Lsn {
     Lsn::INVALID
 }
diff --git a/safekeeper/client/src/mgmt_api.rs b/safekeeper/client/src/mgmt_api.rs
index afef5e792e..5849df0343 100644
--- a/safekeeper/client/src/mgmt_api.rs
+++ b/safekeeper/client/src/mgmt_api.rs
@@ -115,13 +115,17 @@ impl Client {
             "{}/v1/tenant/{}/timeline/{}",
             self.mgmt_api_endpoint, tenant_id, timeline_id
         );
-        let resp = self.request(Method::DELETE, &uri, ()).await?;
+        let resp = self
+            .request_maybe_body(Method::DELETE, &uri, None::<()>)
+            .await?;
         resp.json().await.map_err(Error::ReceiveBody)
     }
 
-    pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result<TimelineDeleteResult> {
+    pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result<TenantDeleteResult> {
         let uri = format!("{}/v1/tenant/{}", self.mgmt_api_endpoint, tenant_id);
-        let resp = self.request(Method::DELETE, &uri, ()).await?;
+        let resp = self
+            .request_maybe_body(Method::DELETE, &uri, None::<()>)
+            .await?;
         resp.json().await.map_err(Error::ReceiveBody)
     }
 
@@ -197,6 +201,16 @@ impl Client {
         method: Method,
         uri: U,
         body: B,
+    ) -> Result<reqwest::Response> {
+        self.request_maybe_body(method, uri, Some(body)).await
+    }
+
+    /// Send the request and check that the status code is good, with an optional body.
+    async fn request_maybe_body<B: serde::Serialize, U: reqwest::IntoUrl>(
+        &self,
+        method: Method,
+        uri: U,
+        body: Option<B>,
     ) -> Result<reqwest::Response> {
         let res = self.request_noerror(method, uri, body).await?;
         let response = res.error_from_body().await?;
@@ -208,12 +222,15 @@ impl Client {
         &self,
         method: Method,
         uri: U,
-        body: B,
+        body: Option<B>,
     ) -> Result<reqwest::Response> {
         let mut req = self.client.request(method, uri);
         if let Some(value) = &self.authorization_header {
             req = req.header(reqwest::header::AUTHORIZATION, value.get_contents())
         }
-        req.json(&body).send().await.map_err(Error::ReceiveBody)
+        if let Some(body) = body {
+            req = req.json(&body);
+        }
+        req.send().await.map_err(Error::ReceiveBody)
     }
 }
diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs
index b264fe8a1c..8395c88171 100644
--- a/safekeeper/src/http/routes.rs
+++ b/safekeeper/src/http/routes.rs
@@ -16,9 +16,9 @@ use http_utils::{RequestExt, RouterBuilder};
 use hyper::{Body, Request, Response, StatusCode};
 use postgres_ffi::WAL_SEGMENT_SIZE;
 use safekeeper_api::models::{
-    AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TermSwitchApiEntry,
-    TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult, TimelineStatus,
-    TimelineTermBumpRequest,
+    AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TenantDeleteResult,
+    TermSwitchApiEntry, TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult,
+    TimelineStatus, TimelineTermBumpRequest,
 };
 use safekeeper_api::{ServerInfo, membership, models};
 use storage_broker::proto::{SafekeeperTimelineInfo, TenantTimelineId as ProtoTenantTimelineId};
@@ -83,13 +83,11 @@ async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError>
-    json_response(
-        StatusCode::OK,
-        delete_info
-            .iter()
-            .map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
-            .collect::<HashMap<String, TimelineDeleteResult>>(),
-    )
+    let response_body: TenantDeleteResult = delete_info
+        .iter()
+        .map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
+        .collect::<HashMap<_, _>>();
+    json_response(StatusCode::OK, response_body)
 }
 
 async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
diff --git a/storage_controller/src/heartbeater.rs b/storage_controller/src/heartbeater.rs
index 732c4ea443..fe916aa36a 100644
--- a/storage_controller/src/heartbeater.rs
+++ b/storage_controller/src/heartbeater.rs
@@ -12,6 +12,7 @@ use safekeeper_api::models::SafekeeperUtilization;
 use safekeeper_client::mgmt_api;
 use thiserror::Error;
 use tokio_util::sync::CancellationToken;
+use tracing::Instrument;
 use utils::id::NodeId;
 use utils::logging::SecretString;
 
@@ -227,6 +228,7 @@ impl HeartBeat<Node, PageserverUtilization> for HeartbeaterTask<Node, PageserverUtilization>
                 Some((*node_id, status))
             }
+            .instrument(tracing::info_span!("heartbeat_ps", %node_id))
         });
     }
 
@@ -369,6 +371,7 @@ impl HeartBeat<Safekeeper, SafekeeperUtilization> for HeartbeaterTask<Safekeeper, SafekeeperUtilization>
                 Some((*node_id, status))
             }
+            .instrument(tracing::info_span!("heartbeat_sk", %node_id))
         });
     }
diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs
--- a/storage_controller/src/persistence.rs
+++ b/storage_controller/src/persistence.rs
     pub(crate) async fn list_pending_ops(
         &self,
-        filter_for_sk: Option<NodeId>,
     ) -> DatabaseResult<Vec<TimelinePendingOpPersistence>> {
         use crate::schema::safekeeper_timeline_pending_ops::dsl;
 
-        const FILTER_VAL_1: i64 = 1;
-        const FILTER_VAL_2: i64 = 2;
-        let filter_opt = filter_for_sk.map(|id| id.0 as i64);
         let timeline_from_db = self
             .with_measured_conn(DatabaseOperation::ListTimelineReconcile, move |conn| {
                 Box::pin(async move {
                     let from_db: Vec<TimelinePendingOpPersistence> =
-                        dsl::safekeeper_timeline_pending_ops
-                            .filter(
-                                dsl::sk_id
-                                    .eq(filter_opt.unwrap_or(FILTER_VAL_1))
-                                    .and(dsl::sk_id.eq(filter_opt.unwrap_or(FILTER_VAL_2))),
-                            )
-                            .load(conn)
-                            .await?;
+                        dsl::safekeeper_timeline_pending_ops.load(conn).await?;
                     Ok(from_db)
                 })
             })
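Aside, not part of the patch: if restricting the listing to a single
safekeeper is ever needed again, one alternative that sidesteps the
diesel query-building issue is to filter the loaded rows in Rust. A
sketch reusing the `TimelinePendingOpPersistence` and `NodeId` types
from this patch (the helper name is hypothetical):

    // Hypothetical helper: narrow pending ops to one safekeeper after
    // loading them from the database.
    fn filter_ops_for_sk(
        ops: Vec<TimelinePendingOpPersistence>,
        filter_for_sk: Option<NodeId>,
    ) -> Vec<TimelinePendingOpPersistence> {
        match filter_for_sk {
            // Keep only rows whose sk_id column matches the requested node.
            Some(id) => ops
                .into_iter()
                .filter(|op| op.sk_id == id.0 as i64)
                .collect(),
            None => ops,
        }
    }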
diff --git a/storage_controller/src/safekeeper_client.rs b/storage_controller/src/safekeeper_client.rs
index 98e3f74071..988159af4a 100644
--- a/storage_controller/src/safekeeper_client.rs
+++ b/storage_controller/src/safekeeper_client.rs
@@ -101,7 +101,7 @@ impl SafekeeperClient {
     pub(crate) async fn delete_tenant(
         &self,
         tenant_id: TenantId,
-    ) -> Result<TimelineDeleteResult, mgmt_api::Error> {
+    ) -> Result<TenantDeleteResult, mgmt_api::Error> {
         measured_request!(
             "delete_tenant",
             crate::metrics::Method::Delete,
diff --git a/storage_controller/src/service/safekeeper_reconciler.rs b/storage_controller/src/service/safekeeper_reconciler.rs
index 8e752a8ff1..76e3162617 100644
--- a/storage_controller/src/service/safekeeper_reconciler.rs
+++ b/storage_controller/src/service/safekeeper_reconciler.rs
@@ -35,6 +35,10 @@ impl SafekeeperReconcilers {
         service: &Arc<Service>,
         reqs: Vec<ScheduleRequest>,
     ) {
+        tracing::info!(
+            "Scheduling {} pending safekeeper ops loaded from db",
+            reqs.len()
+        );
         for req in reqs {
             self.schedule_request(service, req);
         }
@@ -74,7 +78,7 @@ pub(crate) async fn load_schedule_requests(
     service: &Arc<Service>,
     safekeepers: &HashMap<NodeId, Safekeeper>,
 ) -> anyhow::Result<Vec<ScheduleRequest>> {
-    let pending_ops = service.persistence.list_pending_ops(None).await?;
+    let pending_ops = service.persistence.list_pending_ops().await?;
     let mut res = Vec::with_capacity(pending_ops.len());
     for op_persist in pending_ops {
         let node_id = NodeId(op_persist.sk_id as u64);
@@ -232,12 +236,14 @@ impl SafekeeperReconciler {
             let kind = req.kind;
             let tenant_id = req.tenant_id;
             let timeline_id = req.timeline_id;
+            let node_id = req.safekeeper.skp.id;
             self.reconcile_one(req, req_cancel)
                 .instrument(tracing::info_span!(
                     "reconcile_one",
                     ?kind,
                     %tenant_id,
                     ?timeline_id,
+                    %node_id,
                 ))
                 .await;
         }
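Aside, not part of the patch: the `Instrument` pattern used by the
heartbeater and reconciler spans above, reduced to a minimal standalone
sketch (assumes only the tracing crate; `heartbeat_one` and the plain
u64 node id are stand-ins). Attaching a span to the future means every
log line emitted while it runs, including backoff messages from shared
helpers, carries the node id:

    use tracing::Instrument;

    async fn heartbeat_one(node_id: u64) {
        async move {
            // With a fmt subscriber this is logged with the span prefix,
            // e.g. "heartbeat{node_id=1}: sending heartbeat".
            tracing::info!("sending heartbeat");
        }
        .instrument(tracing::info_span!("heartbeat", %node_id))
        .await;
    }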
+ """ + + neon_env_builder.num_safekeepers = 3 + neon_env_builder.storage_controller_config = { + "timelines_onto_safekeepers": True, + } + env = neon_env_builder.init_start() + + env.safekeepers[0].stop() + + # Wait for heartbeater to pick up that the safekeeper is gone + # This isn't really neccessary + def logged_offline(): + env.storage_controller.assert_log_contains( + "Heartbeat round complete for 3 safekeepers, 1 offline" + ) + + wait_until(logged_offline) + + tenant_id = TenantId.generate() + timeline_id = TimelineId.generate() + env.create_tenant(tenant_id, timeline_id) + + env.safekeepers[1].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}") + env.safekeepers[2].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}") + + env.storage_controller.allowed_errors.extend( + [ + ".*Call to safekeeper.* management API still failed after.*", + ".*reconcile_one.*tenant_id={tenant_id}.*Call to safekeeper.* management API still failed after.*", + ] + ) + + if restart_storcon: + # Restart the storcon to check that we persist operations + env.storage_controller.stop() + env.storage_controller.start() + + config_lines = [ + "neon.safekeeper_proto_version = 3", + ] + with env.endpoints.create("main", tenant_id=tenant_id, config_lines=config_lines) as ep: + # endpoint should start. + ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3]) + ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)") + + env.storage_controller.assert_log_contains("writing pending op for sk id 1") + env.safekeepers[0].start() + + # ensure that we applied the operation also for the safekeeper we just brought down + def logged_contains_on_sk(): + env.safekeepers[0].assert_log_contains( + f"pulling timeline {tenant_id}/{timeline_id} from safekeeper" + ) + + wait_until(logged_contains_on_sk) + + env.safekeepers[1].stop() + + env.storage_controller.pageserver_api().tenant_delete(tenant_id) + + # ensure the safekeeper deleted the timeline + def timeline_deleted_on_active_sks(): + env.safekeepers[0].assert_log_contains( + f"deleting timeline {tenant_id}/{timeline_id} from disk" + ) + env.safekeepers[2].assert_log_contains( + f"deleting timeline {tenant_id}/{timeline_id} from disk" + ) + + wait_until(timeline_deleted_on_active_sks) + + if restart_storcon: + # Restart the storcon to check that we persist operations + env.storage_controller.stop() + env.storage_controller.start() + + env.safekeepers[1].start() + + # ensure that there is log msgs for the third safekeeper too + def timeline_deleted_on_sk(): + env.safekeepers[1].assert_log_contains( + f"deleting timeline {tenant_id}/{timeline_id} from disk" + ) + + wait_until(timeline_deleted_on_sk) + + @pytest.mark.parametrize("wrong_az", [True, False]) def test_storage_controller_graceful_migration(neon_env_builder: NeonEnvBuilder, wrong_az: bool): """