diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index a153f1a01e..18b212a87c 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -201,3 +201,18 @@ pub struct FailpointConfig {
 pub struct TimelineGcRequest {
     pub gc_horizon: Option,
 }
+
+/// Request body for `POST /v1/tenant/:tenant_id/set_background_activity`.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct TenantSetBackgroundActivityRequest {
+    // NOTE(review): "backround" is misspelled, but the field name is also the JSON key,
+    // so renaming it is a wire-format change that must be coordinated with clients.
+    /// Whether the tenant's background jobs should be running.
+    pub run_backround_jobs: bool,
+}
+
+/// Response body of the same endpoint; `msg` is a human-readable summary.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct TenantSetBackgroundActivityResponse {
+    pub msg: String,
+}
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 489adbb2cf..416b8a0bdf 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -3,6 +3,9 @@ use std::sync::Arc;
 use anyhow::{anyhow, Context, Result};
 use hyper::StatusCode;
 use hyper::{Body, Request, Response, Uri};
+use pageserver_api::models::{
+    TenantSetBackgroundActivityRequest, TenantSetBackgroundActivityResponse,
+};
 use remote_storage::GenericRemoteStorage;
 use tokio::task::JoinError;
 use tracing::*;
@@ -13,11 +16,12 @@ use super::models::{
     TimelineCreateRequest,
 };
 use crate::pgdatadir_mapping::LsnForTimestamp;
-use crate::storage_sync;
 use crate::storage_sync::index::{RemoteIndex, RemoteTimeline};
+use crate::task_mgr::TaskKind;
 use crate::tenant::{TenantState, Timeline};
 use crate::tenant_config::TenantConfOpt;
 use crate::{config::PageServerConf, tenant_mgr};
+use crate::{storage_sync, task_mgr};
 use utils::{
     auth::JwtAuth,
     http::{
@@ -570,6 +574,60 @@ async fn tenant_status(request: Request) -> Result, ApiErro
     )
 }
 
+/// Enables or disables the background jobs (compaction and GC) of an active tenant.
+///
+/// Replies `304 Not Modified` when the tenant is not `Active` or its flag already has
+/// the requested value. When the jobs are being disabled, waits for the tenant's
+/// compaction and GC tasks to shut down before replying `200 OK`.
+async fn tenant_set_background_activity(
+    mut request: Request<Body>,
+) -> Result<Response<Body>, ApiError> {
+    let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
+    check_permission(&request, Some(tenant_id))?;
+
+    let request: TenantSetBackgroundActivityRequest = json_request(&mut request).await?;
+    let run_background_jobs = request.run_backround_jobs;
+
+    let tenant = tenant_mgr::get_tenant(tenant_id, false).map_err(ApiError::NotFound)?;
+
+    // Only an `Active` tenant whose flag differs from the request gets a new state;
+    // every other case leaves the state untouched and is reported as "not modified".
+    let modified = tenant.set_state_with(|old_state| match old_state {
+        TenantState::Active {
+            background_jobs_running,
+        } if *background_jobs_running != run_background_jobs => Some(TenantState::Active {
+            background_jobs_running: run_background_jobs,
+        }),
+        _ => None,
+    });
+
+    if !modified {
+        // NOTE(review): per RFC 9110 a 304 response cannot carry content, so this JSON
+        // body may never reach the client.
+        return Ok(json_response(
+            StatusCode::NOT_MODIFIED,
+            TenantSetBackgroundActivityResponse { msg: "".to_owned() },
+        )?);
+    }
+
+    // The state was modified. If the jobs were switched off they will eventually shut
+    // down on their own, but the caller needs a guarantee that they have stopped, so
+    // wait for the shutdown explicitly.
+    // XXX: nothing visible here prevents a concurrent request from changing the state
+    // again between the update above and this wait, leaving `modified` stale.
+    if !run_background_jobs {
+        task_mgr::shutdown_tasks(Some(TaskKind::Compaction), Some(tenant_id), None).await;
+        task_mgr::shutdown_tasks(Some(TaskKind::GarbageCollector), Some(tenant_id), None).await;
+    }
+
+    Ok(json_response(
+        StatusCode::OK,
+        TenantSetBackgroundActivityResponse {
+            msg: format!("run background jobs set to {run_background_jobs}"),
+        },
+    )?)
+}
+
 // Helper function to standardize the error messages we produce on bad durations
 //
 // Intended to be used with anyhow's `with_context`, e.g.:
@@ -904,6 +962,10 @@ pub fn make_router(
         .post("/v1/tenant/:tenant_id/timeline", timeline_create_handler)
         .post("/v1/tenant/:tenant_id/attach", tenant_attach_handler)
         .post("/v1/tenant/:tenant_id/detach", tenant_detach_handler)
+        .post(
+            "/v1/tenant/:tenant_id/set_background_activity",
+            tenant_set_background_activity,
+        )
         .get(
             "/v1/tenant/:tenant_id/timeline/:timeline_id",
             timeline_detail_handler,
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 93c473f0fe..1d34a170de 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -655,22 +655,52 @@ impl Tenant {
     }
 
     pub fn set_state(&self, new_state: TenantState) {
-        match (self.current_state(), new_state) {
-            (equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
-                debug!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
-            }
-            (TenantState::Broken, _) => {
-                error!("Ignoring state update {new_state:?} for broken tenant");
-            }
-            (_, new_state) => {
-                self.state.send_replace(new_state);
-                if self.should_run_tasks() {
-                    // Spawn gc and compaction loops. The loops will shut themselves
-                    // down when they notice that the tenant is inactive.
-                    crate::tenant_tasks::start_background_loops(self.tenant_id);
+        self.set_state_with(|_| Some(new_state));
+    }
+
+    /// Updates the tenant state with the value that `f` computes from the current state.
+    ///
+    /// `f` returns `None` to leave the state untouched. A proposed state is also ignored
+    /// when it equals the current one or when the tenant is `Broken`.
+    ///
+    /// Returns `true` if the state was changed; in that case the background loops are
+    /// also started when `should_run_tasks()` holds.
+    ///
+    /// NOTE(review): `f` receives `&mut` access, but a mutation made through that
+    /// reference while returning `None` is still reported as "not modified".
+    pub fn set_state_with<F>(&self, f: F) -> bool
+    where
+        F: FnOnce(&mut TenantState) -> Option<TenantState>,
+    {
+        let modify = |old_state: &mut TenantState| {
+            let new_state = match f(old_state) {
+                None => return false,
+                Some(new_state) => new_state,
+            };
+
+            match (old_state, new_state) {
+                (equal_state_1, equal_state_2) if equal_state_1 == &equal_state_2 => {
+                    debug!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
+                    false
+                }
+                (TenantState::Broken, _) => {
+                    error!("Ignoring state update {new_state:?} for broken tenant");
+                    false
+                }
+                (old_state, new_state) => {
+                    *old_state = new_state;
+                    true
                 }
             }
+        };
+
+        let modified = self.state.send_if_modified(modify);
+        if modified && self.should_run_tasks() {
+            // Spawn gc and compaction loops. The loops will shut themselves
+            // down when they notice that the tenant is inactive.
+            crate::tenant_tasks::start_background_loops(self.tenant_id);
         }
+        modified
     }
 
     pub fn subscribe_for_state_updates(&self) -> watch::Receiver {