mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-15 12:10:37 +00:00
storage controller: miscellaneous improvements (#6800)
- Add some context to logs - Add tests for pageserver restarts when managed by storage controller - Make /location_config tolerate compute hook failures on shard creations, not just modifications.
This commit is contained in:
@@ -438,7 +438,7 @@ impl Reconciler {
|
||||
match self.observed.locations.get(&node_id) {
|
||||
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
|
||||
// Nothing to do
|
||||
tracing::info!("Observed configuration already correct.")
|
||||
tracing::info!(%node_id, "Observed configuration already correct.")
|
||||
}
|
||||
_ => {
|
||||
// In all cases other than a matching observed configuration, we will
|
||||
@@ -449,7 +449,7 @@ impl Reconciler {
|
||||
.increment_generation(self.tenant_shard_id, node_id)
|
||||
.await?;
|
||||
wanted_conf.generation = self.generation.into();
|
||||
tracing::info!("Observed configuration requires update.");
|
||||
tracing::info!(%node_id, "Observed configuration requires update.");
|
||||
self.location_config(node_id, wanted_conf, None).await?;
|
||||
self.compute_notify().await?;
|
||||
}
|
||||
|
||||
@@ -56,6 +56,11 @@ use crate::{
|
||||
PlacementPolicy, Sequence,
|
||||
};
|
||||
|
||||
// For operations that should be quick, like attaching a new tenant
|
||||
const SHORT_RECONCILE_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
// For operations that might be slow, like migrating a tenant with
|
||||
// some data in it.
|
||||
const RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
|
||||
/// How long [`Service::startup_reconcile`] is allowed to take before it should give
|
||||
@@ -479,8 +484,8 @@ impl Service {
|
||||
async move {
|
||||
if let Err(e) = compute_hook.notify(tenant_shard_id, node_id, &cancel).await {
|
||||
tracing::error!(
|
||||
tenant_shard_id=%tenant_shard_id,
|
||||
node_id=%node_id,
|
||||
%tenant_shard_id,
|
||||
%node_id,
|
||||
"Failed to notify compute on startup for shard: {e}"
|
||||
);
|
||||
None
|
||||
@@ -1000,6 +1005,16 @@ impl Service {
|
||||
&self,
|
||||
create_req: TenantCreateRequest,
|
||||
) -> Result<TenantCreateResponse, ApiError> {
|
||||
let (response, waiters) = self.do_tenant_create(create_req).await?;
|
||||
|
||||
self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await?;
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
pub(crate) async fn do_tenant_create(
|
||||
&self,
|
||||
create_req: TenantCreateRequest,
|
||||
) -> Result<(TenantCreateResponse, Vec<ReconcilerWaiter>), ApiError> {
|
||||
// This service expects to handle sharding itself: it is an error to try and directly create
|
||||
// a particular shard here.
|
||||
let tenant_id = if !create_req.new_tenant_id.is_unsharded() {
|
||||
@@ -1149,11 +1164,12 @@ impl Service {
|
||||
(waiters, response_shards)
|
||||
};
|
||||
|
||||
self.await_waiters(waiters).await?;
|
||||
|
||||
Ok(TenantCreateResponse {
|
||||
shards: response_shards,
|
||||
})
|
||||
Ok((
|
||||
TenantCreateResponse {
|
||||
shards: response_shards,
|
||||
},
|
||||
waiters,
|
||||
))
|
||||
}
|
||||
|
||||
/// Helper for functions that reconcile a number of shards, and would like to do a timeout-bounded
|
||||
@@ -1161,8 +1177,9 @@ impl Service {
|
||||
async fn await_waiters(
|
||||
&self,
|
||||
waiters: Vec<ReconcilerWaiter>,
|
||||
timeout: Duration,
|
||||
) -> Result<(), ReconcileWaitError> {
|
||||
let deadline = Instant::now().checked_add(Duration::from_secs(30)).unwrap();
|
||||
let deadline = Instant::now().checked_add(timeout).unwrap();
|
||||
for waiter in waiters {
|
||||
let timeout = deadline.duration_since(Instant::now());
|
||||
waiter.wait_timeout(timeout).await?;
|
||||
@@ -1300,12 +1317,8 @@ impl Service {
|
||||
}
|
||||
};
|
||||
|
||||
// TODO: if we timeout/fail on reconcile, we should still succeed this request,
|
||||
// because otherwise a broken compute hook causes a feedback loop where
|
||||
// location_config returns 500 and gets retried forever.
|
||||
|
||||
if let Some(create_req) = maybe_create {
|
||||
let create_resp = self.tenant_create(create_req).await?;
|
||||
let waiters = if let Some(create_req) = maybe_create {
|
||||
let (create_resp, waiters) = self.do_tenant_create(create_req).await?;
|
||||
result.shards = create_resp
|
||||
.shards
|
||||
.into_iter()
|
||||
@@ -1314,19 +1327,25 @@ impl Service {
|
||||
shard_id: s.shard_id,
|
||||
})
|
||||
.collect();
|
||||
waiters
|
||||
} else {
|
||||
// This was an update, wait for reconciliation
|
||||
if let Err(e) = self.await_waiters(waiters).await {
|
||||
// Do not treat a reconcile error as fatal: we have already applied any requested
|
||||
// Intent changes, and the reconcile can fail for external reasons like unavailable
|
||||
// compute notification API. In these cases, it is important that we do not
|
||||
// cause the cloud control plane to retry forever on this API.
|
||||
tracing::warn!(
|
||||
"Failed to reconcile after /location_config: {e}, returning success anyway"
|
||||
);
|
||||
}
|
||||
waiters
|
||||
};
|
||||
|
||||
if let Err(e) = self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await {
|
||||
// Do not treat a reconcile error as fatal: we have already applied any requested
|
||||
// Intent changes, and the reconcile can fail for external reasons like unavailable
|
||||
// compute notification API. In these cases, it is important that we do not
|
||||
// cause the cloud control plane to retry forever on this API.
|
||||
tracing::warn!(
|
||||
"Failed to reconcile after /location_config: {e}, returning success anyway"
|
||||
);
|
||||
}
|
||||
|
||||
// Logging the full result is useful because it lets us cross-check what the cloud control
|
||||
// plane's tenant_shards table should contain.
|
||||
tracing::info!("Complete, returning {result:?}");
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
|
||||
@@ -302,6 +302,15 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
)
|
||||
self.verbose_error(res)
|
||||
|
||||
def tenant_list_locations(self):
|
||||
res = self.get(
|
||||
f"http://localhost:{self.port}/v1/location_config",
|
||||
)
|
||||
self.verbose_error(res)
|
||||
res_json = res.json()
|
||||
assert isinstance(res_json["tenant_shards"], list)
|
||||
return res_json
|
||||
|
||||
def tenant_delete(self, tenant_id: Union[TenantId, TenantShardId]):
|
||||
res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
|
||||
self.verbose_error(res)
|
||||
|
||||
@@ -235,11 +235,6 @@ def test_sharding_split_smoke(
|
||||
all_shards = tenant_get_shards(env, tenant_id)
|
||||
for tenant_shard_id, pageserver in all_shards:
|
||||
pageserver.http_client().timeline_gc(tenant_shard_id, timeline_id, None)
|
||||
|
||||
# Restart all nodes, to check that the newly created shards are durable
|
||||
for ps in env.pageservers:
|
||||
ps.restart()
|
||||
|
||||
workload.validate()
|
||||
|
||||
migrate_to_pageserver_ids = list(
|
||||
@@ -288,6 +283,32 @@ def test_sharding_split_smoke(
|
||||
|
||||
env.attachment_service.consistency_check()
|
||||
|
||||
# Validate pageserver state
|
||||
shards_exist: list[TenantShardId] = []
|
||||
for pageserver in env.pageservers:
|
||||
locations = pageserver.http_client().tenant_list_locations()
|
||||
shards_exist.extend(TenantShardId.parse(s[0]) for s in locations["tenant_shards"])
|
||||
|
||||
log.info("Shards after split: {shards_exist}")
|
||||
assert len(shards_exist) == split_shard_count
|
||||
|
||||
# Ensure post-split pageserver locations survive a restart (i.e. the child shards
|
||||
# correctly wrote config to disk, and the storage controller responds correctly
|
||||
# to /re-attach)
|
||||
for pageserver in env.pageservers:
|
||||
pageserver.stop()
|
||||
pageserver.start()
|
||||
|
||||
shards_exist = []
|
||||
for pageserver in env.pageservers:
|
||||
locations = pageserver.http_client().tenant_list_locations()
|
||||
shards_exist.extend(TenantShardId.parse(s[0]) for s in locations["tenant_shards"])
|
||||
|
||||
log.info("Shards after restart: {shards_exist}")
|
||||
assert len(shards_exist) == split_shard_count
|
||||
|
||||
workload.validate()
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
# The quantity of data isn't huge, but debug can be _very_ slow, and the things we're
|
||||
|
||||
@@ -125,6 +125,20 @@ def test_sharding_service_smoke(
|
||||
time.sleep(1)
|
||||
assert get_node_shard_counts(env, tenant_ids)[env.pageservers[0].id] == 0
|
||||
|
||||
# Restarting a pageserver should not detach any tenants (i.e. /re-attach works)
|
||||
before_restart = env.pageservers[1].http_client().tenant_list_locations()
|
||||
env.pageservers[1].stop()
|
||||
env.pageservers[1].start()
|
||||
after_restart = env.pageservers[1].http_client().tenant_list_locations()
|
||||
assert len(after_restart) == len(before_restart)
|
||||
|
||||
# Locations should be the same before & after restart, apart from generations
|
||||
for _shard_id, tenant in after_restart["tenant_shards"]:
|
||||
del tenant["generation"]
|
||||
for _shard_id, tenant in before_restart["tenant_shards"]:
|
||||
del tenant["generation"]
|
||||
assert before_restart == after_restart
|
||||
|
||||
# Delete all the tenants
|
||||
for tid in tenant_ids:
|
||||
tenant_delete_wait_completed(env.attachment_service.pageserver_api(), tid, 10)
|
||||
|
||||
Reference in New Issue
Block a user