mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-10 23:12:54 +00:00
pageserver: add a /reset API for tenants (#6014)
## Problem Traditionally we would detach/attach directly with curl if we wanted to "reboot" a single tenant. That's kind of inconvenient these days, because one needs to know a generation number to issue an attach request. Closes: https://github.com/neondatabase/neon/issues/6011 ## Summary of changes - Introduce a new `/reset` API, which remembers the LocationConf from the current attachment so that callers do not have to work out the correct configuration/generation to use. - As an additional support tool, allow an optional `drop_cache` query parameter, for situations where we are concerned that some on-disk state might be bad and want to clear that as well as the in-memory state. One might wonder why I didn't call this "reattach" -- it's because there's already a PS->CP API of that name and it could get confusing.
This commit is contained in:
@@ -709,6 +709,26 @@ async fn tenant_detach_handler(
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn tenant_reset_handler(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
|
||||
let drop_cache: Option<bool> = parse_query_param(&request, "drop_cache")?;
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
|
||||
let state = get_state(&request);
|
||||
state
|
||||
.tenant_manager
|
||||
.reset_tenant(tenant_shard_id, drop_cache.unwrap_or(false), ctx)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn tenant_load_handler(
|
||||
mut request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
@@ -1828,6 +1848,9 @@ pub fn make_router(
|
||||
.post("/v1/tenant/:tenant_id/detach", |r| {
|
||||
api_handler(r, tenant_detach_handler)
|
||||
})
|
||||
.post("/v1/tenant/:tenant_shard_id/reset", |r| {
|
||||
api_handler(r, tenant_reset_handler)
|
||||
})
|
||||
.post("/v1/tenant/:tenant_id/load", |r| {
|
||||
api_handler(r, tenant_load_handler)
|
||||
})
|
||||
|
||||
@@ -1038,6 +1038,81 @@ impl TenantManager {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Resetting a tenant is equivalent to detaching it, then attaching it again with the same
|
||||
/// LocationConf that was last used to attach it. Optionally, the local file cache may be
|
||||
/// dropped before re-attaching.
|
||||
///
|
||||
/// This is not part of a tenant's normal lifecycle: it is used for debug/support, in situations
|
||||
/// where an issue is identified that would go away with a restart of the tenant.
|
||||
///
|
||||
/// This does not have any special "force" shutdown of a tenant: it relies on the tenant's tasks
|
||||
/// to respect the cancellation tokens used in normal shutdown().
|
||||
#[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %drop_cache))]
|
||||
pub(crate) async fn reset_tenant(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
drop_cache: bool,
|
||||
ctx: RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
|
||||
let Some(old_slot) = slot_guard.get_old_value() else {
|
||||
anyhow::bail!("Tenant not found when trying to reset");
|
||||
};
|
||||
|
||||
let Some(tenant) = old_slot.get_attached() else {
|
||||
slot_guard.revert();
|
||||
anyhow::bail!("Tenant is not in attached state");
|
||||
};
|
||||
|
||||
let (_guard, progress) = utils::completion::channel();
|
||||
match tenant.shutdown(progress, false).await {
|
||||
Ok(()) => {
|
||||
slot_guard.drop_old_value()?;
|
||||
}
|
||||
Err(_barrier) => {
|
||||
slot_guard.revert();
|
||||
anyhow::bail!("Cannot reset Tenant, already shutting down");
|
||||
}
|
||||
}
|
||||
|
||||
let tenant_path = self.conf.tenant_path(&tenant_shard_id);
|
||||
let timelines_path = self.conf.timelines_path(&tenant_shard_id);
|
||||
let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?;
|
||||
|
||||
if drop_cache {
|
||||
tracing::info!("Dropping local file cache");
|
||||
|
||||
match tokio::fs::read_dir(&timelines_path).await {
|
||||
Err(e) => {
|
||||
tracing::warn!("Failed to list timelines while dropping cache: {}", e);
|
||||
}
|
||||
Ok(mut entries) => {
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
tokio::fs::remove_dir_all(entry.path()).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let shard_identity = config.shard;
|
||||
let tenant = tenant_spawn(
|
||||
self.conf,
|
||||
tenant_shard_id,
|
||||
&tenant_path,
|
||||
self.resources.clone(),
|
||||
AttachedTenantConf::try_from(config)?,
|
||||
shard_identity,
|
||||
None,
|
||||
self.tenants,
|
||||
SpawnMode::Normal,
|
||||
&ctx,
|
||||
)?;
|
||||
|
||||
slot_guard.upsert(TenantSlot::Attached(tenant))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
@@ -1246,8 +1321,7 @@ pub(crate) async fn delete_tenant(
|
||||
// See https://github.com/neondatabase/neon/issues/5080
|
||||
|
||||
// TODO(sharding): make delete API sharding-aware
|
||||
let mut slot_guard =
|
||||
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
|
||||
let slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
|
||||
|
||||
// unwrap is safe because we used MustExist mode when acquiring
|
||||
let tenant = match slot_guard.get_old_value().as_ref().unwrap() {
|
||||
@@ -1574,9 +1648,10 @@ pub enum TenantSlotUpsertError {
|
||||
MapState(#[from] TenantMapError),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
enum TenantSlotDropError {
|
||||
/// It is only legal to drop a TenantSlot if its contents are fully shut down
|
||||
#[error("Tenant was not shut down")]
|
||||
NotShutdown,
|
||||
}
|
||||
|
||||
@@ -1636,9 +1711,9 @@ impl SlotGuard {
|
||||
}
|
||||
}
|
||||
|
||||
/// Take any value that was present in the slot before we acquired ownership
|
||||
/// Get any value that was present in the slot before we acquired ownership
|
||||
/// of it: in state transitions, this will be the old state.
|
||||
fn get_old_value(&mut self) -> &Option<TenantSlot> {
|
||||
fn get_old_value(&self) -> &Option<TenantSlot> {
|
||||
&self.old_value
|
||||
}
|
||||
|
||||
|
||||
@@ -260,6 +260,14 @@ class PageserverHttpClient(requests.Session):
|
||||
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/detach", params=params)
|
||||
self.verbose_error(res)
|
||||
|
||||
def tenant_reset(self, tenant_id: TenantId, drop_cache: bool):
|
||||
params = {}
|
||||
if drop_cache:
|
||||
params["drop_cache"] = "true"
|
||||
|
||||
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/reset", params=params)
|
||||
self.verbose_error(res)
|
||||
|
||||
def tenant_delete(self, tenant_id: TenantId):
|
||||
res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
|
||||
self.verbose_error(res)
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import asyncio
|
||||
import enum
|
||||
import random
|
||||
import time
|
||||
from threading import Thread
|
||||
@@ -51,11 +52,20 @@ def do_gc_target(
|
||||
log.info("gc http thread returning")
|
||||
|
||||
|
||||
class ReattachMode(str, enum.Enum):
|
||||
REATTACH_EXPLICIT = "explicit"
|
||||
REATTACH_RESET = "reset"
|
||||
REATTACH_RESET_DROP = "reset"
|
||||
|
||||
|
||||
# Basic detach and re-attach test
|
||||
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
|
||||
@pytest.mark.parametrize(
|
||||
"mode",
|
||||
[ReattachMode.REATTACH_EXPLICIT, ReattachMode.REATTACH_RESET, ReattachMode.REATTACH_RESET_DROP],
|
||||
)
|
||||
def test_tenant_reattach(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
remote_storage_kind: RemoteStorageKind,
|
||||
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind, mode: str
|
||||
):
|
||||
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
|
||||
|
||||
@@ -100,8 +110,15 @@ def test_tenant_reattach(
|
||||
ps_metrics.query_one("pageserver_last_record_lsn", filter=tenant_metric_filter).value
|
||||
)
|
||||
|
||||
pageserver_http.tenant_detach(tenant_id)
|
||||
pageserver_http.tenant_attach(tenant_id)
|
||||
if mode == ReattachMode.REATTACH_EXPLICIT:
|
||||
# Explicitly detach then attach the tenant as two separate API calls
|
||||
pageserver_http.tenant_detach(tenant_id)
|
||||
pageserver_http.tenant_attach(tenant_id)
|
||||
elif mode in (ReattachMode.REATTACH_RESET, ReattachMode.REATTACH_RESET_DROP):
|
||||
# Use the reset API to detach/attach in one shot
|
||||
pageserver_http.tenant_reset(tenant_id, mode == ReattachMode.REATTACH_RESET_DROP)
|
||||
else:
|
||||
raise NotImplementedError(mode)
|
||||
|
||||
time.sleep(1) # for metrics propagation
|
||||
|
||||
|
||||
Reference in New Issue
Block a user