Compare commits

...

1 Commits

Author SHA1 Message Date
Dmitry Rodionov
4c1cb890db try to toggle background activity without a race condition 2022-10-20 22:30:42 +03:00
3 changed files with 109 additions and 14 deletions

View File

@@ -201,3 +201,13 @@ pub struct FailpointConfig {
/// Request parameters for a timeline GC call (per the type name).
pub struct TimelineGcRequest {
/// Optional GC horizon.
// NOTE(review): the units of this value and the fallback used when it is `None`
// are not visible from this definition -- confirm against the handler.
pub gc_horizon: Option<u64>,
}
/// Request body asking to switch a tenant's background jobs on or off.
#[derive(Debug, Serialize, Deserialize)]
pub struct TenantSetBackgroundActivityRequest {
/// Desired state: `true` to have background jobs running, `false` to have them stopped.
// NOTE(review): the field name is misspelled ("backround"). Since the struct is
// (de)serialized, the name is presumably part of the wire format, so a rename
// would need a serde alias/rename to stay compatible with existing clients.
pub run_backround_jobs: bool,
}
/// Response body for a request to switch a tenant's background jobs on or off.
#[derive(Debug, Serialize, Deserialize)]
pub struct TenantSetBackgroundActivityResponse {
/// Free-form, human-readable description of the outcome.
pub msg: String,
}

View File

@@ -3,6 +3,9 @@ use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use pageserver_api::models::{
TenantSetBackgroundActivityRequest, TenantSetBackgroundActivityResponse,
};
use remote_storage::GenericRemoteStorage;
use tokio::task::JoinError;
use tracing::*;
@@ -13,11 +16,12 @@ use super::models::{
TimelineCreateRequest,
};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::storage_sync;
use crate::storage_sync::index::{RemoteIndex, RemoteTimeline};
use crate::task_mgr::TaskKind;
use crate::tenant::{TenantState, Timeline};
use crate::tenant_config::TenantConfOpt;
use crate::{config::PageServerConf, tenant_mgr};
use crate::{storage_sync, task_mgr};
use utils::{
auth::JwtAuth,
http::{
@@ -570,6 +574,63 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
)
}
/// HTTP handler that switches background jobs of an active tenant on or off.
///
/// Steps:
/// 1. Parse `tenant_id` from the request path and check the caller's permission.
/// 2. Decode the JSON body into a [`TenantSetBackgroundActivityRequest`].
/// 3. Update the tenant state through `Tenant::set_state_with`; a new state is
///    proposed only when the tenant is `Active` and its `background_jobs_running`
///    flag differs from the requested value.
/// 4. If nothing was modified, answer `304 Not Modified` with an empty message.
/// 5. If the jobs were switched off, await `task_mgr::shutdown_tasks` for the
///    compaction and GC task kinds of this tenant before answering `200 OK`.
///
/// # Errors
///
/// Propagates failures from parameter parsing, the permission check, body
/// decoding and response building; an unknown tenant maps to `ApiError::NotFound`.
async fn tenant_set_background_activity(
    mut request: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
    check_permission(&request, Some(tenant_id))?;

    let request: TenantSetBackgroundActivityRequest = json_request(&mut request).await?;
    let run_jobs = request.run_backround_jobs;

    let tenant = tenant_mgr::get_tenant(tenant_id, false).map_err(ApiError::NotFound)?;

    // Propose a new state only for an active tenant whose flag actually differs
    // from the requested one; every other combination leaves the state alone.
    let modified = tenant.set_state_with(|current| match current {
        TenantState::Active {
            background_jobs_running,
        } if *background_jobs_running != run_jobs => Some(TenantState::Active {
            background_jobs_running: run_jobs,
        }),
        _ => None,
    });

    if !modified {
        // NOTE(review): this branch is also taken when the tenant is not `Active`
        // at all, so the caller cannot tell "already in that state" apart from
        // "tenant is not active". Also, a 304 response is not supposed to carry
        // a body, so this (empty) message may never reach the client.
        return Ok(json_response(
            StatusCode::NOT_MODIFIED,
            TenantSetBackgroundActivityResponse { msg: String::new() },
        )?);
    }

    // From here on the state is known to have been changed by this request.
    // XXX: can the state be changed a second time by someone else before we get
    // here, making the `modified` result outdated?
    if !run_jobs {
        // The background tasks would eventually shut down on their own after the
        // state change, but we need a guarantee that they have stopped, so wait
        // for them explicitly.
        task_mgr::shutdown_tasks(Some(TaskKind::Compaction), Some(tenant_id), None).await;
        task_mgr::shutdown_tasks(Some(TaskKind::GarbageCollector), Some(tenant_id), None).await;
    }

    let response = TenantSetBackgroundActivityResponse {
        msg: format!("run background jobs set to {}", run_jobs),
    };
    Ok(json_response(StatusCode::OK, response)?)
}
// Helper function to standardize the error messages we produce on bad durations
//
// Intended to be used with anyhow's `with_context`, e.g.:
@@ -904,6 +965,10 @@ pub fn make_router(
.post("/v1/tenant/:tenant_id/timeline", timeline_create_handler)
.post("/v1/tenant/:tenant_id/attach", tenant_attach_handler)
.post("/v1/tenant/:tenant_id/detach", tenant_detach_handler)
.post(
"/v1/tenant/:tenant_id/set_background_activity",
tenant_set_background_activity,
)
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id",
timeline_detail_handler,

View File

@@ -655,22 +655,42 @@ impl Tenant {
}
pub fn set_state(&self, new_state: TenantState) {
match (self.current_state(), new_state) {
(equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
debug!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
}
(TenantState::Broken, _) => {
error!("Ignoring state update {new_state:?} for broken tenant");
}
(_, new_state) => {
self.state.send_replace(new_state);
if self.should_run_tasks() {
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
crate::tenant_tasks::start_background_loops(self.tenant_id);
self.set_state_with(|_| Some(new_state));
}
/// Updates the tenant state using a caller-supplied decision function.
///
/// `f` receives the current state and returns `Some(new_state)` to request a
/// transition, or `None` to leave the state alone. The whole decision runs
/// inside `send_if_modified` on the state channel, so `f` observes the very
/// value that is about to be replaced.
///
/// A requested transition is ignored (and logged) when:
/// * the new state equals the current one, or
/// * the tenant is currently `Broken`.
///
/// Returns `true` if the state was replaced, `false` otherwise.
///
/// After a successful change, the background loops are started if
/// `should_run_tasks()` reports that they should run.
pub fn set_state_with<F>(&self, f: F) -> bool
where
    F: FnOnce(&mut TenantState) -> Option<TenantState>,
{
    let modified = self.state.send_if_modified(|current: &mut TenantState| {
        let new_state = match f(current) {
            Some(requested) => requested,
            None => return false,
        };

        if *current == new_state {
            debug!("Ignoring new state, equal to the existing one: {new_state:?}");
            return false;
        }

        if matches!(*current, TenantState::Broken) {
            error!("Ignoring state update {new_state:?} for broken tenant");
            return false;
        }

        *current = new_state;
        true
    });

    // NOTE(review): `should_run_tasks()` is evaluated after the update has been
    // published; if it re-reads the shared state, a concurrent change could be
    // observed here instead of the value we just set -- confirm.
    if modified && self.should_run_tasks() {
        // Spawn gc and compaction loops. The loops will shut themselves
        // down when they notice that the tenant is inactive.
        crate::tenant_tasks::start_background_loops(self.tenant_id);
    }

    modified
}
pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TenantState> {