diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs
index 33ff636a79..20f11edae7 100644
--- a/libs/safekeeper_api/src/models.rs
+++ b/libs/safekeeper_api/src/models.rs
@@ -227,6 +227,8 @@ pub struct TimelineDeleteResult {
     pub dir_existed: bool,
 }

+pub type TenantDeleteResult = std::collections::HashMap<String, TimelineDeleteResult>;
+
 fn lsn_invalid() -> Lsn {
     Lsn::INVALID
 }
diff --git a/safekeeper/client/src/mgmt_api.rs b/safekeeper/client/src/mgmt_api.rs
index afef5e792e..5849df0343 100644
--- a/safekeeper/client/src/mgmt_api.rs
+++ b/safekeeper/client/src/mgmt_api.rs
@@ -115,13 +115,17 @@ impl Client {
             "{}/v1/tenant/{}/timeline/{}",
             self.mgmt_api_endpoint, tenant_id, timeline_id
         );
-        let resp = self.request(Method::DELETE, &uri, ()).await?;
+        let resp = self
+            .request_maybe_body(Method::DELETE, &uri, None::<()>)
+            .await?;
         resp.json().await.map_err(Error::ReceiveBody)
     }

-    pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result {
+    pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result<TenantDeleteResult> {
         let uri = format!("{}/v1/tenant/{}", self.mgmt_api_endpoint, tenant_id);
-        let resp = self.request(Method::DELETE, &uri, ()).await?;
+        let resp = self
+            .request_maybe_body(Method::DELETE, &uri, None::<()>)
+            .await?;
         resp.json().await.map_err(Error::ReceiveBody)
     }

@@ -197,6 +201,16 @@ impl Client {
         method: Method,
         uri: U,
         body: B,
+    ) -> Result {
+        self.request_maybe_body(method, uri, Some(body)).await
+    }
+
+    /// Send the request and check that the status code is good, with an optional body.
+    async fn request_maybe_body(
+        &self,
+        method: Method,
+        uri: U,
+        body: Option<B>,
     ) -> Result {
         let res = self.request_noerror(method, uri, body).await?;
         let response = res.error_from_body().await?;
@@ -208,12 +222,15 @@ impl Client {
         &self,
         method: Method,
         uri: U,
-        body: B,
+        body: Option<B>,
     ) -> Result {
         let mut req = self.client.request(method, uri);
         if let Some(value) = &self.authorization_header {
             req = req.header(reqwest::header::AUTHORIZATION, value.get_contents())
         }
-        req.json(&body).send().await.map_err(Error::ReceiveBody)
+        if let Some(body) = body {
+            req = req.json(&body);
+        }
+        req.send().await.map_err(Error::ReceiveBody)
     }
 }
diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs
index b264fe8a1c..8395c88171 100644
--- a/safekeeper/src/http/routes.rs
+++ b/safekeeper/src/http/routes.rs
@@ -16,9 +16,9 @@ use http_utils::{RequestExt, RouterBuilder};
 use hyper::{Body, Request, Response, StatusCode};
 use postgres_ffi::WAL_SEGMENT_SIZE;
 use safekeeper_api::models::{
-    AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TermSwitchApiEntry,
-    TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult, TimelineStatus,
-    TimelineTermBumpRequest,
+    AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TenantDeleteResult,
+    TermSwitchApiEntry, TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult,
+    TimelineStatus, TimelineTermBumpRequest,
 };
 use safekeeper_api::{ServerInfo, membership, models};
 use storage_broker::proto::{SafekeeperTimelineInfo, TenantTimelineId as ProtoTenantTimelineId};
@@ -83,13 +83,11 @@ async fn tenant_delete_handler(mut request: Request) -> Result>(),
-    )
+    let response_body: TenantDeleteResult = delete_info
+        .iter()
+        .map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
+        .collect::>();
+    json_response(StatusCode::OK, response_body)
 }

 async fn timeline_create_handler(mut request: Request) -> Result, ApiError> {
diff --git a/storage_controller/src/heartbeater.rs b/storage_controller/src/heartbeater.rs
index 732c4ea443..fe916aa36a 100644
--- a/storage_controller/src/heartbeater.rs
+++ b/storage_controller/src/heartbeater.rs
@@ -12,6 +12,7 @@ use safekeeper_api::models::SafekeeperUtilization;
 use safekeeper_client::mgmt_api;
 use thiserror::Error;
 use tokio_util::sync::CancellationToken;
+use tracing::Instrument;
 use utils::id::NodeId;
 use utils::logging::SecretString;

@@ -227,6 +228,7 @@ impl HeartBeat for HeartbeaterTask
                     Some((*node_id, status))
                 }
