Compare commits

...

9 Commits

Author SHA1 Message Date
Arpad Müller
fec5530c9e fixes 2025-04-02 20:02:20 +02:00
Arpad Müller
8d5d6b24df wip 2025-04-02 19:40:07 +02:00
Arpad Müller
0a5b9506ae cargo fmt 2025-04-02 16:03:01 +02:00
Arpad Müller
48d712af1b ruff format 2025-04-02 14:59:05 +02:00
Arpad Müller
1a78e9e9cf Add test_storcon_create_delete_sk_down 2025-04-01 20:19:18 +02:00
Arpad Müller
484208b5a8 logging 2025-04-01 20:17:10 +02:00
Arpad Müller
26d636ec14 Fix expected tenant deletion response 2025-04-01 20:16:53 +02:00
Arpad Müller
579069d74b Fix list_pending_ops which was broken 2025-04-01 20:16:53 +02:00
Arpad Müller
a02dcaea6b Don't send a body for deletion requests to safekeepers, () is not enough 2025-04-01 19:31:00 +02:00
7 changed files with 170 additions and 30 deletions

View File

@@ -227,6 +227,8 @@ pub struct TimelineDeleteResult {
pub dir_existed: bool,
}
pub type TenantDeleteResult = std::collections::HashMap<String, TimelineDeleteResult>;
/// Returns `Lsn::INVALID` as a default value.
///
/// NOTE(review): presumably used as a serde field default, i.e.
/// `#[serde(default = "lsn_invalid")]` on a models struct field — the
/// attribute site is not visible in this hunk, so confirm at the caller.
fn lsn_invalid() -> Lsn {
    Lsn::INVALID
}

View File

