Add test_storcon_create_delete_sk_down and make it work (#11400)

Adds a test `test_storcon_create_delete_sk_down` which tests the
reconciler and pending op persistence if faced with a temporary
safekeeper downtime during timeline creation or deletion. This is in
contrast to `test_explicit_timeline_creation_storcon`, which tests the
happy path.

We also do some fixes:

* the timeline and tenant deletion HTTP requests aren't supposed to carry a
body, but passing `()` caused one to be sent.
* we got the tenant deletion HTTP request's return type wrong: it's
supposed to be a hash map, not a single `TimelineDeleteResult`.
* we add some logging to improve observability
* We fix `list_pending_ops`, which had broken code meant to make it
possible to restrict the query to a single safekeeper. But diesel doesn't
support that sadly, or at least I couldn't figure out a way to make it
work. We don't need that functionality, so remove it.
* We add an info span with the node id to the heartbeater futures, so
that there are no context-free messages like "Backoff: waiting 1.1 seconds
before processing with the task" in the storcon logs. We could also add
the full base URL of the node, but we don't, as most other log lines
contain that information already, and if we duplicate information it should at
least not be verbose. One can always find out the base URL from the node
id.

Successor of #11261
Part of #9011
This commit is contained in:
Arpad Müller
2025-04-04 02:17:40 +02:00
committed by GitHub
parent e581b670f4
commit a917952b30
8 changed files with 140 additions and 30 deletions

View File

@@ -227,6 +227,8 @@ pub struct TimelineDeleteResult {
pub dir_existed: bool, pub dir_existed: bool,
} }
pub type TenantDeleteResult = std::collections::HashMap<String, TimelineDeleteResult>;
fn lsn_invalid() -> Lsn { fn lsn_invalid() -> Lsn {
Lsn::INVALID Lsn::INVALID
} }

View File

@@ -115,13 +115,17 @@ impl Client {
"{}/v1/tenant/{}/timeline/{}", "{}/v1/tenant/{}/timeline/{}",
self.mgmt_api_endpoint, tenant_id, timeline_id self.mgmt_api_endpoint, tenant_id, timeline_id
); );
let resp = self.request(Method::DELETE, &uri, ()).await?; let resp = self
.request_maybe_body(Method::DELETE, &uri, None::<()>)
.await?;
resp.json().await.map_err(Error::ReceiveBody) resp.json().await.map_err(Error::ReceiveBody)
} }
pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result<models::TimelineDeleteResult> { pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result<models::TenantDeleteResult> {
let uri = format!("{}/v1/tenant/{}", self.mgmt_api_endpoint, tenant_id); let uri = format!("{}/v1/tenant/{}", self.mgmt_api_endpoint, tenant_id);
let resp = self.request(Method::DELETE, &uri, ()).await?; let resp = self
.request_maybe_body(Method::DELETE, &uri, None::<()>)
.await?;
resp.json().await.map_err(Error::ReceiveBody) resp.json().await.map_err(Error::ReceiveBody)
} }
@@ -197,6 +201,16 @@ impl Client {
method: Method, method: Method,
uri: U, uri: U,
body: B, body: B,
) -> Result<reqwest::Response> {
self.request_maybe_body(method, uri, Some(body)).await
}
/// Send the request and check that the status code is good, with an optional body.
async fn request_maybe_body<B: serde::Serialize, U: reqwest::IntoUrl>(
&self,
method: Method,
uri: U,
body: Option<B>,
) -> Result<reqwest::Response> { ) -> Result<reqwest::Response> {
let res = self.request_noerror(method, uri, body).await?; let res = self.request_noerror(method, uri, body).await?;
let response = res.error_from_body().await?; let response = res.error_from_body().await?;
@@ -208,12 +222,15 @@ impl Client {
&self, &self,
method: Method, method: Method,
uri: U, uri: U,
body: B, body: Option<B>,
) -> Result<reqwest::Response> { ) -> Result<reqwest::Response> {
let mut req = self.client.request(method, uri); let mut req = self.client.request(method, uri);
if let Some(value) = &self.authorization_header { if let Some(value) = &self.authorization_header {
req = req.header(reqwest::header::AUTHORIZATION, value.get_contents()) req = req.header(reqwest::header::AUTHORIZATION, value.get_contents())
} }
req.json(&body).send().await.map_err(Error::ReceiveBody) if let Some(body) = body {
req = req.json(&body);
}
req.send().await.map_err(Error::ReceiveBody)
} }
} }

View File

