diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index c43416d0a1..6854cdc90b 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -889,6 +889,20 @@ async fn update_tenant_config_handler( json_response(StatusCode::OK, ()) } +/// Testing helper to transition a tenant to [`crate::tenant::TenantState::Broken`]. +#[cfg(feature = "testing")] +async fn handle_tenant_break(r: Request) -> Result, ApiError> { + let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; + + let tenant = crate::tenant::mgr::get_tenant(tenant_id, true) + .await + .map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?; + + tenant.set_broken("broken from test"); + + json_response(StatusCode::OK, ()) +} + #[cfg(feature = "testing")] async fn failpoints_handler(mut request: Request) -> Result, ApiError> { if !fail::has_failpoints() { @@ -1038,6 +1052,86 @@ async fn always_panic_handler(req: Request) -> Result, ApiE json_response(StatusCode::NO_CONTENT, ()) } +async fn disk_usage_eviction_run(mut r: Request) -> Result, ApiError> { + check_permission(&r, None)?; + + #[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)] + struct Config { + /// How many bytes to evict before reporting that pressure is relieved. + evict_bytes: u64, + } + + #[derive(Debug, Clone, Copy, serde::Serialize)] + struct Usage { + // remains unchanged after instantiation of the struct + config: Config, + // updated by `add_available_bytes` + freed_bytes: u64, + } + + impl crate::disk_usage_eviction_task::Usage for Usage { + fn has_pressure(&self) -> bool { + self.config.evict_bytes > self.freed_bytes + } + + fn add_available_bytes(&mut self, bytes: u64) { + self.freed_bytes += bytes; + } + } + + let config = json_request::(&mut r) + .await + .map_err(|_| ApiError::BadRequest(anyhow::anyhow!("invalid JSON body")))?; + + let usage = Usage { + config, + freed_bytes: 0, + }; + + use crate::task_mgr::MGMT_REQUEST_RUNTIME; + + let (tx, rx) = tokio::sync::oneshot::channel(); + + let state = get_state(&r); + + let Some(storage) = state.remote_storage.clone() else { + return Err(ApiError::InternalServerError(anyhow::anyhow!( + "remote storage not configured, cannot run eviction iteration" + ))) + }; + + let cancel = CancellationToken::new(); + let child_cancel = cancel.clone(); + let _g = cancel.drop_guard(); + + crate::task_mgr::spawn( + MGMT_REQUEST_RUNTIME.handle(), + TaskKind::DiskUsageEviction, + None, + None, + "ondemand disk usage eviction", + false, + async move { + let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl( + &storage, + usage, + &child_cancel, + ) + .await; + + info!(?res, "disk_usage_eviction_task_iteration_impl finished"); + + let _ = tx.send(res); + Ok(()) + } + .in_current_span(), + ); + + let response = rx.await.unwrap().map_err(ApiError::InternalServerError)?; + + json_response(StatusCode::OK, response) +} + async fn handler_404(_: Request) -> Result, ApiError> { json_response( StatusCode::NOT_FOUND, @@ -1185,97 +1279,3 @@ pub fn make_router( .get("/v1/panic", |r| RequestSpan(always_panic_handler).handle(r)) .any(handler_404)) } - -/// Testing helper to transition a tenant to [`crate::tenant::TenantState::Broken`]. -#[cfg(feature = "testing")] -async fn handle_tenant_break(r: Request) -> Result, ApiError> { - let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; - - let tenant = crate::tenant::mgr::get_tenant(tenant_id, true) - .await - .map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?; - - tenant.set_broken("broken from test"); - - json_response(StatusCode::OK, ()) -} - -async fn disk_usage_eviction_run(mut r: Request) -> Result, ApiError> { - check_permission(&r, None)?; - - #[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)] - struct Config { - /// How many bytes to evict before reporting that pressure is relieved. - evict_bytes: u64, - } - - #[derive(Debug, Clone, Copy, serde::Serialize)] - struct Usage { - // remains unchanged after instantiation of the struct - config: Config, - // updated by `add_available_bytes` - freed_bytes: u64, - } - - impl crate::disk_usage_eviction_task::Usage for Usage { - fn has_pressure(&self) -> bool { - self.config.evict_bytes > self.freed_bytes - } - - fn add_available_bytes(&mut self, bytes: u64) { - self.freed_bytes += bytes; - } - } - - let config = json_request::(&mut r) - .await - .map_err(|_| ApiError::BadRequest(anyhow::anyhow!("invalid JSON body")))?; - - let usage = Usage { - config, - freed_bytes: 0, - }; - - use crate::task_mgr::MGMT_REQUEST_RUNTIME; - - let (tx, rx) = tokio::sync::oneshot::channel(); - - let state = get_state(&r); - - let Some(storage) = state.remote_storage.clone() else { - return Err(ApiError::InternalServerError(anyhow::anyhow!( - "remote storage not configured, cannot run eviction iteration" - ))) - }; - - let cancel = CancellationToken::new(); - let child_cancel = cancel.clone(); - let _g = cancel.drop_guard(); - - crate::task_mgr::spawn( - MGMT_REQUEST_RUNTIME.handle(), - TaskKind::DiskUsageEviction, - None, - None, - "ondemand disk usage eviction", - false, - async move { - let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl( - &storage, - usage, - &child_cancel, - ) - .await; - - info!(?res, "disk_usage_eviction_task_iteration_impl finished"); - - let _ = tx.send(res); - Ok(()) - } - .in_current_span(), - ); - - let response = rx.await.unwrap().map_err(ApiError::InternalServerError)?; - - json_response(StatusCode::OK, response) -}