@@ -115,13 +115,17 @@ impl Client {
"{}/v1/tenant/{}/timeline/{}",
self.mgmt_api_endpoint, tenant_id, timeline_id
);
let resp = self.request(Method::DELETE, &uri, ()).await?;
let resp = self
.request_maybe_body(Method::DELETE, &uri, None::<()>)
.await?;
resp.json().await.map_err(Error::ReceiveBody)
}
pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result<models::TimelineDeleteResult> {
pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result<models::TenantDeleteResult> {
let uri = format!("{}/v1/tenant/{}", self.mgmt_api_endpoint, tenant_id);
let resp = self.request(Method::DELETE, &uri, ()).await?;
let resp = self
.request_maybe_body(Method::DELETE, &uri, None::<()>)
.await?;
resp.json().await.map_err(Error::ReceiveBody)
}
@@ -197,6 +201,16 @@ impl Client {
method: Method,
uri: U,
body: B,
) -> Result<reqwest::Response> {
self.request_maybe_body(method, uri, Some(body)).await
}
/// Send the request and check that the status code is good, with an optional body.
async fn request_maybe_body<B: serde::Serialize, U: reqwest::IntoUrl>(
&self,
method: Method,
uri: U,
body: Option<B>,
) -> Result<reqwest::Response> {
let res = self.request_noerror(method, uri, body).await?;
let response = res.error_from_body().await?;
@@ -208,12 +222,15 @@ impl Client {
&self,
method: Method,
uri: U,
body: B,
body: Option<B>,
) -> Result<reqwest::Response> {
let mut req = self.client.request(method, uri);
if let Some(value) = &self.authorization_header {
req = req.header(reqwest::header::AUTHORIZATION, value.get_contents())
}
req.json(&body).send().await.map_err(Error::ReceiveBody)
if let Some(body) = body {
req = req.json(&body);
}
req.send().await.map_err(Error::ReceiveBody)
}
}

View File

@@ -16,9 +16,9 @@ use http_utils::{RequestExt, RouterBuilder};
use hyper::{Body, Request, Response, StatusCode};
use postgres_ffi::WAL_SEGMENT_SIZE;
use safekeeper_api::models::{
AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TermSwitchApiEntry,
TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult, TimelineStatus,
TimelineTermBumpRequest,
AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TenantDeleteResult,
TermSwitchApiEntry, TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult,
TimelineStatus, TimelineTermBumpRequest,
};
use safekeeper_api::{ServerInfo, membership, models};
use storage_broker::proto::{SafekeeperTimelineInfo, TenantTimelineId as ProtoTenantTimelineId};
@@ -83,13 +83,11 @@ async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Bo
.delete_all_for_tenant(&tenant_id, action)
.await
.map_err(ApiError::InternalServerError)?;
json_response(
StatusCode::OK,
delete_info
.iter()
.map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
.collect::<HashMap<String, TimelineDeleteResult>>(),
)
let response_body: TenantDeleteResult = delete_info
.iter()
.map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
.collect::<HashMap<String, TimelineDeleteResult>>();
json_response(StatusCode::OK, response_body)
}
async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {

View File

@@ -1524,25 +1524,14 @@ impl Persistence {
/// Load pending operations from db.
pub(crate) async fn list_pending_ops(
&self,
filter_for_sk: Option<NodeId>,
) -> DatabaseResult<Vec<TimelinePendingOpPersistence>> {
use crate::schema::safekeeper_timeline_pending_ops::dsl;
const FILTER_VAL_1: i64 = 1;
const FILTER_VAL_2: i64 = 2;
let filter_opt = filter_for_sk.map(|id| id.0 as i64);
let timeline_from_db = self
.with_measured_conn(DatabaseOperation::ListTimelineReconcile, move |conn| {
Box::pin(async move {
let from_db: Vec<TimelinePendingOpPersistence> =
dsl::safekeeper_timeline_pending_ops
.filter(
dsl::sk_id
.eq(filter_opt.unwrap_or(FILTER_VAL_1))
.and(dsl::sk_id.eq(filter_opt.unwrap_or(FILTER_VAL_2))),
)
.load(conn)
.await?;
dsl::safekeeper_timeline_pending_ops.load(conn).await?;
Ok(from_db)
})
})

View File

@@ -101,7 +101,7 @@ impl SafekeeperClient {
pub(crate) async fn delete_tenant(
&self,
tenant_id: TenantId,
) -> Result<models::TimelineDeleteResult> {
) -> Result<models::TenantDeleteResult> {
measured_request!(
"delete_tenant",
crate::metrics::Method::Delete,

View File

@@ -35,6 +35,10 @@ impl SafekeeperReconcilers {
service: &Arc<Service>,
reqs: Vec<ScheduleRequest>,
) {
tracing::info!(
"Scheduling {} pending safekeeper ops loaded from db",
reqs.len()
);
for req in reqs {
self.schedule_request(service, req);
}
@@ -74,7 +78,7 @@ pub(crate) async fn load_schedule_requests(
service: &Arc<Service>,
safekeepers: &HashMap<NodeId, Safekeeper>,
) -> anyhow::Result<Vec<ScheduleRequest>> {
let pending_ops = service.persistence.list_pending_ops(None).await?;
let pending_ops = service.persistence.list_pending_ops().await?;
let mut res = Vec::with_capacity(pending_ops.len());
for op_persist in pending_ops {
let node_id = NodeId(op_persist.sk_id as u64);
@@ -232,12 +236,14 @@ impl SafekeeperReconciler {
let kind = req.kind;
let tenant_id = req.tenant_id;
let timeline_id = req.timeline_id;
let node_id = req.safekeeper.skp.id;
self.reconcile_one(req, req_cancel)
.instrument(tracing::info_span!(
"reconcile_one",
?kind,
%tenant_id,
?timeline_id
?timeline_id,
%node_id,
))
.await;
}

View File

@@ -4073,6 +4073,134 @@ def test_storage_controller_location_conf_equivalence(neon_env_builder: NeonEnvB
assert reconciles_after_restart == 0
@pytest.mark.parametrize("restart_storcon", [True, False])
@pytest.mark.parametrize("delete_only_timeline", [True, False])
def test_storcon_create_delete_sk_down(
    neon_env_builder: NeonEnvBuilder, restart_storcon: bool, delete_only_timeline: bool
):
    """
    Test that the storcon can create and delete tenants and timelines with a safekeeper being down.

    While safekeeper 0 is stopped, a tenant (with a main timeline and one
    branch) is created; the storcon is expected to persist the pending ops
    for the offline safekeeper and replay them once it comes back.  The
    deletion half of the test repeats the exercise with safekeeper 1 down.

    Parametrized over:
      - restart_storcon: restart the storage controller mid-test to verify
        pending ops survive a restart (they are loaded back from the db).
      - delete_only_timeline: delete just the branch timeline vs. the whole
        tenant (the latter must also remove the root timeline).
    """
    neon_env_builder.num_safekeepers = 3
    neon_env_builder.storage_controller_config = {
        "timelines_onto_safekeepers": True,
    }
    env = neon_env_builder.init_start()

    # Bring one safekeeper down before any tenant exists.
    env.safekeepers[0].stop()

    # Wait for heartbeater to pick up that the safekeeper is gone
    # This isn't really necessary
    def logged_offline():
        env.storage_controller.assert_log_contains(
            "Heartbeat round complete for 3 safekeepers, 1 offline"
        )

    wait_until(logged_offline)

    tenant_id = TenantId.generate()
    timeline_id = TimelineId.generate()
    env.create_tenant(tenant_id, timeline_id)
    branch_timeline_id = env.create_branch("child_of_main", tenant_id)
    log.info(
        f"Creating tenant {tenant_id} with main timeline {timeline_id} and branch {branch_timeline_id}"
    )

    # Calls to the downed safekeeper will fail and be retried; whitelist the
    # resulting storcon error log lines so the test harness doesn't flag them.
    env.storage_controller.allowed_errors.extend(
        [
            ".*Call to safekeeper.* management API still failed after.*",
            f".*reconcile_one.*tenant_id={tenant_id}.*Call to safekeeper.* management API failed, will retry.*",
            f".*reconcile_one.*tenant_id={tenant_id}.*Call to safekeeper.* management API still failed after.*",
        ]
    )

    # The two live safekeepers should have created the timeline immediately.
    env.safekeepers[1].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}")
    env.safekeepers[2].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}")

    if restart_storcon:
        # Restart the storcon to check that we persist operations
        env.storage_controller.stop()
        env.storage_controller.start()

    config_lines = [
        "neon.safekeeper_proto_version = 3",
    ]
    with env.endpoints.create("main", tenant_id=tenant_id, config_lines=config_lines) as ep:
        # endpoint should start.
        ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
        ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
    with env.endpoints.create("child_of_main", tenant_id=tenant_id, config_lines=config_lines) as ep:
        # endpoint should start.
        ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
        ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")

    # The op for the offline safekeeper must have been persisted to the db.
    # NOTE(review): "sk id 0" here looks like the NodeId of safekeeper index 0
    # — confirm against how the fixture numbers safekeepers.
    env.storage_controller.assert_log_contains("writing pending op for sk id 0")

    env.safekeepers[0].start()

    # ensure that we applied the operation also for the safekeeper we just brought down
    def logged_contains_on_sk():
        env.safekeepers[0].assert_log_contains(
            f"pulling timeline {tenant_id}/{timeline_id} from safekeeper"
        )
        env.safekeepers[0].assert_log_contains(
            f"pulling timeline {tenant_id}/{branch_timeline_id} from safekeeper"
        )

    wait_until(logged_contains_on_sk)

    # Now exercise deletion with a (different) safekeeper down.
    env.safekeepers[1].stop()

    if delete_only_timeline:
        env.storage_controller.pageserver_api().timeline_delete(tenant_id, branch_timeline_id)
    else:
        env.storage_controller.pageserver_api().tenant_delete(tenant_id)

    def logged_delete_finished():
        env.safekeepers[0].assert_log_contains(f"method=DELETE.*{tenant_id}.*Request handled")
        env.safekeepers[2].assert_log_contains(f"method=DELETE.*{tenant_id}.*Request handled")

    wait_until(logged_delete_finished)

    # The branch timeline must be gone from disk on both live safekeepers.
    env.safekeepers[0].assert_log_contains(
        f"deleting timeline {tenant_id}/{branch_timeline_id} from disk"
    )
    env.safekeepers[2].assert_log_contains(
        f"deleting timeline {tenant_id}/{branch_timeline_id} from disk"
    )

    # log_contains returns None when the pattern is absent; both live
    # safekeepers must agree on whether the root timeline was deleted.
    root_was_deleted_on_0 = env.safekeepers[0].log_contains(
        f"deleting timeline {tenant_id}/{timeline_id} from disk"
    )
    root_was_deleted_on_2 = env.safekeepers[2].log_contains(
        f"deleting timeline {tenant_id}/{timeline_id} from disk"
    )
    assert (root_was_deleted_on_0 is None) == (root_was_deleted_on_2 is None)
    # We only delete the root timeline iff the tenant delete was requested
    if delete_only_timeline:
        assert not root_was_deleted_on_0
    else:
        assert root_was_deleted_on_0

    if restart_storcon:
        # Restart the storcon to check that we persist operations
        env.storage_controller.stop()
        env.storage_controller.start()

    env.safekeepers[1].start()

    # ensure that there is log msgs for the third safekeeper too
    def logged_deleted_on_sk():
        env.safekeepers[1].assert_log_contains(
            f"deleting timeline {tenant_id}/{branch_timeline_id} from disk"
        )

    wait_until(logged_deleted_on_sk)
@pytest.mark.parametrize("wrong_az", [True, False])
def test_storage_controller_graceful_migration(neon_env_builder: NeonEnvBuilder, wrong_az: bool):
"""