Remove background_jobs_enabled, move code from tenant_mgr.rs to tenant.rs

This commit is contained in:
Heikki Linnakangas
2022-11-24 00:26:06 +02:00
committed by Christian Schwarz
parent 0d533ce840
commit 78338f7b94
8 changed files with 191 additions and 220 deletions

View File

@@ -19,10 +19,8 @@ pub enum TenantState {
Loading,
// This tenant is being downloaded from cloud storage.
Attaching,
/// Tenant is fully operational, its background jobs might be running or not.
Active {
background_jobs_running: bool,
},
/// Tenant is fully operational
Active,
/// A tenant is recognized by pageserver, but it is being detached or the system is being
/// shut down.
Paused,
@@ -36,9 +34,7 @@ impl TenantState {
match self {
Self::Loading => true,
Self::Attaching => true,
Self::Active {
background_jobs_running: _,
} => false,
Self::Active => false,
Self::Paused => false,
Self::Broken => false,
}

View File

@@ -604,13 +604,7 @@ components:
id:
type: string
state:
oneOf:
- type: string
- type: object
properties:
background_jobs_running:
type: boolean
type: string
current_physical_size:
type: integer
has_in_progress_downloads:

View File

@@ -648,7 +648,7 @@ impl Tenant {
// We're ready for business.
// Changing to the active state spawns the background loops under the hood.
// The loops will shut themselves down when they notice that the tenant is inactive.
self.activate(true);
self.activate();
info!("Done");
@@ -703,6 +703,45 @@ impl Tenant {
.await
}
/// Sets up a brand-new tenant on local disk and returns its in-memory [`Tenant`] object.
///
/// The WAL redo manager is constructed first, then `create_tenant_files` lays out the
/// tenant directory. If that step fails, the error is propagated and no `Tenant` is built.
///
/// # Errors
///
/// Returns whatever error `create_tenant_files` reports.
pub fn create_tenant(
    conf: &'static PageServerConf,
    tenant_conf: TenantConfOpt,
    tenant_id: TenantId,
    remote_storage: Option<GenericRemoteStorage>,
) -> anyhow::Result<Arc<Tenant>> {
    let redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));

    create_tenant_files(conf, tenant_conf, tenant_id)?;

    // The tenant starts out in Active state so that a create_timeline request can be
    // issued against it right away.
    // NOTE(review): the state is handed straight to `Tenant::new` rather than going
    // through `activate()` -- confirm that background loops still get started for a
    // freshly created tenant.
    Ok(Arc::new(Tenant::new(
        TenantState::Active,
        conf,
        tenant_conf,
        redo_manager,
        tenant_id,
        remote_storage,
    )))
}
/// Create a placeholder [`Tenant`] object for a broken tenant.
///
/// The returned tenant is constructed in [`TenantState::Broken`], with
/// `TenantConfOpt::default()` as its config and `None` for remote storage.
pub fn create_broken_tenant(
    conf: &'static PageServerConf,
    tenant_id: TenantId,
) -> Arc<Tenant> {
    let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
    Arc::new(Tenant::new(
        TenantState::Broken,
        conf,
        TenantConfOpt::default(),
        wal_redo_manager,
        tenant_id,
        None,
    ))
}
///
/// Load a tenant that's available on local disk
///
@@ -884,9 +923,6 @@ impl Tenant {
// FIXME original collect_timeline_files contained one more check:
// 1. "Timeline has no ancestor and no layer files"
// XXX get rid of enable_background_jobs
let enable_background_jobs = !sorted_timelines.is_empty();
for (timeline_id, local_metadata) in sorted_timelines {
// FIXME should we fail load of whole tenant if one timeline failed?
// consider branch hierarchy. Maybe set one to broken and others to Paused or something
@@ -897,7 +933,7 @@ impl Tenant {
// We're ready for business.
// Changing to the active state spawns the background loops under the hood.
// The loops will shut themselves down when they notice that the tenant is inactive.
self.activate(enable_background_jobs);
self.activate();
info!("Done");
@@ -1095,9 +1131,6 @@ impl Tenant {
None => self.bootstrap_timeline(new_timeline_id, pg_version).await?,
};
// Have added new timeline into the tenant, now its background tasks are needed.
self.activate(true);
Ok(Some(loaded_timeline))
}
@@ -1237,24 +1270,13 @@ impl Tenant {
}
pub fn is_active(&self) -> bool {
matches!(self.current_state(), TenantState::Active { .. })
}
pub fn should_run_tasks(&self) -> bool {
matches!(
self.current_state(),
TenantState::Active {
background_jobs_running: true
}
)
self.current_state() == TenantState::Active
}
/// Changes tenant status to active, if it was not broken before.
/// Otherwise, ignores the state change, logging an error.
pub fn activate(&self, enable_background_jobs: bool) {
self.set_state(TenantState::Active {
background_jobs_running: enable_background_jobs,
});
fn activate(&self) {
self.set_state(TenantState::Active);
}
pub fn set_state(&self, new_state: TenantState) {
@@ -1273,14 +1295,10 @@ impl Tenant {
.values()
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
match new_state {
TenantState::Active {
background_jobs_running,
} => {
if background_jobs_running {
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
crate::tenant_tasks::start_background_loops(self.tenant_id);
}
TenantState::Active => {
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
crate::tenant_tasks::start_background_loops(self.tenant_id);
for timeline in not_broken_timelines {
timeline.set_state(TimelineState::Active);
@@ -1474,7 +1492,7 @@ impl Tenant {
))
}
pub(super) fn new(
fn new(
state: TenantState,
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
@@ -2159,6 +2177,132 @@ fn remove_timeline_and_uninit_mark(timeline_dir: &Path, uninit_mark: &Path) -> a
Ok(())
}
/// Creates the on-disk directory structure for a new tenant.
///
/// The structure is first built in a temporary directory whose path is derived from the
/// tenant path via `path_with_suffix_extension(.., TEMP_FILE_SUFFIX)`, and is then handed
/// to `try_create_target_tenant_dir` to be populated and moved into place. If that step
/// fails, the temporary directory is removed on a best-effort basis (cleanup failures are
/// only logged) and the original error is returned.
///
/// # Errors
///
/// Fails if the tenant directory already exists, if the temporary directory cannot be
/// created, or if `try_create_target_tenant_dir` fails.
fn create_tenant_files(
    conf: &'static PageServerConf,
    tenant_conf: TenantConfOpt,
    tenant_id: TenantId,
) -> anyhow::Result<()> {
    let tenant_dir = conf.tenant_path(&tenant_id);
    if tenant_dir.exists() {
        anyhow::bail!("cannot create new tenant repo: '{tenant_id}' directory already exists");
    }

    let temporary_tenant_dir = path_with_suffix_extension(&tenant_dir, TEMP_FILE_SUFFIX);
    debug!(
        "Creating temporary directory structure in {}",
        temporary_tenant_dir.display()
    );

    // top-level dir may exist if we are creating it through CLI
    crashsafe::create_dir_all(&temporary_tenant_dir).with_context(|| {
        format!(
            "could not create temporary tenant directory {}",
            temporary_tenant_dir.display()
        )
    })?;

    let outcome = try_create_target_tenant_dir(
        conf,
        tenant_conf,
        tenant_id,
        &temporary_tenant_dir,
        &tenant_dir,
    );

    if outcome.is_err() {
        error!("Failed to create directory structure for tenant {tenant_id}, cleaning tmp data");
        match fs::remove_dir_all(&temporary_tenant_dir) {
            Err(e) => {
                error!("Failed to remove temporary tenant directory {temporary_tenant_dir:?}: {e}")
            }
            Ok(()) => {
                // NOTE(review): this fsyncs the path that was just removed. If
                // `crashsafe::fsync` opens its argument, this always fails and logs an
                // error -- confirm whether the parent directory was intended.
                if let Err(e) = crashsafe::fsync(&temporary_tenant_dir) {
                    error!(
                        "Failed to fsync removed temporary tenant directory {temporary_tenant_dir:?}: {e}"
                    )
                }
            }
        }
    }

    outcome
}
fn try_create_target_tenant_dir(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
tenant_id: TenantId,
temporary_tenant_dir: &Path,
target_tenant_directory: &Path,
) -> Result<(), anyhow::Error> {
let temporary_tenant_timelines_dir = rebase_directory(
&conf.timelines_path(&tenant_id),
target_tenant_directory,
temporary_tenant_dir,
)
.with_context(|| format!("Failed to resolve tenant {tenant_id} temporary timelines dir"))?;
let temporary_tenant_config_path = rebase_directory(
&conf.tenant_config_path(tenant_id),
target_tenant_directory,
temporary_tenant_dir,
)
.with_context(|| format!("Failed to resolve tenant {tenant_id} temporary config path"))?;
Tenant::persist_tenant_config(&temporary_tenant_config_path, tenant_conf, true).with_context(
|| {
format!(
"Failed to write tenant {} config to {}",
tenant_id,
temporary_tenant_config_path.display()
)
},
)?;
crashsafe::create_dir(&temporary_tenant_timelines_dir).with_context(|| {
format!(
"could not create tenant {} temporary timelines directory {}",
tenant_id,
temporary_tenant_timelines_dir.display()
)
})?;
fail::fail_point!("tenant-creation-before-tmp-rename", |_| {
anyhow::bail!("failpoint tenant-creation-before-tmp-rename");
});
fs::rename(&temporary_tenant_dir, target_tenant_directory).with_context(|| {
format!(
"failed to move tenant {} temporary directory {} into the permanent one {}",
tenant_id,
temporary_tenant_dir.display(),
target_tenant_directory.display()
)
})?;
let target_dir_parent = target_tenant_directory.parent().with_context(|| {
format!(
"Failed to get tenant {} dir parent for {}",
tenant_id,
target_tenant_directory.display()
)
})?;
crashsafe::fsync(target_dir_parent).with_context(|| {
format!(
"Failed to fsync renamed directory's parent {} for tenant {}",
target_dir_parent.display(),
tenant_id,
)
})?;
Ok(())
}
/// Re-roots `original_path` from `base` onto `new_base`.
///
/// The `base` prefix is stripped from `original_path` and the remainder is joined onto
/// `new_base`.
///
/// # Errors
///
/// Returns an error if `base` cannot be stripped from `original_path`.
fn rebase_directory(original_path: &Path, base: &Path, new_base: &Path) -> anyhow::Result<PathBuf> {
    original_path
        .strip_prefix(base)
        .map(|suffix| new_base.join(suffix))
        .with_context(|| {
            format!(
                "Failed to strip base prefix '{}' off path '{}'",
                base.display(),
                original_path.display()
            )
        })
}
/// Create the cluster temporarily in 'initdbpath' directory inside the repository
/// to get bootstrap data for timeline initialization.
fn run_initdb(
@@ -2360,9 +2504,7 @@ pub mod harness {
let walredo_mgr = Arc::new(TestRedoManager);
let tenant = Arc::new(Tenant::new(
TenantState::Active {
background_jobs_running: false,
},
TenantState::Active,
self.conf,
TenantConfOpt::from(self.tenant_conf),
walredo_mgr,

View File

@@ -4,7 +4,7 @@
use std::collections::hash_map;
use std::ffi::OsStr;
use std::fs;
use std::path::{Path, PathBuf};
use std::path::Path;
use std::sync::Arc;
use anyhow::Context;
@@ -16,10 +16,7 @@ use crate::config::PageServerConf;
use crate::task_mgr::{self, TaskKind};
use crate::tenant::{Tenant, TenantState};
use crate::tenant_config::TenantConfOpt;
use crate::walredo::PostgresRedoManager;
use crate::TEMP_FILE_SUFFIX;
use utils::crashsafe::{self, path_with_suffix_extension};
use utils::fs_ext::PathExt;
use utils::id::{TenantId, TimelineId};
@@ -150,19 +147,7 @@ fn load_local_tenant(
Tenant::spawn_attach(conf, tenant_id, &remote_storage)?
} else {
warn!("tenant {tenant_id} has attaching mark file, but pageserver has no remote storage configured");
// XXX there should be a constructor to make a broken tenant
// TODO should we use Tenant::load_tenant_config() here?
let tenant_conf = TenantConfOpt::default();
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
let tenant = Tenant::new(
TenantState::Broken,
conf,
tenant_conf,
wal_redo_manager,
tenant_id,
remote_storage,
);
Arc::new(tenant)
Tenant::create_broken_tenant(conf, tenant_id)
}
} else {
info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
@@ -212,131 +197,6 @@ pub async fn shutdown_all_tenants() {
}
}
/// Creates the on-disk directory structure for a new tenant.
///
/// The structure is first built in a temporary directory whose path is derived from the
/// tenant path via `path_with_suffix_extension(.., TEMP_FILE_SUFFIX)`, and is then handed
/// to `try_create_target_tenant_dir` to be populated and moved into place. If that step
/// fails, the temporary directory is removed on a best-effort basis (cleanup failures are
/// only logged) and the original error is returned.
///
/// # Errors
///
/// Fails if the tenant directory already exists, if the temporary directory cannot be
/// created, or if `try_create_target_tenant_dir` fails.
fn create_tenant_files(
    conf: &'static PageServerConf,
    tenant_conf: TenantConfOpt,
    tenant_id: TenantId,
) -> anyhow::Result<()> {
    let tenant_dir = conf.tenant_path(&tenant_id);
    if tenant_dir.exists() {
        anyhow::bail!("cannot create new tenant repo: '{tenant_id}' directory already exists");
    }

    let temporary_tenant_dir = path_with_suffix_extension(&tenant_dir, TEMP_FILE_SUFFIX);
    debug!(
        "Creating temporary directory structure in {}",
        temporary_tenant_dir.display()
    );

    // top-level dir may exist if we are creating it through CLI
    crashsafe::create_dir_all(&temporary_tenant_dir).with_context(|| {
        format!(
            "could not create temporary tenant directory {}",
            temporary_tenant_dir.display()
        )
    })?;

    let outcome = try_create_target_tenant_dir(
        conf,
        tenant_conf,
        tenant_id,
        &temporary_tenant_dir,
        &tenant_dir,
    );

    if outcome.is_err() {
        error!("Failed to create directory structure for tenant {tenant_id}, cleaning tmp data");
        match fs::remove_dir_all(&temporary_tenant_dir) {
            Err(e) => {
                error!("Failed to remove temporary tenant directory {temporary_tenant_dir:?}: {e}")
            }
            Ok(()) => {
                // NOTE(review): this fsyncs the path that was just removed. If
                // `crashsafe::fsync` opens its argument, this always fails and logs an
                // error -- confirm whether the parent directory was intended.
                if let Err(e) = crashsafe::fsync(&temporary_tenant_dir) {
                    error!(
                        "Failed to fsync removed temporary tenant directory {temporary_tenant_dir:?}: {e}"
                    )
                }
            }
        }
    }

    outcome
}
fn try_create_target_tenant_dir(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
tenant_id: TenantId,
temporary_tenant_dir: &Path,
target_tenant_directory: &Path,
) -> Result<(), anyhow::Error> {
let temporary_tenant_timelines_dir = rebase_directory(
&conf.timelines_path(&tenant_id),
target_tenant_directory,
temporary_tenant_dir,
)
.with_context(|| format!("Failed to resolve tenant {tenant_id} temporary timelines dir"))?;
let temporary_tenant_config_path = rebase_directory(
&conf.tenant_config_path(tenant_id),
target_tenant_directory,
temporary_tenant_dir,
)
.with_context(|| format!("Failed to resolve tenant {tenant_id} temporary config path"))?;
Tenant::persist_tenant_config(&temporary_tenant_config_path, tenant_conf, true).with_context(
|| {
format!(
"Failed to write tenant {} config to {}",
tenant_id,
temporary_tenant_config_path.display()
)
},
)?;
crashsafe::create_dir(&temporary_tenant_timelines_dir).with_context(|| {
format!(
"could not create tenant {} temporary timelines directory {}",
tenant_id,
temporary_tenant_timelines_dir.display()
)
})?;
fail::fail_point!("tenant-creation-before-tmp-rename", |_| {
anyhow::bail!("failpoint tenant-creation-before-tmp-rename");
});
fs::rename(&temporary_tenant_dir, target_tenant_directory).with_context(|| {
format!(
"failed to move tenant {} temporary directory {} into the permanent one {}",
tenant_id,
temporary_tenant_dir.display(),
target_tenant_directory.display()
)
})?;
let target_dir_parent = target_tenant_directory.parent().with_context(|| {
format!(
"Failed to get tenant {} dir parent for {}",
tenant_id,
target_tenant_directory.display()
)
})?;
crashsafe::fsync(target_dir_parent).with_context(|| {
format!(
"Failed to fsync renamed directory's parent {} for tenant {}",
target_dir_parent.display(),
tenant_id,
)
})?;
Ok(())
}
/// Re-roots `original_path` from `base` onto `new_base`.
///
/// The `base` prefix is stripped from `original_path` and the remainder is joined onto
/// `new_base`.
///
/// # Errors
///
/// Returns an error if `base` cannot be stripped from `original_path`.
fn rebase_directory(original_path: &Path, base: &Path, new_base: &Path) -> anyhow::Result<PathBuf> {
    original_path
        .strip_prefix(base)
        .map(|suffix| new_base.join(suffix))
        .with_context(|| {
            format!(
                "Failed to strip base prefix '{}' off path '{}'",
                base.display(),
                original_path.display()
            )
        })
}
pub fn create_tenant(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
@@ -349,20 +209,12 @@ pub fn create_tenant(
Ok(None)
}
hash_map::Entry::Vacant(v) => {
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
create_tenant_files(conf, tenant_conf, tenant_id)?;
// create tenant in Active state so it is possible to issue create_timeline
// request. Which on timeline activation will trigger tenant activation
let tenant = Arc::new(Tenant::new(
TenantState::Active {
background_jobs_running: false,
},
let tenant = Tenant::create_tenant(
conf,
tenant_conf,
wal_redo_manager,
tenant_id,
remote_storage.cloned(),
));
)?;
v.insert(tenant);
Ok(Some(tenant_id))
}
@@ -422,9 +274,6 @@ pub async fn delete_timeline(tenant_id: TenantId, timeline_id: TimelineId) -> an
match get_tenant(tenant_id, true) {
Ok(tenant) => {
tenant.delete_timeline(timeline_id)?;
if tenant.list_timelines().is_empty() {
tenant.activate(false);
}
}
Err(e) => anyhow::bail!("Cannot access tenant {tenant_id} in local tenant state: {e:?}"),
}
@@ -436,6 +285,7 @@ pub async fn detach_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
) -> anyhow::Result<()> {
let tenant = match {
let mut tenants_accessor = tenants_state::write_tenants();
tenants_accessor.remove(&tenant_id)

View File

@@ -154,7 +154,7 @@ async fn wait_for_active_tenant(
};
// if the tenant has a proper status already, no need to wait for anything
if tenant.should_run_tasks() {
if tenant.current_state() == TenantState::Active {
ControlFlow::Continue(tenant)
} else {
let mut tenant_state_updates = tenant.subscribe_for_state_updates();
@@ -163,14 +163,12 @@ async fn wait_for_active_tenant(
Ok(()) => {
let new_state = *tenant_state_updates.borrow();
match new_state {
TenantState::Active {
background_jobs_running: true,
} => {
debug!("Tenant state changed to active with background jobs enabled, continuing the task loop");
TenantState::Active => {
debug!("Tenant state changed to active, continuing the task loop");
return ControlFlow::Continue(tenant);
}
state => {
debug!("Not running the task loop, tenant is not active with background jobs enabled: {state:?}");
debug!("Not running the task loop, tenant is not active: {state:?}");
continue;
}
}

View File

@@ -312,7 +312,7 @@ def test_remote_storage_upload_queue_retries(
all_states = client.tenant_list()
[tenant] = [t for t in all_states if TenantId(t["id"]) == tenant_id]
assert tenant["has_in_progress_downloads"] is False
assert tenant["state"] == {"Active": {"background_jobs_running": True}}
assert tenant["state"] == "Active"
wait_until(30, 1, tenant_active)

View File

@@ -41,16 +41,10 @@ def test_tenant_tasks(neon_env_builder: NeonEnvBuilder):
for t in timelines:
client.timeline_delete(tenant, t)
def assert_active_without_jobs(tenant):
assert get_state(tenant) == {"Active": {"background_jobs_running": False}}
# Create tenant, start compute
tenant, _ = env.neon_cli.create_tenant()
env.neon_cli.create_timeline(name, tenant_id=tenant)
pg = env.postgres.create_start(name, tenant_id=tenant)
assert get_state(tenant) == {
"Active": {"background_jobs_running": True}
}, "Pageserver should activate a tenant and start background jobs if timelines are loaded"
# Stop compute
pg.stop()
@@ -59,7 +53,6 @@ def test_tenant_tasks(neon_env_builder: NeonEnvBuilder):
for tenant_info in client.tenant_list():
tenant_id = TenantId(tenant_info["id"])
delete_all_timelines(tenant_id)
wait_until(10, 0.2, lambda: assert_active_without_jobs(tenant_id))
# Assert that all tasks finish quickly after tenant is detached
assert get_metric_value('pageserver_tenant_task_events{event="start"}') > 0

View File

@@ -259,6 +259,4 @@ def test_pageserver_with_empty_tenants(
), f"Tenant {tenant_without_timelines_dir} without timelines dir should be broken"
[loaded_tenant] = [t for t in tenants if t["id"] == str(tenant_with_empty_timelines_dir)]
assert loaded_tenant["state"] == {
"Active": {"background_jobs_running": False}
}, "Tenant {tenant_with_empty_timelines_dir} with empty timelines dir should be active and ready for timeline creation"
assert loaded_tenant["state"] == "Active", "Tenant {tenant_with_empty_timelines_dir} with empty timelines dir should be active and ready for timeline creation"