Mirror of https://github.com/neondatabase/neon.git (synced 2026-05-15 12:10:37 +00:00)

Compare commits: skyzh/sche...arpad/sk_t (9 commits)
| SHA1 |
|---|
| fec5530c9e |
| 8d5d6b24df |
| 0a5b9506ae |
| 48d712af1b |
| 1a78e9e9cf |
| 484208b5a8 |
| 26d636ec14 |
| 579069d74b |
| a02dcaea6b |
```diff
@@ -227,6 +227,8 @@ pub struct TimelineDeleteResult {
     pub dir_existed: bool,
 }
 
+pub type TenantDeleteResult = std::collections::HashMap<String, TimelineDeleteResult>;
+
 fn lsn_invalid() -> Lsn {
     Lsn::INVALID
 }
```
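With the new alias, a tenant-level delete returns one entry per timeline, keyed by the timeline id rendered as a string (see the handler change further down). Below is a minimal consumer-side sketch; the JSON field layout assumes `TimelineDeleteResult` uses plain serde (de)serialization, which the diff does not show, and the timeline id key is a placeholder:

```rust
use std::collections::HashMap;

use serde::Deserialize;

// Mirrors the struct and alias in the hunk above; the derive here is an
// assumption for illustration, not taken from the diff.
#[derive(Deserialize)]
struct TimelineDeleteResult {
    dir_existed: bool,
}

type TenantDeleteResult = HashMap<String, TimelineDeleteResult>;

fn main() -> Result<(), serde_json::Error> {
    // Hypothetical response body for a tenant with one timeline.
    let body = r#"{"a1b2c3d4e5f60718293a4b5c6d7e8f90": {"dir_existed": true}}"#;
    let parsed: TenantDeleteResult = serde_json::from_str(body)?;
    for (timeline_id, res) in &parsed {
        println!("{timeline_id}: dir_existed={}", res.dir_existed);
    }
    Ok(())
}
```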
```diff
@@ -115,13 +115,17 @@ impl Client {
             "{}/v1/tenant/{}/timeline/{}",
             self.mgmt_api_endpoint, tenant_id, timeline_id
         );
-        let resp = self.request(Method::DELETE, &uri, ()).await?;
+        let resp = self
+            .request_maybe_body(Method::DELETE, &uri, None::<()>)
+            .await?;
         resp.json().await.map_err(Error::ReceiveBody)
     }
 
-    pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result<models::TimelineDeleteResult> {
+    pub async fn delete_tenant(&self, tenant_id: TenantId) -> Result<models::TenantDeleteResult> {
         let uri = format!("{}/v1/tenant/{}", self.mgmt_api_endpoint, tenant_id);
-        let resp = self.request(Method::DELETE, &uri, ()).await?;
+        let resp = self
+            .request_maybe_body(Method::DELETE, &uri, None::<()>)
+            .await?;
         resp.json().await.map_err(Error::ReceiveBody)
     }
```
```diff
@@ -197,6 +201,16 @@ impl Client {
         method: Method,
         uri: U,
         body: B,
     ) -> Result<reqwest::Response> {
+        self.request_maybe_body(method, uri, Some(body)).await
+    }
+
+    /// Send the request and check that the status code is good, with an optional body.
+    async fn request_maybe_body<B: serde::Serialize, U: reqwest::IntoUrl>(
+        &self,
+        method: Method,
+        uri: U,
+        body: Option<B>,
+    ) -> Result<reqwest::Response> {
         let res = self.request_noerror(method, uri, body).await?;
         let response = res.error_from_body().await?;
```
```diff
@@ -208,12 +222,15 @@ impl Client {
         &self,
         method: Method,
         uri: U,
-        body: B,
+        body: Option<B>,
     ) -> Result<reqwest::Response> {
         let mut req = self.client.request(method, uri);
         if let Some(value) = &self.authorization_header {
             req = req.header(reqwest::header::AUTHORIZATION, value.get_contents())
         }
-        req.json(&body).send().await.map_err(Error::ReceiveBody)
+        if let Some(body) = body {
+            req = req.json(&body);
+        }
+        req.send().await.map_err(Error::ReceiveBody)
     }
 }
```
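The motivation for `request_maybe_body`: `RequestBuilder::json` both serializes the body and sets `Content-Type: application/json`, so the old `request(Method::DELETE, &uri, ())` sent DELETE requests carrying a JSON `null` body. With `None`, the builder skips `.json(...)` entirely. A standalone sketch of the pattern, assuming a plain `reqwest::Client` with the `json` feature enabled (the auth header and the crate's own `Error` type are omitted):

```rust
use reqwest::{Client, Method, Response};

// Sketch of the optional-body pattern from the hunk above; not the
// crate's actual helper, just the same shape in isolation.
async fn request_maybe_body<B: serde::Serialize>(
    client: &Client,
    method: Method,
    uri: &str,
    body: Option<B>,
) -> reqwest::Result<Response> {
    let mut req = client.request(method, uri);
    if let Some(body) = body {
        // Only serialize (and set Content-Type: application/json) when
        // there is actually a body to send.
        req = req.json(&body);
    }
    req.send().await
}

// Callers with no body still need a concrete type for `B`, hence the
// turbofish in the diff:
// request_maybe_body(&client, Method::DELETE, url, None::<()>).await
```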
```diff
@@ -16,9 +16,9 @@ use http_utils::{RequestExt, RouterBuilder};
 use hyper::{Body, Request, Response, StatusCode};
 use postgres_ffi::WAL_SEGMENT_SIZE;
 use safekeeper_api::models::{
-    AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TermSwitchApiEntry,
-    TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult, TimelineStatus,
-    TimelineTermBumpRequest,
+    AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TenantDeleteResult,
+    TermSwitchApiEntry, TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult,
+    TimelineStatus, TimelineTermBumpRequest,
 };
 use safekeeper_api::{ServerInfo, membership, models};
 use storage_broker::proto::{SafekeeperTimelineInfo, TenantTimelineId as ProtoTenantTimelineId};
```
```diff
@@ -83,13 +83,11 @@ async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Bo
         .delete_all_for_tenant(&tenant_id, action)
         .await
         .map_err(ApiError::InternalServerError)?;
-    json_response(
-        StatusCode::OK,
-        delete_info
-            .iter()
-            .map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
-            .collect::<HashMap<String, TimelineDeleteResult>>(),
-    )
+    let response_body: TenantDeleteResult = delete_info
+        .iter()
+        .map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
+        .collect::<HashMap<String, TimelineDeleteResult>>();
+    json_response(StatusCode::OK, response_body)
 }
 
 async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
```
```diff
@@ -1524,25 +1524,14 @@ impl Persistence {
     /// Load pending operations from db.
     pub(crate) async fn list_pending_ops(
         &self,
-        filter_for_sk: Option<NodeId>,
     ) -> DatabaseResult<Vec<TimelinePendingOpPersistence>> {
         use crate::schema::safekeeper_timeline_pending_ops::dsl;
 
-        const FILTER_VAL_1: i64 = 1;
-        const FILTER_VAL_2: i64 = 2;
-        let filter_opt = filter_for_sk.map(|id| id.0 as i64);
         let timeline_from_db = self
             .with_measured_conn(DatabaseOperation::ListTimelineReconcile, move |conn| {
                 Box::pin(async move {
                     let from_db: Vec<TimelinePendingOpPersistence> =
-                        dsl::safekeeper_timeline_pending_ops
-                            .filter(
-                                dsl::sk_id
-                                    .eq(filter_opt.unwrap_or(FILTER_VAL_1))
-                                    .and(dsl::sk_id.eq(filter_opt.unwrap_or(FILTER_VAL_2))),
-                            )
-                            .load(conn)
-                            .await?;
+                        dsl::safekeeper_timeline_pending_ops.load(conn).await?;
                     Ok(from_db)
                 })
             })
```
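Worth spelling out why the removed filter had to go: as written, a `None` value for `filter_for_sk` produced the predicate `sk_id = 1 AND sk_id = 2`, which can never hold, so the only call site shown below (`list_pending_ops(None)`) loaded nothing. Dropping the parameter and loading all pending ops is the straightforward fix. If an optional filter were ever wanted again, Diesel's boxed queries are the usual shape; the sketch below is illustrative only, reuses identifiers from the diff, and is not code from this branch:

```rust
// Illustrative only: an optional per-safekeeper filter via Diesel's
// `into_boxed()`, which keeps a single query code path.
use diesel::prelude::*;
use diesel_async::RunQueryDsl;

async fn list_pending_ops_filtered(
    conn: &mut diesel_async::AsyncPgConnection,
    filter_for_sk: Option<i64>,
) -> QueryResult<Vec<TimelinePendingOpPersistence>> {
    use crate::schema::safekeeper_timeline_pending_ops::dsl;

    let mut query = dsl::safekeeper_timeline_pending_ops.into_boxed();
    if let Some(sk_id) = filter_for_sk {
        // Apply the filter only when one was requested; `None` means
        // "load everything", not "load nothing".
        query = query.filter(dsl::sk_id.eq(sk_id));
    }
    query.load(conn).await
}
```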
```diff
@@ -101,7 +101,7 @@ impl SafekeeperClient {
     pub(crate) async fn delete_tenant(
         &self,
         tenant_id: TenantId,
-    ) -> Result<models::TimelineDeleteResult> {
+    ) -> Result<models::TenantDeleteResult> {
         measured_request!(
             "delete_tenant",
             crate::metrics::Method::Delete,
```
```diff
@@ -35,6 +35,10 @@ impl SafekeeperReconcilers {
         service: &Arc<Service>,
         reqs: Vec<ScheduleRequest>,
     ) {
+        tracing::info!(
+            "Scheduling {} pending safekeeper ops loaded from db",
+            reqs.len()
+        );
         for req in reqs {
             self.schedule_request(service, req);
         }
```
```diff
@@ -74,7 +78,7 @@ pub(crate) async fn load_schedule_requests(
     service: &Arc<Service>,
     safekeepers: &HashMap<NodeId, Safekeeper>,
 ) -> anyhow::Result<Vec<ScheduleRequest>> {
-    let pending_ops = service.persistence.list_pending_ops(None).await?;
+    let pending_ops = service.persistence.list_pending_ops().await?;
     let mut res = Vec::with_capacity(pending_ops.len());
     for op_persist in pending_ops {
         let node_id = NodeId(op_persist.sk_id as u64);
```
```diff
@@ -232,12 +236,14 @@ impl SafekeeperReconciler {
         let kind = req.kind;
         let tenant_id = req.tenant_id;
         let timeline_id = req.timeline_id;
+        let node_id = req.safekeeper.skp.id;
         self.reconcile_one(req, req_cancel)
             .instrument(tracing::info_span!(
                 "reconcile_one",
                 ?kind,
                 %tenant_id,
-                ?timeline_id
+                ?timeline_id,
+                %node_id,
             ))
             .await;
     }
```
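A small aside on the span syntax: in `tracing` macros, `?field` records the value with its `Debug` impl and `%field` with `Display`, and the trailing comma added to `?timeline_id` is what lets `%node_id` slot in. A self-contained sketch of the same pattern, with illustrative names:

```rust
use tracing::{Instrument, info_span};

async fn reconcile_one() {
    // ... actual reconciliation work ...
}

async fn run(tenant_id: &str, timeline_id: Option<u64>, node_id: u64) {
    reconcile_one()
        .instrument(info_span!(
            "reconcile_one",
            %tenant_id,   // recorded via Display
            ?timeline_id, // recorded via Debug (handles Option cleanly)
            %node_id,
        ))
        .await;
}
```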
```diff
@@ -4073,6 +4073,134 @@ def test_storage_controller_location_conf_equivalence(neon_env_builder: NeonEnvB
     assert reconciles_after_restart == 0
 
 
+@pytest.mark.parametrize("restart_storcon", [True, False])
+@pytest.mark.parametrize("delete_only_timeline", [True, False])
+def test_storcon_create_delete_sk_down(
+    neon_env_builder: NeonEnvBuilder, restart_storcon: bool, delete_only_timeline: bool
+):
+    """
+    Test that the storcon can create and delete tenants and timelines while one safekeeper is down.
+    """
+
+    neon_env_builder.num_safekeepers = 3
+    neon_env_builder.storage_controller_config = {
+        "timelines_onto_safekeepers": True,
+    }
+    env = neon_env_builder.init_start()
+
+    env.safekeepers[0].stop()
+
+    # Wait for the heartbeater to pick up that the safekeeper is gone.
+    # This isn't strictly necessary.
+    def logged_offline():
+        env.storage_controller.assert_log_contains(
+            "Heartbeat round complete for 3 safekeepers, 1 offline"
+        )
+
+    wait_until(logged_offline)
+
+    tenant_id = TenantId.generate()
+    timeline_id = TimelineId.generate()
+    env.create_tenant(tenant_id, timeline_id)
+    branch_timeline_id = env.create_branch("child_of_main", tenant_id)
+
+    log.info(
+        f"Creating tenant {tenant_id} with main timeline {timeline_id} and branch {branch_timeline_id}"
+    )
+
+    env.storage_controller.allowed_errors.extend(
+        [
+            ".*Call to safekeeper.* management API still failed after.*",
+            f".*reconcile_one.*tenant_id={tenant_id}.*Call to safekeeper.* management API failed, will retry.*",
+            f".*reconcile_one.*tenant_id={tenant_id}.*Call to safekeeper.* management API still failed after.*",
+        ]
+    )
+
+    env.safekeepers[1].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}")
+    env.safekeepers[2].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}")
+
+    if restart_storcon:
+        # Restart the storcon to check that we persist operations
+        env.storage_controller.stop()
+        env.storage_controller.start()
+
+    config_lines = [
+        "neon.safekeeper_proto_version = 3",
+    ]
+    with env.endpoints.create("main", tenant_id=tenant_id, config_lines=config_lines) as ep:
+        # The endpoint should start.
+        ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
+        ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
+
+    with env.endpoints.create("child_of_main", tenant_id=tenant_id, config_lines=config_lines) as ep:
+        # The endpoint should start.
+        ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
+        ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
+
+    env.storage_controller.assert_log_contains("writing pending op for sk id 0")
+    env.safekeepers[0].start()
+
+    # Ensure that the operation is also applied on the safekeeper that was down.
+    def logged_contains_on_sk():
+        env.safekeepers[0].assert_log_contains(
+            f"pulling timeline {tenant_id}/{timeline_id} from safekeeper"
+        )
+        env.safekeepers[0].assert_log_contains(
+            f"pulling timeline {tenant_id}/{branch_timeline_id} from safekeeper"
+        )
+
+    wait_until(logged_contains_on_sk)
+
+    env.safekeepers[1].stop()
+
+    if delete_only_timeline:
+        env.storage_controller.pageserver_api().timeline_delete(tenant_id, branch_timeline_id)
+    else:
+        env.storage_controller.pageserver_api().tenant_delete(tenant_id)
+
+    def logged_delete_finished():
+        env.safekeepers[0].assert_log_contains(f"method=DELETE.*{tenant_id}.*Request handled")
+        env.safekeepers[2].assert_log_contains(f"method=DELETE.*{tenant_id}.*Request handled")
+
+    wait_until(logged_delete_finished)
+
+    env.safekeepers[0].assert_log_contains(
+        f"deleting timeline {tenant_id}/{branch_timeline_id} from disk"
+    )
+    env.safekeepers[2].assert_log_contains(
+        f"deleting timeline {tenant_id}/{branch_timeline_id} from disk"
+    )
+
+    root_was_deleted_on_0 = env.safekeepers[0].log_contains(
+        f"deleting timeline {tenant_id}/{timeline_id} from disk"
+    )
+    root_was_deleted_on_2 = env.safekeepers[2].log_contains(
+        f"deleting timeline {tenant_id}/{timeline_id} from disk"
+    )
+    assert (root_was_deleted_on_0 is None) == (root_was_deleted_on_2 is None)
+
+    # The root timeline is deleted iff the full tenant delete was requested
+    if delete_only_timeline:
+        assert not root_was_deleted_on_0
+    else:
+        assert root_was_deleted_on_0
+
+    if restart_storcon:
+        # Restart the storcon to check that we persist operations
+        env.storage_controller.stop()
+        env.storage_controller.start()
+
+    env.safekeepers[1].start()
+
+    # Ensure that there are log messages for the third safekeeper too.
+    def logged_deleted_on_sk():
+        env.safekeepers[1].assert_log_contains(
+            f"deleting timeline {tenant_id}/{branch_timeline_id} from disk"
+        )
+
+    wait_until(logged_deleted_on_sk)
+
+
 @pytest.mark.parametrize("wrong_az", [True, False])
 def test_storage_controller_graceful_migration(neon_env_builder: NeonEnvBuilder, wrong_az: bool):
     """
```