From da5e03b0d83fa85461a4e71fdceb4a7bb298c666 Mon Sep 17 00:00:00 2001
From: John Spray
Date: Tue, 5 Dec 2023 15:38:27 +0000
Subject: [PATCH] pageserver: add a /reset API for tenants (#6014)

## Problem

Traditionally we would detach/attach directly with curl if we wanted to
"reboot" a single tenant. That's inconvenient these days, because one needs
to know a generation number to issue an attach request.

Closes: https://github.com/neondatabase/neon/issues/6011

## Summary of changes

- Introduce a new `/reset` API, which remembers the LocationConf from the
  current attachment so that callers do not have to work out the correct
  configuration/generation to use.
- As an additional support tool, allow an optional `drop_cache` query
  parameter, for situations where we are concerned that some on-disk state
  might be bad and want to clear that as well as the in-memory state.

One might wonder why I didn't call this "reattach" -- it's because there's
already a PS->CP API of that name and it could get confusing.
---
 pageserver/src/http/routes.rs             | 23 ++++++
 pageserver/src/tenant/mgr.rs              | 85 +++++++++++++++++++++--
 test_runner/fixtures/pageserver/http.py   |  8 +++
 test_runner/regress/test_tenant_detach.py | 25 +++++--
 4 files changed, 132 insertions(+), 9 deletions(-)

diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 71b7ea05ec..14b667eeba 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -709,6 +709,26 @@ async fn tenant_detach_handler(
     json_response(StatusCode::OK, ())
 }
 
+async fn tenant_reset_handler(
+    request: Request<Body>,
+    _cancel: CancellationToken,
+) -> Result<Response<Body>, ApiError> {
+    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
+    check_permission(&request, Some(tenant_shard_id.tenant_id))?;
+
+    let drop_cache: Option<bool> = parse_query_param(&request, "drop_cache")?;
+
+    let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
+    let state = get_state(&request);
+    state
+        .tenant_manager
+        .reset_tenant(tenant_shard_id, drop_cache.unwrap_or(false), ctx)
+        .await
+        .map_err(ApiError::InternalServerError)?;
+
+    json_response(StatusCode::OK, ())
+}
+
 async fn tenant_load_handler(
     mut request: Request<Body>,
     _cancel: CancellationToken,
@@ -1828,6 +1848,9 @@ pub fn make_router(
         .post("/v1/tenant/:tenant_id/detach", |r| {
             api_handler(r, tenant_detach_handler)
         })
+        .post("/v1/tenant/:tenant_shard_id/reset", |r| {
+            api_handler(r, tenant_reset_handler)
+        })
         .post("/v1/tenant/:tenant_id/load", |r| {
             api_handler(r, tenant_load_handler)
         })
diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index c09270112f..d9d44d1f8f 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -1038,6 +1038,81 @@ impl TenantManager {
 
         Ok(())
     }
+
+    /// Resetting a tenant is equivalent to detaching it, then attaching it again with the same
+    /// LocationConf that was last used to attach it. Optionally, the local file cache may be
+    /// dropped before re-attaching.
+    ///
+    /// This is not part of a tenant's normal lifecycle: it is used for debug/support, in situations
+    /// where an issue is identified that would go away with a restart of the tenant.
+    ///
+    /// This does not have any special "force" shutdown of a tenant: it relies on the tenant's tasks
+    /// to respect the cancellation tokens used in normal shutdown().
+    #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %drop_cache))]
+    pub(crate) async fn reset_tenant(
+        &self,
+        tenant_shard_id: TenantShardId,
+        drop_cache: bool,
+        ctx: RequestContext,
+    ) -> anyhow::Result<()> {
+        let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
+        let Some(old_slot) = slot_guard.get_old_value() else {
+            anyhow::bail!("Tenant not found when trying to reset");
+        };
+
+        let Some(tenant) = old_slot.get_attached() else {
+            slot_guard.revert();
+            anyhow::bail!("Tenant is not in attached state");
+        };
+
+        let (_guard, progress) = utils::completion::channel();
+        match tenant.shutdown(progress, false).await {
+            Ok(()) => {
+                slot_guard.drop_old_value()?;
+            }
+            Err(_barrier) => {
+                slot_guard.revert();
+                anyhow::bail!("Cannot reset Tenant, already shutting down");
+            }
+        }
+
+        let tenant_path = self.conf.tenant_path(&tenant_shard_id);
+        let timelines_path = self.conf.timelines_path(&tenant_shard_id);
+        let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?;
+
+        if drop_cache {
+            tracing::info!("Dropping local file cache");
+
+            match tokio::fs::read_dir(&timelines_path).await {
+                Err(e) => {
+                    tracing::warn!("Failed to list timelines while dropping cache: {}", e);
+                }
+                Ok(mut entries) => {
+                    while let Some(entry) = entries.next_entry().await? {
+                        tokio::fs::remove_dir_all(entry.path()).await?;
+                    }
+                }
+            }
+        }
+
+        let shard_identity = config.shard;
+        let tenant = tenant_spawn(
+            self.conf,
+            tenant_shard_id,
+            &tenant_path,
+            self.resources.clone(),
+            AttachedTenantConf::try_from(config)?,
+            shard_identity,
+            None,
+            self.tenants,
+            SpawnMode::Normal,
+            &ctx,
+        )?;
+
+        slot_guard.upsert(TenantSlot::Attached(tenant))?;
+
+        Ok(())
+    }
 }
 
 #[derive(Debug, thiserror::Error)]
@@ -1246,8 +1321,7 @@ pub(crate) async fn delete_tenant(
     // See https://github.com/neondatabase/neon/issues/5080
 
     // TODO(sharding): make delete API sharding-aware
-    let mut slot_guard =
-        tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
+    let slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
 
     // unwrap is safe because we used MustExist mode when acquiring
     let tenant = match slot_guard.get_old_value().as_ref().unwrap() {
@@ -1574,9 +1648,10 @@ pub enum TenantSlotUpsertError {
     MapState(#[from] TenantMapError),
 }
 
-#[derive(Debug)]
+#[derive(Debug, thiserror::Error)]
 enum TenantSlotDropError {
     /// It is only legal to drop a TenantSlot if its contents are fully shut down
+    #[error("Tenant was not shut down")]
     NotShutdown,
 }
 
@@ -1636,9 +1711,9 @@ impl SlotGuard {
         }
     }
 
-    /// Take any value that was present in the slot before we acquired ownership
+    /// Get any value that was present in the slot before we acquired ownership
     /// of it: in state transitions, this will be the old state.
-    fn get_old_value(&mut self) -> &Option<TenantSlot> {
+    fn get_old_value(&self) -> &Option<TenantSlot> {
         &self.old_value
     }
 
diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py
index 76aa40122f..eccab5fb6a 100644
--- a/test_runner/fixtures/pageserver/http.py
+++ b/test_runner/fixtures/pageserver/http.py
@@ -260,6 +260,14 @@ class PageserverHttpClient(requests.Session):
         res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/detach", params=params)
         self.verbose_error(res)
 
+    def tenant_reset(self, tenant_id: TenantId, drop_cache: bool):
+        params = {}
+        if drop_cache:
+            params["drop_cache"] = "true"
+
+        res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/reset", params=params)
+        self.verbose_error(res)
+
     def tenant_delete(self, tenant_id: TenantId):
         res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
         self.verbose_error(res)
diff --git a/test_runner/regress/test_tenant_detach.py b/test_runner/regress/test_tenant_detach.py
index 0bd3800480..df497c0f7b 100644
--- a/test_runner/regress/test_tenant_detach.py
+++ b/test_runner/regress/test_tenant_detach.py
@@ -1,4 +1,5 @@
 import asyncio
+import enum
 import random
 import time
 from threading import Thread
@@ -51,11 +52,20 @@ def do_gc_target(
     log.info("gc http thread returning")
 
 
+class ReattachMode(str, enum.Enum):
+    REATTACH_EXPLICIT = "explicit"
+    REATTACH_RESET = "reset"
+    REATTACH_RESET_DROP = "reset_drop"
+
+
 # Basic detach and re-attach test
 @pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
+@pytest.mark.parametrize(
+    "mode",
+    [ReattachMode.REATTACH_EXPLICIT, ReattachMode.REATTACH_RESET, ReattachMode.REATTACH_RESET_DROP],
+)
 def test_tenant_reattach(
-    neon_env_builder: NeonEnvBuilder,
-    remote_storage_kind: RemoteStorageKind,
+    neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind, mode: str
 ):
     neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
@@ -100,8 +110,15 @@ def test_tenant_reattach(
         ps_metrics.query_one("pageserver_last_record_lsn", filter=tenant_metric_filter).value
     )
 
-    pageserver_http.tenant_detach(tenant_id)
-    pageserver_http.tenant_attach(tenant_id)
+    if mode == ReattachMode.REATTACH_EXPLICIT:
+        # Explicitly detach then attach the tenant as two separate API calls
+        pageserver_http.tenant_detach(tenant_id)
+        pageserver_http.tenant_attach(tenant_id)
+    elif mode in (ReattachMode.REATTACH_RESET, ReattachMode.REATTACH_RESET_DROP):
+        # Use the reset API to detach/attach in one shot
+        pageserver_http.tenant_reset(tenant_id, mode == ReattachMode.REATTACH_RESET_DROP)
+    else:
+        raise NotImplementedError(mode)
 
     time.sleep(1)  # for metrics propagation
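
For reference, the new endpoint is `POST /v1/tenant/:tenant_shard_id/reset`, with an optional `drop_cache=true` query parameter. Below is a minimal usage sketch driving it through the `tenant_reset` fixture helper added in this patch; the test name and environment setup are illustrative assumptions, not part of the patch itself.

```python
from fixtures.neon_fixtures import NeonEnvBuilder


def test_tenant_reset_usage_sketch(neon_env_builder: NeonEnvBuilder):
    # Start a pageserver with the initial tenant attached.
    env = neon_env_builder.init_start()
    pageserver_http = env.pageserver.http_client()
    tenant_id = env.initial_tenant

    # Plain reset: detach, then re-attach using the remembered LocationConf,
    # so the caller never has to supply a generation number.
    pageserver_http.tenant_reset(tenant_id, drop_cache=False)

    # Reset that also clears the on-disk timeline cache before re-attaching.
    pageserver_http.tenant_reset(tenant_id, drop_cache=True)
```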