From b5246753bfe89221492823f74e7cdc284dcb8541 Mon Sep 17 00:00:00 2001
From: John Spray
Date: Thu, 22 Feb 2024 09:33:40 +0000
Subject: [PATCH] storage controller: miscellaneous improvements (#6800)

- Add some context to logs
- Add tests for pageserver restarts when managed by storage controller
- Make /location_config tolerate compute hook failures on shard creations,
  not just modifications.
---
 .../attachment_service/src/reconciler.rs      |  4 +-
 .../attachment_service/src/service.rs         | 67 ++++++++++++-------
 test_runner/fixtures/pageserver/http.py       |  9 +++
 test_runner/regress/test_sharding.py          | 31 +++++++--
 test_runner/regress/test_sharding_service.py  | 14 ++++
 5 files changed, 94 insertions(+), 31 deletions(-)

diff --git a/control_plane/attachment_service/src/reconciler.rs b/control_plane/attachment_service/src/reconciler.rs
index cdd6f76b14..751b06f93a 100644
--- a/control_plane/attachment_service/src/reconciler.rs
+++ b/control_plane/attachment_service/src/reconciler.rs
@@ -438,7 +438,7 @@ impl Reconciler {
         match self.observed.locations.get(&node_id) {
             Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
                 // Nothing to do
-                tracing::info!("Observed configuration already correct.")
+                tracing::info!(%node_id, "Observed configuration already correct.")
             }
             _ => {
                 // In all cases other than a matching observed configuration, we will
@@ -449,7 +449,7 @@
                     .increment_generation(self.tenant_shard_id, node_id)
                     .await?;
                 wanted_conf.generation = self.generation.into();
-                tracing::info!("Observed configuration requires update.");
+                tracing::info!(%node_id, "Observed configuration requires update.");
                 self.location_config(node_id, wanted_conf, None).await?;
                 self.compute_notify().await?;
             }
diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs
index 74e1296709..6366348017 100644
--- a/control_plane/attachment_service/src/service.rs
+++ b/control_plane/attachment_service/src/service.rs
@@ -56,6 +56,11 @@ use crate::{
     PlacementPolicy, Sequence,
 };
 
+// For operations that should be quick, like attaching a new tenant
+const SHORT_RECONCILE_TIMEOUT: Duration = Duration::from_secs(5);
+
+// For operations that might be slow, like migrating a tenant with
+// some data in it.
 const RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
 
 /// How long [`Service::startup_reconcile`] is allowed to take before it should give
@@ -479,8 +484,8 @@ impl Service {
                 async move {
                     if let Err(e) = compute_hook.notify(tenant_shard_id, node_id, &cancel).await {
                         tracing::error!(
-                            tenant_shard_id=%tenant_shard_id,
-                            node_id=%node_id,
+                            %tenant_shard_id,
+                            %node_id,
                             "Failed to notify compute on startup for shard: {e}"
                         );
                         None
@@ -1000,6 +1005,16 @@ impl Service {
         &self,
         create_req: TenantCreateRequest,
     ) -> Result<TenantCreateResponse, ApiError> {
+        let (response, waiters) = self.do_tenant_create(create_req).await?;
+
+        self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await?;
+        Ok(response)
+    }
+
+    pub(crate) async fn do_tenant_create(
+        &self,
+        create_req: TenantCreateRequest,
+    ) -> Result<(TenantCreateResponse, Vec<ReconcilerWaiter>), ApiError> {
         // This service expects to handle sharding itself: it is an error to try and directly create
         // a particular shard here.
         let tenant_id = if !create_req.new_tenant_id.is_unsharded() {
@@ -1149,11 +1164,12 @@ impl Service {
             (waiters, response_shards)
         };
 
-        self.await_waiters(waiters).await?;
-
-        Ok(TenantCreateResponse {
-            shards: response_shards,
-        })
+        Ok((
+            TenantCreateResponse {
+                shards: response_shards,
+            },
+            waiters,
+        ))
     }
 
     /// Helper for functions that reconcile a number of shards, and would like to do a timeout-bounded
@@ -1161,8 +1177,9 @@
     async fn await_waiters(
         &self,
         waiters: Vec<ReconcilerWaiter>,
+        timeout: Duration,
     ) -> Result<(), ReconcileWaitError> {
-        let deadline = Instant::now().checked_add(Duration::from_secs(30)).unwrap();
+        let deadline = Instant::now().checked_add(timeout).unwrap();
         for waiter in waiters {
             let timeout = deadline.duration_since(Instant::now());
             waiter.wait_timeout(timeout).await?;
@@ -1300,12 +1317,8 @@ impl Service {
             }
         };
 
-        // TODO: if we timeout/fail on reconcile, we should still succeed this request,
-        // because otherwise a broken compute hook causes a feedback loop where
-        // location_config returns 500 and gets retried forever.
-
-        if let Some(create_req) = maybe_create {
-            let create_resp = self.tenant_create(create_req).await?;
+        let waiters = if let Some(create_req) = maybe_create {
+            let (create_resp, waiters) = self.do_tenant_create(create_req).await?;
             result.shards = create_resp
                 .shards
                 .into_iter()
@@ -1314,19 +1327,25 @@ impl Service {
                     shard_id: s.shard_id,
                 })
                 .collect();
+            waiters
         } else {
-            // This was an update, wait for reconciliation
-            if let Err(e) = self.await_waiters(waiters).await {
-                // Do not treat a reconcile error as fatal: we have already applied any requested
-                // Intent changes, and the reconcile can fail for external reasons like unavailable
-                // compute notification API. In these cases, it is important that we do not
-                // cause the cloud control plane to retry forever on this API.
-                tracing::warn!(
-                    "Failed to reconcile after /location_config: {e}, returning success anyway"
-                );
-            }
+            waiters
+        };
+
+        if let Err(e) = self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await {
+            // Do not treat a reconcile error as fatal: we have already applied any requested
+            // Intent changes, and the reconcile can fail for external reasons like unavailable
+            // compute notification API. In these cases, it is important that we do not
+            // cause the cloud control plane to retry forever on this API.
+            tracing::warn!(
+                "Failed to reconcile after /location_config: {e}, returning success anyway"
+            );
         }
+
+        // Logging the full result is useful because it lets us cross-check what the cloud control
+        // plane's tenant_shards table should contain.
+ tracing::info!("Complete, returning {result:?}"); + Ok(result) } diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index 98eb89d30c..427ef00c78 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -302,6 +302,15 @@ class PageserverHttpClient(requests.Session, MetricsGetter): ) self.verbose_error(res) + def tenant_list_locations(self): + res = self.get( + f"http://localhost:{self.port}/v1/location_config", + ) + self.verbose_error(res) + res_json = res.json() + assert isinstance(res_json["tenant_shards"], list) + return res_json + def tenant_delete(self, tenant_id: Union[TenantId, TenantShardId]): res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}") self.verbose_error(res) diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py index 5413b178a5..57c8d1f849 100644 --- a/test_runner/regress/test_sharding.py +++ b/test_runner/regress/test_sharding.py @@ -235,11 +235,6 @@ def test_sharding_split_smoke( all_shards = tenant_get_shards(env, tenant_id) for tenant_shard_id, pageserver in all_shards: pageserver.http_client().timeline_gc(tenant_shard_id, timeline_id, None) - - # Restart all nodes, to check that the newly created shards are durable - for ps in env.pageservers: - ps.restart() - workload.validate() migrate_to_pageserver_ids = list( @@ -288,6 +283,32 @@ def test_sharding_split_smoke( env.attachment_service.consistency_check() + # Validate pageserver state + shards_exist: list[TenantShardId] = [] + for pageserver in env.pageservers: + locations = pageserver.http_client().tenant_list_locations() + shards_exist.extend(TenantShardId.parse(s[0]) for s in locations["tenant_shards"]) + + log.info("Shards after split: {shards_exist}") + assert len(shards_exist) == split_shard_count + + # Ensure post-split pageserver locations survive a restart (i.e. the child shards + # correctly wrote config to disk, and the storage controller responds correctly + # to /re-attach) + for pageserver in env.pageservers: + pageserver.stop() + pageserver.start() + + shards_exist = [] + for pageserver in env.pageservers: + locations = pageserver.http_client().tenant_list_locations() + shards_exist.extend(TenantShardId.parse(s[0]) for s in locations["tenant_shards"]) + + log.info("Shards after restart: {shards_exist}") + assert len(shards_exist) == split_shard_count + + workload.validate() + @pytest.mark.skipif( # The quantity of data isn't huge, but debug can be _very_ slow, and the things we're diff --git a/test_runner/regress/test_sharding_service.py b/test_runner/regress/test_sharding_service.py index 6525f9733f..e62d239d77 100644 --- a/test_runner/regress/test_sharding_service.py +++ b/test_runner/regress/test_sharding_service.py @@ -125,6 +125,20 @@ def test_sharding_service_smoke( time.sleep(1) assert get_node_shard_counts(env, tenant_ids)[env.pageservers[0].id] == 0 + # Restarting a pageserver should not detach any tenants (i.e. 
+    before_restart = env.pageservers[1].http_client().tenant_list_locations()
+    env.pageservers[1].stop()
+    env.pageservers[1].start()
+    after_restart = env.pageservers[1].http_client().tenant_list_locations()
+    assert len(after_restart["tenant_shards"]) == len(before_restart["tenant_shards"])
+
+    # Locations should be the same before & after restart, apart from generations
+    for _shard_id, tenant in after_restart["tenant_shards"]:
+        del tenant["generation"]
+    for _shard_id, tenant in before_restart["tenant_shards"]:
+        del tenant["generation"]
+    assert before_restart == after_restart
+
     # Delete all the tenants
     for tid in tenant_ids:
         tenant_delete_wait_completed(env.attachment_service.pageserver_api(), tid, 10)