@@ -16,9 +16,9 @@ use http_utils::{RequestExt, RouterBuilder};
use hyper::{Body, Request, Response, StatusCode}; use hyper::{Body, Request, Response, StatusCode};
use postgres_ffi::WAL_SEGMENT_SIZE; use postgres_ffi::WAL_SEGMENT_SIZE;
use safekeeper_api::models::{ use safekeeper_api::models::{
AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TermSwitchApiEntry, AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TenantDeleteResult,
TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult, TimelineStatus, TermSwitchApiEntry, TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult,
TimelineTermBumpRequest, TimelineStatus, TimelineTermBumpRequest,
}; };
use safekeeper_api::{ServerInfo, membership, models}; use safekeeper_api::{ServerInfo, membership, models};
use storage_broker::proto::{SafekeeperTimelineInfo, TenantTimelineId as ProtoTenantTimelineId}; use storage_broker::proto::{SafekeeperTimelineInfo, TenantTimelineId as ProtoTenantTimelineId};
@@ -83,13 +83,11 @@ async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Bo
.delete_all_for_tenant(&tenant_id, action) .delete_all_for_tenant(&tenant_id, action)
.await .await
.map_err(ApiError::InternalServerError)?; .map_err(ApiError::InternalServerError)?;
json_response( let response_body: TenantDeleteResult = delete_info
StatusCode::OK, .iter()
delete_info .map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
.iter() .collect::<HashMap<String, TimelineDeleteResult>>();
.map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp)) json_response(StatusCode::OK, response_body)
.collect::<HashMap<String, TimelineDeleteResult>>(),
)
} }
async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> { async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {

View File

@@ -12,6 +12,7 @@ use safekeeper_api::models::SafekeeperUtilization;
use safekeeper_client::mgmt_api; use safekeeper_client::mgmt_api;
use thiserror::Error; use thiserror::Error;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use utils::id::NodeId; use utils::id::NodeId;
use utils::logging::SecretString; use utils::logging::SecretString;
@@ -227,6 +228,7 @@ impl HeartBeat<Node, PageserverState> for HeartbeaterTask<Node, PageserverState>
Some((*node_id, status)) Some((*node_id, status))
} }
.instrument(tracing::info_span!("heartbeat_ps", %node_id))
}); });
} }
@@ -369,6 +371,7 @@ impl HeartBeat<Safekeeper, SafekeeperState> for HeartbeaterTask<Safekeeper, Safe
Some((*node_id, status)) Some((*node_id, status))
} }
.instrument(tracing::info_span!("heartbeat_sk", %node_id))
}); });
} }

View File

