diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 71b7ea05ec..14b667eeba 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -709,6 +709,26 @@ async fn tenant_detach_handler(
json_response(StatusCode::OK, ())
}
+async fn tenant_reset_handler(
+    request: Request<Body>,
+    _cancel: CancellationToken,
+) -> Result<Response<Body>, ApiError> {
+    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
+    check_permission(&request, Some(tenant_shard_id.tenant_id))?;
+
+    let drop_cache: Option<bool> = parse_query_param(&request, "drop_cache")?;
+
+    let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
+    let state = get_state(&request);
+    state
+        .tenant_manager
+        .reset_tenant(tenant_shard_id, drop_cache.unwrap_or(false), ctx)
+        .await
+        .map_err(ApiError::InternalServerError)?;
+
+    json_response(StatusCode::OK, ())
+}
+
async fn tenant_load_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
@@ -1828,6 +1848,9 @@ pub fn make_router(
.post("/v1/tenant/:tenant_id/detach", |r| {
api_handler(r, tenant_detach_handler)
})
+ .post("/v1/tenant/:tenant_shard_id/reset", |r| {
+ api_handler(r, tenant_reset_handler)
+ })
.post("/v1/tenant/:tenant_id/load", |r| {
api_handler(r, tenant_load_handler)
})
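
The reset endpoint is a plain POST on the tenant shard, with an optional drop_cache query parameter. As a minimal sketch of exercising it directly with Python's requests library (the listen address, port, and tenant shard id here are assumptions, not part of this patch):

    import requests

    PAGESERVER = "http://localhost:9898"  # assumed pageserver HTTP address
    tenant_shard_id = "3fa85f64717a11edb8780242ac120002"  # hypothetical id

    # Reset the tenant, keeping its locally cached timeline data.
    requests.post(f"{PAGESERVER}/v1/tenant/{tenant_shard_id}/reset").raise_for_status()

    # Reset the tenant and drop the local file cache first.
    requests.post(
        f"{PAGESERVER}/v1/tenant/{tenant_shard_id}/reset",
        params={"drop_cache": "true"},
    ).raise_for_status()
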
diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index c09270112f..d9d44d1f8f 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -1038,6 +1038,81 @@ impl TenantManager {
Ok(())
}
+
+    /// Resetting a tenant is equivalent to detaching it, then attaching it again with the same
+    /// LocationConf that was last used to attach it. Optionally, the local file cache may be
+    /// dropped before re-attaching.
+    ///
+    /// This is not part of a tenant's normal lifecycle: it is used for debug/support, in situations
+    /// where an issue is identified that would go away with a restart of the tenant.
+    ///
+    /// This does not have any special "force" shutdown of a tenant: it relies on the tenant's tasks
+    /// to respect the cancellation tokens used in normal shutdown().
+    #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %drop_cache))]
+    pub(crate) async fn reset_tenant(
+        &self,
+        tenant_shard_id: TenantShardId,
+        drop_cache: bool,
+        ctx: RequestContext,
+    ) -> anyhow::Result<()> {
+        let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
+        let Some(old_slot) = slot_guard.get_old_value() else {
+            anyhow::bail!("Tenant not found when trying to reset");
+        };
+
+        let Some(tenant) = old_slot.get_attached() else {
+            slot_guard.revert();
+            anyhow::bail!("Tenant is not in attached state");
+        };
+
+        let (_guard, progress) = utils::completion::channel();
+        match tenant.shutdown(progress, false).await {
+            Ok(()) => {
+                slot_guard.drop_old_value()?;
+            }
+            Err(_barrier) => {
+                slot_guard.revert();
+                anyhow::bail!("Cannot reset Tenant, already shutting down");
+            }
+        }
+
+        let tenant_path = self.conf.tenant_path(&tenant_shard_id);
+        let timelines_path = self.conf.timelines_path(&tenant_shard_id);
+        let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?;
+
+        if drop_cache {
+            tracing::info!("Dropping local file cache");
+
+            match tokio::fs::read_dir(&timelines_path).await {
+                Err(e) => {
+                    tracing::warn!("Failed to list timelines while dropping cache: {}", e);
+                }
+                Ok(mut entries) => {
+                    while let Some(entry) = entries.next_entry().await? {
+                        tokio::fs::remove_dir_all(entry.path()).await?;
+                    }
+                }
+            }
+        }
+
+        let shard_identity = config.shard;
+        let tenant = tenant_spawn(
+            self.conf,
+            tenant_shard_id,
+            &tenant_path,
+            self.resources.clone(),
+            AttachedTenantConf::try_from(config)?,
+            shard_identity,
+            None,
+            self.tenants,
+            SpawnMode::Normal,
+            &ctx,
+        )?;
+
+        slot_guard.upsert(TenantSlot::Attached(tenant))?;
+
+        Ok(())
+    }
}
#[derive(Debug, thiserror::Error)]
@@ -1246,8 +1321,7 @@ pub(crate) async fn delete_tenant(
// See https://github.com/neondatabase/neon/issues/5080
// TODO(sharding): make delete API sharding-aware
-    let mut slot_guard =
-        tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
+    let slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
// unwrap is safe because we used MustExist mode when acquiring
let tenant = match slot_guard.get_old_value().as_ref().unwrap() {
@@ -1574,9 +1648,10 @@ pub enum TenantSlotUpsertError {
MapState(#[from] TenantMapError),
}
-#[derive(Debug)]
+#[derive(Debug, thiserror::Error)]
enum TenantSlotDropError {
/// It is only legal to drop a TenantSlot if its contents are fully shut down
+ #[error("Tenant was not shut down")]
NotShutdown,
}
@@ -1636,9 +1711,9 @@ impl SlotGuard {
}
}
-    /// Take any value that was present in the slot before we acquired ownership
+    /// Get any value that was present in the slot before we acquired ownership
/// of it: in state transitions, this will be the old state.
-    fn get_old_value(&mut self) -> &Option<TenantSlot> {
+    fn get_old_value(&self) -> &Option<TenantSlot> {
&self.old_value
}
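
To make the drop_cache path in reset_tenant above concrete: it deletes every entry directly under the tenant's timelines directory and leaves the tenant directory and its config in place, so the re-spawned tenant starts with an empty local cache and can re-download layers from remote storage as needed. A rough Python sketch of the same on-disk operation, assuming the usual pageserver workdir layout of tenants/<tenant_shard_id>/timelines/<timeline_id> (the layout and the helper name are illustrative assumptions):

    import shutil
    from pathlib import Path

    def drop_local_cache(workdir: Path, tenant_shard_id: str) -> None:
        # Mirror the drop_cache branch: remove each timeline directory under
        # timelines/, keeping the tenant directory and its config file.
        timelines_dir = workdir / "tenants" / tenant_shard_id / "timelines"
        for entry in timelines_dir.iterdir():
            shutil.rmtree(entry)
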
diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py
index 76aa40122f..eccab5fb6a 100644
--- a/test_runner/fixtures/pageserver/http.py
+++ b/test_runner/fixtures/pageserver/http.py
@@ -260,6 +260,14 @@ class PageserverHttpClient(requests.Session):
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/detach", params=params)
self.verbose_error(res)
+    def tenant_reset(self, tenant_id: TenantId, drop_cache: bool):
+        params = {}
+        if drop_cache:
+            params["drop_cache"] = "true"
+
+        res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/reset", params=params)
+        self.verbose_error(res)
+
def tenant_delete(self, tenant_id: TenantId):
res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
self.verbose_error(res)
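
In tests, the new helper is used like the existing detach/attach helpers; a sketch assuming the usual NeonEnv fixtures with an already attached env.initial_tenant:

    pageserver_http = env.pageserver.http_client()

    # One-shot equivalent of detach + attach with the same config, additionally
    # wiping locally cached timeline data before the re-attach.
    pageserver_http.tenant_reset(env.initial_tenant, drop_cache=True)
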
diff --git a/test_runner/regress/test_tenant_detach.py b/test_runner/regress/test_tenant_detach.py
index 0bd3800480..df497c0f7b 100644
--- a/test_runner/regress/test_tenant_detach.py
+++ b/test_runner/regress/test_tenant_detach.py
@@ -1,4 +1,5 @@
import asyncio
+import enum
import random
import time
from threading import Thread
@@ -51,11 +52,20 @@ def do_gc_target(
log.info("gc http thread returning")
+class ReattachMode(str, enum.Enum):
+    REATTACH_EXPLICIT = "explicit"
+    REATTACH_RESET = "reset"
+    REATTACH_RESET_DROP = "reset_drop"
+
+
# Basic detach and re-attach test
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
+@pytest.mark.parametrize(
+ "mode",
+ [ReattachMode.REATTACH_EXPLICIT, ReattachMode.REATTACH_RESET, ReattachMode.REATTACH_RESET_DROP],
+)
def test_tenant_reattach(
-    neon_env_builder: NeonEnvBuilder,
-    remote_storage_kind: RemoteStorageKind,
+    neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind, mode: str
):
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
@@ -100,8 +110,15 @@ def test_tenant_reattach(
ps_metrics.query_one("pageserver_last_record_lsn", filter=tenant_metric_filter).value
)
-    pageserver_http.tenant_detach(tenant_id)
-    pageserver_http.tenant_attach(tenant_id)
+    if mode == ReattachMode.REATTACH_EXPLICIT:
+        # Explicitly detach then attach the tenant as two separate API calls
+        pageserver_http.tenant_detach(tenant_id)
+        pageserver_http.tenant_attach(tenant_id)
+    elif mode in (ReattachMode.REATTACH_RESET, ReattachMode.REATTACH_RESET_DROP):
+        # Use the reset API to detach/attach in one shot
+        pageserver_http.tenant_reset(tenant_id, mode == ReattachMode.REATTACH_RESET_DROP)
+    else:
+        raise NotImplementedError(mode)
time.sleep(1) # for metrics propagation