+                .instrument(tracing::info_span!("heartbeat_ps", %node_id))
             });
         }
@@ -369,6 +371,7 @@ impl HeartBeat for HeartbeaterTask,
     ) -> DatabaseResult> {
         use crate::schema::safekeeper_timeline_pending_ops::dsl;

-        const FILTER_VAL_1: i64 = 1;
-        const FILTER_VAL_2: i64 = 2;
-        let filter_opt = filter_for_sk.map(|id| id.0 as i64);
         let timeline_from_db = self
             .with_measured_conn(DatabaseOperation::ListTimelineReconcile, move |conn| {
                 Box::pin(async move {
                     let from_db: Vec =
-                        dsl::safekeeper_timeline_pending_ops
-                            .filter(
-                                dsl::sk_id
-                                    .eq(filter_opt.unwrap_or(FILTER_VAL_1))
-                                    .and(dsl::sk_id.eq(filter_opt.unwrap_or(FILTER_VAL_2))),
-                            )
-                            .load(conn)
-                            .await?;
+                        dsl::safekeeper_timeline_pending_ops.load(conn).await?;
                     Ok(from_db)
                 })
             })
diff --git a/storage_controller/src/safekeeper_client.rs b/storage_controller/src/safekeeper_client.rs
index 98e3f74071..988159af4a 100644
--- a/storage_controller/src/safekeeper_client.rs
+++ b/storage_controller/src/safekeeper_client.rs
@@ -101,7 +101,7 @@ impl SafekeeperClient {
     pub(crate) async fn delete_tenant(
         &self,
         tenant_id: TenantId,
-    ) -> Result {
+    ) -> Result {
         measured_request!(
             "delete_tenant",
             crate::metrics::Method::Delete,
diff --git a/storage_controller/src/service/safekeeper_reconciler.rs b/storage_controller/src/service/safekeeper_reconciler.rs
index 8e752a8ff1..76e3162617 100644
--- a/storage_controller/src/service/safekeeper_reconciler.rs
+++ b/storage_controller/src/service/safekeeper_reconciler.rs
@@ -35,6 +35,10 @@ impl SafekeeperReconcilers {
         service: &Arc,
         reqs: Vec,
     ) {
+        tracing::info!(
+            "Scheduling {} pending safekeeper ops loaded from db",
+            reqs.len()
+        );
         for req in reqs {
             self.schedule_request(service, req);
         }
@@ -74,7 +78,7 @@ pub(crate) async fn load_schedule_requests(
     service: &Arc,
     safekeepers: &HashMap,
 ) -> anyhow::Result> {
-    let pending_ops = service.persistence.list_pending_ops(None).await?;
+    let pending_ops = service.persistence.list_pending_ops().await?;
     let mut res = Vec::with_capacity(pending_ops.len());
     for op_persist in pending_ops {
         let node_id = NodeId(op_persist.sk_id as u64);
@@ -232,12 +236,14 @@ impl SafekeeperReconciler {
         let kind = req.kind;
         let tenant_id = req.tenant_id;
         let timeline_id = req.timeline_id;
+        let node_id = req.safekeeper.skp.id;
         self.reconcile_one(req, req_cancel)
             .instrument(tracing::info_span!(
                 "reconcile_one",
                 ?kind,
                 %tenant_id,
-                ?timeline_id
+                ?timeline_id,
+                %node_id,
             ))
             .await;
     }
diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py
index b9344f2fb4..097c187699 100644
--- a/test_runner/regress/test_storage_controller.py
+++ b/test_runner/regress/test_storage_controller.py
@@ -4073,6 +4073,101 @@ def test_storage_controller_location_conf_equivalence(neon_env_builder: NeonEnvB
     assert reconciles_after_restart == 0


+@run_only_on_default_postgres("PG version is not interesting here")
+@pytest.mark.parametrize("restart_storcon", [True, False])
+def test_storcon_create_delete_sk_down(neon_env_builder: NeonEnvBuilder, restart_storcon: bool):
+    """
+    Test that the storcon can create and delete tenants and timelines with a safekeeper being down.
+    - restart_storcon: tests whether the pending ops are persisted.
+      if we don't restart, we test that we don't require it to come from the db.
+    """
+
+    neon_env_builder.num_safekeepers = 3
+    neon_env_builder.storage_controller_config = {
+        "timelines_onto_safekeepers": True,
+    }
+    env = neon_env_builder.init_start()
+
+    env.safekeepers[0].stop()
+
+    # Wait for heartbeater to pick up that the safekeeper is gone
+    # This isn't really neccessary
+    def logged_offline():
+        env.storage_controller.assert_log_contains(
+            "Heartbeat round complete for 3 safekeepers, 1 offline"
+        )
+
+    wait_until(logged_offline)
+
+    tenant_id = TenantId.generate()
+    timeline_id = TimelineId.generate()
+    env.create_tenant(tenant_id, timeline_id)
+
+    env.safekeepers[1].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}")
+    env.safekeepers[2].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}")
+
+    env.storage_controller.allowed_errors.extend(
+        [
+            ".*Call to safekeeper.* management API still failed after.*",
+            ".*reconcile_one.*tenant_id={tenant_id}.*Call to safekeeper.* management API still failed after.*",
+        ]
+    )
+
+    if restart_storcon:
+        # Restart the storcon to check that we persist operations
+        env.storage_controller.stop()
+        env.storage_controller.start()
+
+    config_lines = [
+        "neon.safekeeper_proto_version = 3",
+    ]
+    with env.endpoints.create("main", tenant_id=tenant_id, config_lines=config_lines) as ep:
+        # endpoint should start.
+        ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
+        ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
+
+    env.storage_controller.assert_log_contains("writing pending op for sk id 1")
+    env.safekeepers[0].start()
+
+    # ensure that we applied the operation also for the safekeeper we just brought down
+    def logged_contains_on_sk():
+        env.safekeepers[0].assert_log_contains(
+            f"pulling timeline {tenant_id}/{timeline_id} from safekeeper"
+        )
+
+    wait_until(logged_contains_on_sk)
+
+    env.safekeepers[1].stop()
+
+    env.storage_controller.pageserver_api().tenant_delete(tenant_id)
+
+    # ensure the safekeeper deleted the timeline
+    def timeline_deleted_on_active_sks():
+        env.safekeepers[0].assert_log_contains(
+            f"deleting timeline {tenant_id}/{timeline_id} from disk"
+        )
+        env.safekeepers[2].assert_log_contains(
+            f"deleting timeline {tenant_id}/{timeline_id} from disk"
+        )
+
+    wait_until(timeline_deleted_on_active_sks)
+
+    if restart_storcon:
+        # Restart the storcon to check that we persist operations
+        env.storage_controller.stop()
+        env.storage_controller.start()
+
+    env.safekeepers[1].start()
+
+    # ensure that there is log msgs for the third safekeeper too
+    def timeline_deleted_on_sk():
+        env.safekeepers[1].assert_log_contains(
+            f"deleting timeline {tenant_id}/{timeline_id} from disk"
+        )
+
+    wait_until(timeline_deleted_on_sk)
+
+
 @pytest.mark.parametrize("wrong_az", [True, False])
 def test_storage_controller_graceful_migration(neon_env_builder: NeonEnvBuilder, wrong_az: bool):
     """