@@ -1524,25 +1524,14 @@ impl Persistence {
/// Load pending operations from db. /// Load pending operations from db.
pub(crate) async fn list_pending_ops( pub(crate) async fn list_pending_ops(
&self, &self,
filter_for_sk: Option<NodeId>,
) -> DatabaseResult<Vec<TimelinePendingOpPersistence>> { ) -> DatabaseResult<Vec<TimelinePendingOpPersistence>> {
use crate::schema::safekeeper_timeline_pending_ops::dsl; use crate::schema::safekeeper_timeline_pending_ops::dsl;
const FILTER_VAL_1: i64 = 1;
const FILTER_VAL_2: i64 = 2;
let filter_opt = filter_for_sk.map(|id| id.0 as i64);
let timeline_from_db = self let timeline_from_db = self
.with_measured_conn(DatabaseOperation::ListTimelineReconcile, move |conn| { .with_measured_conn(DatabaseOperation::ListTimelineReconcile, move |conn| {
Box::pin(async move { Box::pin(async move {
let from_db: Vec<TimelinePendingOpPersistence> = let from_db: Vec<TimelinePendingOpPersistence> =
dsl::safekeeper_timeline_pending_ops dsl::safekeeper_timeline_pending_ops.load(conn).await?;
.filter(
dsl::sk_id
.eq(filter_opt.unwrap_or(FILTER_VAL_1))
.and(dsl::sk_id.eq(filter_opt.unwrap_or(FILTER_VAL_2))),
)
.load(conn)
.await?;
Ok(from_db) Ok(from_db)
}) })
}) })

View File

@@ -101,7 +101,7 @@ impl SafekeeperClient {
pub(crate) async fn delete_tenant( pub(crate) async fn delete_tenant(
&self, &self,
tenant_id: TenantId, tenant_id: TenantId,
) -> Result<models::TimelineDeleteResult> { ) -> Result<models::TenantDeleteResult> {
measured_request!( measured_request!(
"delete_tenant", "delete_tenant",
crate::metrics::Method::Delete, crate::metrics::Method::Delete,

View File

@@ -35,6 +35,10 @@ impl SafekeeperReconcilers {
service: &Arc<Service>, service: &Arc<Service>,
reqs: Vec<ScheduleRequest>, reqs: Vec<ScheduleRequest>,
) { ) {
tracing::info!(
"Scheduling {} pending safekeeper ops loaded from db",
reqs.len()
);
for req in reqs { for req in reqs {
self.schedule_request(service, req); self.schedule_request(service, req);
} }
@@ -74,7 +78,7 @@ pub(crate) async fn load_schedule_requests(
service: &Arc<Service>, service: &Arc<Service>,
safekeepers: &HashMap<NodeId, Safekeeper>, safekeepers: &HashMap<NodeId, Safekeeper>,
) -> anyhow::Result<Vec<ScheduleRequest>> { ) -> anyhow::Result<Vec<ScheduleRequest>> {
let pending_ops = service.persistence.list_pending_ops(None).await?; let pending_ops = service.persistence.list_pending_ops().await?;
let mut res = Vec::with_capacity(pending_ops.len()); let mut res = Vec::with_capacity(pending_ops.len());
for op_persist in pending_ops { for op_persist in pending_ops {
let node_id = NodeId(op_persist.sk_id as u64); let node_id = NodeId(op_persist.sk_id as u64);
@@ -232,12 +236,14 @@ impl SafekeeperReconciler {
let kind = req.kind; let kind = req.kind;
let tenant_id = req.tenant_id; let tenant_id = req.tenant_id;
let timeline_id = req.timeline_id; let timeline_id = req.timeline_id;
let node_id = req.safekeeper.skp.id;
self.reconcile_one(req, req_cancel) self.reconcile_one(req, req_cancel)
.instrument(tracing::info_span!( .instrument(tracing::info_span!(
"reconcile_one", "reconcile_one",
?kind, ?kind,
%tenant_id, %tenant_id,
?timeline_id ?timeline_id,
%node_id,
)) ))
.await; .await;
} }

View File

@@ -4073,6 +4073,101 @@ def test_storage_controller_location_conf_equivalence(neon_env_builder: NeonEnvB
assert reconciles_after_restart == 0 assert reconciles_after_restart == 0
@run_only_on_default_postgres("PG version is not interesting here")
@pytest.mark.parametrize("restart_storcon", [True, False])
def test_storcon_create_delete_sk_down(neon_env_builder: NeonEnvBuilder, restart_storcon: bool):
    """
    Test that the storcon can create and delete tenants and timelines with a safekeeper being down.

    - restart_storcon: tests whether the pending ops are persisted.
      if we don't restart, we test that we don't require it to come from the db.
    """
    neon_env_builder.num_safekeepers = 3
    neon_env_builder.storage_controller_config = {
        "timelines_onto_safekeepers": True,
    }
    env = neon_env_builder.init_start()

    # Take one safekeeper offline before creating the tenant/timeline, so the
    # storcon has to queue a pending op for it.
    env.safekeepers[0].stop()

    # Wait for heartbeater to pick up that the safekeeper is gone
    # This isn't really necessary
    def logged_offline():
        env.storage_controller.assert_log_contains(
            "Heartbeat round complete for 3 safekeepers, 1 offline"
        )

    wait_until(logged_offline)

    tenant_id = TenantId.generate()
    timeline_id = TimelineId.generate()
    env.create_tenant(tenant_id, timeline_id)

    # Creation must have succeeded on the two safekeepers that are up.
    env.safekeepers[1].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}")
    env.safekeepers[2].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}")

    env.storage_controller.allowed_errors.extend(
        [
            ".*Call to safekeeper.* management API still failed after.*",
            # NOTE: the second pattern was missing the `f` prefix, so `{tenant_id}`
            # was matched literally and the pattern could never match a real log line.
            f".*reconcile_one.*tenant_id={tenant_id}.*Call to safekeeper.* management API still failed after.*",
        ]
    )

    if restart_storcon:
        # Restart the storcon to check that we persist operations
        env.storage_controller.stop()
        env.storage_controller.start()

    config_lines = [
        "neon.safekeeper_proto_version = 3",
    ]
    with env.endpoints.create("main", tenant_id=tenant_id, config_lines=config_lines) as ep:
        # endpoint should start.
        ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
        ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")

    # The op targeting the downed safekeeper must have been persisted.
    env.storage_controller.assert_log_contains("writing pending op for sk id 1")

    env.safekeepers[0].start()

    # ensure that we applied the operation also for the safekeeper we just brought down
    def logged_contains_on_sk():
        env.safekeepers[0].assert_log_contains(
            f"pulling timeline {tenant_id}/{timeline_id} from safekeeper"
        )

    wait_until(logged_contains_on_sk)

    # Now exercise the deletion path with a (different) safekeeper down.
    env.safekeepers[1].stop()

    env.storage_controller.pageserver_api().tenant_delete(tenant_id)

    # ensure the safekeeper deleted the timeline
    def timeline_deleted_on_active_sks():
        env.safekeepers[0].assert_log_contains(
            f"deleting timeline {tenant_id}/{timeline_id} from disk"
        )
        env.safekeepers[2].assert_log_contains(
            f"deleting timeline {tenant_id}/{timeline_id} from disk"
        )

    wait_until(timeline_deleted_on_active_sks)

    if restart_storcon:
        # Restart the storcon to check that we persist operations
        env.storage_controller.stop()
        env.storage_controller.start()

    env.safekeepers[1].start()

    # ensure that there is log msgs for the third safekeeper too
    def timeline_deleted_on_sk():
        env.safekeepers[1].assert_log_contains(
            f"deleting timeline {tenant_id}/{timeline_id} from disk"
        )

    wait_until(timeline_deleted_on_sk)
@pytest.mark.parametrize("wrong_az", [True, False]) @pytest.mark.parametrize("wrong_az", [True, False])
def test_storage_controller_graceful_migration(neon_env_builder: NeonEnvBuilder, wrong_az: bool): def test_storage_controller_graceful_migration(neon_env_builder: NeonEnvBuilder, wrong_az: bool):
""" """