mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 14:00:38 +00:00
pageserver: cleanup redundant create/attach code, fix detach while attaching (#6277)
## Problem The code for tenant create and tenant attach was just a special case of what upsert_location does. ## Summary of changes - Use `upsert_location` for create and attach APIs - Clean up error handling in upsert_location so that it can generate appropriate HTTP response codes - Update tests that asserted the old non-idempotent behavior of attach - Rework the `test_ignore_while_attaching` test, and fix tenant shutdown during activation, which this test was supposed to cover, but it was actually just waiting for activation to complete.
This commit is contained in:
@@ -15,6 +15,10 @@ use tracing::*;
|
||||
/// specified time (in milliseconds). The main difference is that we use async
|
||||
/// tokio sleep function. Another difference is that we print lines to the log,
|
||||
/// which can be useful in tests to check that the failpoint was hit.
|
||||
///
|
||||
/// Optionally pass a cancellation token, and this failpoint will drop out of
|
||||
/// its sleep when the cancellation token fires. This is useful for testing
|
||||
/// cases where we would like to block something, but test its clean shutdown behavior.
|
||||
#[macro_export]
|
||||
macro_rules! __failpoint_sleep_millis_async {
|
||||
($name:literal) => {{
|
||||
@@ -30,6 +34,24 @@ macro_rules! __failpoint_sleep_millis_async {
|
||||
$crate::failpoint_support::failpoint_sleep_helper($name, duration_str).await
|
||||
}
|
||||
}};
|
||||
($name:literal, $cancel:expr) => {{
|
||||
// If the failpoint is used with a "return" action, set should_sleep to the
|
||||
// returned value (as string). Otherwise it's set to None.
|
||||
let should_sleep = (|| {
|
||||
::fail::fail_point!($name, |x| x);
|
||||
::std::option::Option::None
|
||||
})();
|
||||
|
||||
// Sleep if the action was a returned value
|
||||
if let ::std::option::Option::Some(duration_str) = should_sleep {
|
||||
$crate::failpoint_support::failpoint_sleep_cancellable_helper(
|
||||
$name,
|
||||
duration_str,
|
||||
$cancel,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}};
|
||||
}
|
||||
pub use __failpoint_sleep_millis_async as sleep_millis_async;
|
||||
|
||||
@@ -45,6 +67,22 @@ pub async fn failpoint_sleep_helper(name: &'static str, duration_str: String) {
|
||||
tracing::info!("failpoint {:?}: sleep done", name);
|
||||
}
|
||||
|
||||
// Helper function used by the macro. (A function has nicer scoping so we
|
||||
// don't need to decorate everything with "::")
|
||||
#[doc(hidden)]
|
||||
pub async fn failpoint_sleep_cancellable_helper(
|
||||
name: &'static str,
|
||||
duration_str: String,
|
||||
cancel: &CancellationToken,
|
||||
) {
|
||||
let millis = duration_str.parse::<u64>().unwrap();
|
||||
let d = std::time::Duration::from_millis(millis);
|
||||
|
||||
tracing::info!("failpoint {:?}: sleeping for {:?}", name, d);
|
||||
tokio::time::timeout(d, cancel.cancelled()).await.ok();
|
||||
tracing::info!("failpoint {:?}: sleep done", name);
|
||||
}
|
||||
|
||||
pub fn init() -> fail::FailScenario<'static> {
|
||||
// The failpoints lib provides support for parsing the `FAILPOINTS` env var.
|
||||
// We want non-default behavior for `exit`, though, so, we handle it separately.
|
||||
|
||||
@@ -15,6 +15,7 @@ use hyper::StatusCode;
|
||||
use hyper::{Body, Request, Response, Uri};
|
||||
use metrics::launch_timestamp::LaunchTimestamp;
|
||||
use pageserver_api::models::TenantDetails;
|
||||
use pageserver_api::models::TenantState;
|
||||
use pageserver_api::models::{
|
||||
DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest,
|
||||
TenantLoadRequest, TenantLocationConfigRequest,
|
||||
@@ -37,6 +38,7 @@ use crate::pgdatadir_mapping::LsnForTimestamp;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::config::{LocationConf, TenantConfOpt};
|
||||
use crate::tenant::mgr::GetActiveTenantError;
|
||||
use crate::tenant::mgr::UpsertLocationError;
|
||||
use crate::tenant::mgr::{
|
||||
GetTenantError, SetNewTenantConfigError, TenantManager, TenantMapError, TenantMapInsertError,
|
||||
TenantSlotError, TenantSlotUpsertError, TenantStateError,
|
||||
@@ -46,7 +48,8 @@ use crate::tenant::size::ModelInputs;
|
||||
use crate::tenant::storage_layer::LayerAccessStatsReset;
|
||||
use crate::tenant::timeline::CompactFlags;
|
||||
use crate::tenant::timeline::Timeline;
|
||||
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, TenantSharedResources};
|
||||
use crate::tenant::SpawnMode;
|
||||
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
|
||||
use crate::{config::PageServerConf, tenant::mgr};
|
||||
use crate::{disk_usage_eviction_task, tenant};
|
||||
use pageserver_api::models::{
|
||||
@@ -112,14 +115,6 @@ impl State {
|
||||
secondary_controller,
|
||||
})
|
||||
}
|
||||
|
||||
fn tenant_resources(&self) -> TenantSharedResources {
|
||||
TenantSharedResources {
|
||||
broker_client: self.broker_client.clone(),
|
||||
remote_storage: self.remote_storage.clone(),
|
||||
deletion_queue_client: self.deletion_queue_client.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
@@ -175,7 +170,7 @@ impl From<TenantSlotError> for ApiError {
|
||||
NotFound(tenant_id) => {
|
||||
ApiError::NotFound(anyhow::anyhow!("NotFound: tenant {tenant_id}").into())
|
||||
}
|
||||
e @ (AlreadyExists(_, _) | Conflict(_)) => ApiError::Conflict(format!("{e}")),
|
||||
e @ AlreadyExists(_, _) => ApiError::Conflict(format!("{e}")),
|
||||
InProgress => {
|
||||
ApiError::ResourceUnavailable("Tenant is being modified concurrently".into())
|
||||
}
|
||||
@@ -194,6 +189,18 @@ impl From<TenantSlotUpsertError> for ApiError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<UpsertLocationError> for ApiError {
|
||||
fn from(e: UpsertLocationError) -> ApiError {
|
||||
use UpsertLocationError::*;
|
||||
match e {
|
||||
BadRequest(e) => ApiError::BadRequest(e),
|
||||
Unavailable(_) => ApiError::ShuttingDown,
|
||||
e @ InProgress => ApiError::Conflict(format!("{e}")),
|
||||
Flush(e) | Other(e) => ApiError::InternalServerError(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<TenantMapError> for ApiError {
|
||||
fn from(e: TenantMapError) -> ApiError {
|
||||
use TenantMapError::*;
|
||||
@@ -680,16 +687,37 @@ async fn tenant_attach_handler(
|
||||
)));
|
||||
}
|
||||
|
||||
mgr::attach_tenant(
|
||||
state.conf,
|
||||
tenant_id,
|
||||
generation,
|
||||
tenant_conf,
|
||||
state.tenant_resources(),
|
||||
&ctx,
|
||||
)
|
||||
.instrument(info_span!("tenant_attach", %tenant_id))
|
||||
.await?;
|
||||
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
|
||||
let location_conf = LocationConf::attached_single(tenant_conf, generation);
|
||||
let tenant = state
|
||||
.tenant_manager
|
||||
.upsert_location(
|
||||
tenant_shard_id,
|
||||
location_conf,
|
||||
None,
|
||||
SpawnMode::Normal,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let Some(tenant) = tenant else {
|
||||
// This should never happen: indicates a bug in upsert_location
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Upsert succeeded but didn't return tenant!"
|
||||
)));
|
||||
};
|
||||
|
||||
// We might have successfully constructed a Tenant, but it could still
|
||||
// end up in a broken state:
|
||||
if let TenantState::Broken {
|
||||
reason,
|
||||
backtrace: _,
|
||||
} = tenant.current_state()
|
||||
{
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Tenant state is Broken: {reason}"
|
||||
)));
|
||||
}
|
||||
|
||||
json_response(StatusCode::ACCEPTED, ())
|
||||
}
|
||||
@@ -1148,16 +1176,25 @@ async fn tenant_create_handler(
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
|
||||
|
||||
let new_tenant = mgr::create_tenant(
|
||||
state.conf,
|
||||
tenant_conf,
|
||||
target_tenant_id,
|
||||
generation,
|
||||
state.tenant_resources(),
|
||||
&ctx,
|
||||
)
|
||||
.instrument(info_span!("tenant_create", tenant_id = %target_tenant_id))
|
||||
.await?;
|
||||
let location_conf = LocationConf::attached_single(tenant_conf, generation);
|
||||
|
||||
let new_tenant = state
|
||||
.tenant_manager
|
||||
.upsert_location(
|
||||
target_tenant_id,
|
||||
location_conf,
|
||||
None,
|
||||
SpawnMode::Create,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let Some(new_tenant) = new_tenant else {
|
||||
// This should never happen: indicates a bug in upsert_location
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Upsert succeeded but didn't return tenant!"
|
||||
)));
|
||||
};
|
||||
|
||||
// We created the tenant. Existing API semantics are that the tenant
|
||||
// is Active when this function returns.
|
||||
@@ -1166,7 +1203,7 @@ async fn tenant_create_handler(
|
||||
.await
|
||||
{
|
||||
// This shouldn't happen because we just created the tenant directory
|
||||
// in tenant::mgr::create_tenant, and there aren't any remote timelines
|
||||
// in upsert_location, and there aren't any remote timelines
|
||||
// to load, so, nothing can really fail during load.
|
||||
// Don't do cleanup because we don't know how we got here.
|
||||
// The tenant will likely be in `Broken` state and subsequent
|
||||
@@ -1267,12 +1304,14 @@ async fn put_tenant_location_config_handler(
|
||||
|
||||
state
|
||||
.tenant_manager
|
||||
.upsert_location(tenant_shard_id, location_conf, flush, &ctx)
|
||||
.await
|
||||
// TODO: badrequest assumes the caller was asking for something unreasonable, but in
|
||||
// principle we might have hit something like concurrent API calls to the same tenant,
|
||||
// which is not a 400 but a 409.
|
||||
.map_err(ApiError::BadRequest)?;
|
||||
.upsert_location(
|
||||
tenant_shard_id,
|
||||
location_conf,
|
||||
flush,
|
||||
tenant::SpawnMode::Normal,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
if let Some(_flush_ms) = flush {
|
||||
match state
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
//!
|
||||
|
||||
use anyhow::{bail, Context};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use camino::Utf8Path;
|
||||
use enumset::EnumSet;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::FutureExt;
|
||||
@@ -1003,7 +1003,7 @@ impl Tenant {
|
||||
// IndexPart is the source of truth.
|
||||
self.clean_up_timelines(&existent_timelines)?;
|
||||
|
||||
failpoint_support::sleep_millis_async!("attach-before-activate");
|
||||
failpoint_support::sleep_millis_async!("attach-before-activate", &self.cancel);
|
||||
|
||||
info!("Done");
|
||||
|
||||
@@ -2036,6 +2036,13 @@ impl Tenant {
|
||||
// It's mesed up.
|
||||
// we just ignore the failure to stop
|
||||
|
||||
// If we're still attaching, fire the cancellation token early to drop out: this
|
||||
// will prevent us flushing, but ensures timely shutdown if some I/O during attach
|
||||
// is very slow.
|
||||
if matches!(self.current_state(), TenantState::Attaching) {
|
||||
self.cancel.cancel();
|
||||
}
|
||||
|
||||
match self.set_stopping(shutdown_progress, false, false).await {
|
||||
Ok(()) => {}
|
||||
Err(SetStoppingError::Broken) => {
|
||||
@@ -2734,6 +2741,10 @@ impl Tenant {
|
||||
"#
|
||||
.to_string();
|
||||
|
||||
fail::fail_point!("tenant-config-before-write", |_| {
|
||||
anyhow::bail!("tenant-config-before-write");
|
||||
});
|
||||
|
||||
// Convert the config to a toml file.
|
||||
conf_content += &toml_edit::ser::to_string_pretty(&location_conf)?;
|
||||
|
||||
@@ -3650,140 +3661,6 @@ fn remove_timeline_and_uninit_mark(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn create_tenant_files(
|
||||
conf: &'static PageServerConf,
|
||||
location_conf: &LocationConf,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
) -> anyhow::Result<Utf8PathBuf> {
|
||||
let target_tenant_directory = conf.tenant_path(tenant_shard_id);
|
||||
anyhow::ensure!(
|
||||
!target_tenant_directory
|
||||
.try_exists()
|
||||
.context("check existence of tenant directory")?,
|
||||
"tenant directory already exists",
|
||||
);
|
||||
|
||||
let temporary_tenant_dir =
|
||||
path_with_suffix_extension(&target_tenant_directory, TEMP_FILE_SUFFIX);
|
||||
debug!("Creating temporary directory structure in {temporary_tenant_dir}");
|
||||
|
||||
// top-level dir may exist if we are creating it through CLI
|
||||
crashsafe::create_dir_all(&temporary_tenant_dir).with_context(|| {
|
||||
format!("could not create temporary tenant directory {temporary_tenant_dir}")
|
||||
})?;
|
||||
|
||||
let creation_result = try_create_target_tenant_dir(
|
||||
conf,
|
||||
location_conf,
|
||||
tenant_shard_id,
|
||||
&temporary_tenant_dir,
|
||||
&target_tenant_directory,
|
||||
)
|
||||
.await;
|
||||
|
||||
if creation_result.is_err() {
|
||||
error!(
|
||||
"Failed to create directory structure for tenant {tenant_shard_id}, cleaning tmp data"
|
||||
);
|
||||
if let Err(e) = fs::remove_dir_all(&temporary_tenant_dir) {
|
||||
error!("Failed to remove temporary tenant directory {temporary_tenant_dir:?}: {e}")
|
||||
} else if let Err(e) = crashsafe::fsync(&temporary_tenant_dir) {
|
||||
error!(
|
||||
"Failed to fsync removed temporary tenant directory {temporary_tenant_dir:?}: {e}"
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
creation_result?;
|
||||
|
||||
Ok(target_tenant_directory)
|
||||
}
|
||||
|
||||
async fn try_create_target_tenant_dir(
|
||||
conf: &'static PageServerConf,
|
||||
location_conf: &LocationConf,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
temporary_tenant_dir: &Utf8Path,
|
||||
target_tenant_directory: &Utf8Path,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
let temporary_tenant_timelines_dir = rebase_directory(
|
||||
&conf.timelines_path(tenant_shard_id),
|
||||
target_tenant_directory,
|
||||
temporary_tenant_dir,
|
||||
)
|
||||
.with_context(|| format!("resolve tenant {tenant_shard_id} temporary timelines dir"))?;
|
||||
let temporary_legacy_tenant_config_path = rebase_directory(
|
||||
&conf.tenant_config_path(tenant_shard_id),
|
||||
target_tenant_directory,
|
||||
temporary_tenant_dir,
|
||||
)
|
||||
.with_context(|| format!("resolve tenant {tenant_shard_id} temporary config path"))?;
|
||||
let temporary_tenant_config_path = rebase_directory(
|
||||
&conf.tenant_location_config_path(tenant_shard_id),
|
||||
target_tenant_directory,
|
||||
temporary_tenant_dir,
|
||||
)
|
||||
.with_context(|| format!("resolve tenant {tenant_shard_id} temporary config path"))?;
|
||||
|
||||
Tenant::persist_tenant_config_at(
|
||||
tenant_shard_id,
|
||||
&temporary_tenant_config_path,
|
||||
&temporary_legacy_tenant_config_path,
|
||||
location_conf,
|
||||
)
|
||||
.await?;
|
||||
|
||||
crashsafe::create_dir(&temporary_tenant_timelines_dir).with_context(|| {
|
||||
format!(
|
||||
"create tenant {} temporary timelines directory {}",
|
||||
tenant_shard_id, temporary_tenant_timelines_dir,
|
||||
)
|
||||
})?;
|
||||
fail::fail_point!("tenant-creation-before-tmp-rename", |_| {
|
||||
anyhow::bail!("failpoint tenant-creation-before-tmp-rename");
|
||||
});
|
||||
|
||||
// Make sure the current tenant directory entries are durable before renaming.
|
||||
// Without this, a crash may reorder any of the directory entry creations above.
|
||||
crashsafe::fsync(temporary_tenant_dir)
|
||||
.with_context(|| format!("sync temporary tenant directory {temporary_tenant_dir:?}"))?;
|
||||
|
||||
fs::rename(temporary_tenant_dir, target_tenant_directory).with_context(|| {
|
||||
format!(
|
||||
"move tenant {} temporary directory {} into the permanent one {}",
|
||||
tenant_shard_id, temporary_tenant_dir, target_tenant_directory
|
||||
)
|
||||
})?;
|
||||
let target_dir_parent = target_tenant_directory.parent().with_context(|| {
|
||||
format!(
|
||||
"get tenant {} dir parent for {}",
|
||||
tenant_shard_id, target_tenant_directory,
|
||||
)
|
||||
})?;
|
||||
crashsafe::fsync(target_dir_parent).with_context(|| {
|
||||
format!(
|
||||
"fsync renamed directory's parent {} for tenant {}",
|
||||
target_dir_parent, tenant_shard_id,
|
||||
)
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn rebase_directory(
|
||||
original_path: &Utf8Path,
|
||||
base: &Utf8Path,
|
||||
new_base: &Utf8Path,
|
||||
) -> anyhow::Result<Utf8PathBuf> {
|
||||
let relative_path = original_path.strip_prefix(base).with_context(|| {
|
||||
format!(
|
||||
"Failed to strip base prefix '{}' off path '{}'",
|
||||
base, original_path
|
||||
)
|
||||
})?;
|
||||
Ok(new_base.join(relative_path))
|
||||
}
|
||||
|
||||
/// Create the cluster temporarily in 'initdbpath' directory inside the repository
|
||||
/// to get bootstrap data for timeline initialization.
|
||||
async fn run_initdb(
|
||||
@@ -3878,6 +3755,7 @@ pub async fn dump_layerfile_from_path(
|
||||
#[cfg(test)]
|
||||
pub(crate) mod harness {
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use camino::Utf8PathBuf;
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use std::fs;
|
||||
|
||||
@@ -35,7 +35,7 @@ use crate::tenant::config::{
|
||||
};
|
||||
use crate::tenant::delete::DeleteTenantFlow;
|
||||
use crate::tenant::span::debug_assert_current_span_has_tenant_id;
|
||||
use crate::tenant::{create_tenant_files, AttachedTenantConf, SpawnMode, Tenant, TenantState};
|
||||
use crate::tenant::{AttachedTenantConf, SpawnMode, Tenant, TenantState};
|
||||
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};
|
||||
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
@@ -754,45 +754,6 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
|
||||
// caller will log how long we took
|
||||
}
|
||||
|
||||
pub(crate) async fn create_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_conf: TenantConfOpt,
|
||||
tenant_shard_id: TenantShardId,
|
||||
generation: Generation,
|
||||
resources: TenantSharedResources,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Arc<Tenant>, TenantMapInsertError> {
|
||||
let location_conf = LocationConf::attached_single(tenant_conf, generation);
|
||||
info!("Creating tenant at location {location_conf:?}");
|
||||
|
||||
let slot_guard =
|
||||
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;
|
||||
let tenant_path = super::create_tenant_files(conf, &location_conf, &tenant_shard_id).await?;
|
||||
|
||||
let shard_identity = location_conf.shard;
|
||||
let created_tenant = tenant_spawn(
|
||||
conf,
|
||||
tenant_shard_id,
|
||||
&tenant_path,
|
||||
resources,
|
||||
AttachedTenantConf::try_from(location_conf)?,
|
||||
shard_identity,
|
||||
None,
|
||||
&TENANTS,
|
||||
SpawnMode::Create,
|
||||
ctx,
|
||||
)?;
|
||||
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
|
||||
// See https://github.com/neondatabase/neon/issues/4233
|
||||
|
||||
let created_tenant_id = created_tenant.tenant_id();
|
||||
debug_assert_eq!(created_tenant_id, tenant_shard_id.tenant_id);
|
||||
|
||||
slot_guard.upsert(TenantSlot::Attached(created_tenant.clone()))?;
|
||||
|
||||
Ok(created_tenant)
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum SetNewTenantConfigError {
|
||||
#[error(transparent)]
|
||||
@@ -824,6 +785,24 @@ pub(crate) async fn set_new_tenant_config(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum UpsertLocationError {
|
||||
#[error("Bad config request: {0}")]
|
||||
BadRequest(anyhow::Error),
|
||||
|
||||
#[error("Cannot change config in this state: {0}")]
|
||||
Unavailable(#[from] TenantMapError),
|
||||
|
||||
#[error("Tenant is already being modified")]
|
||||
InProgress,
|
||||
|
||||
#[error("Failed to flush: {0}")]
|
||||
Flush(anyhow::Error),
|
||||
|
||||
#[error("Internal error: {0}")]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
impl TenantManager {
|
||||
/// Convenience function so that anyone with a TenantManager can get at the global configuration, without
|
||||
/// having to pass it around everywhere as a separate object.
|
||||
@@ -888,8 +867,9 @@ impl TenantManager {
|
||||
tenant_shard_id: TenantShardId,
|
||||
new_location_config: LocationConf,
|
||||
flush: Option<Duration>,
|
||||
spawn_mode: SpawnMode,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
) -> Result<Option<Arc<Tenant>>, UpsertLocationError> {
|
||||
debug_assert_current_span_has_tenant_id();
|
||||
info!("configuring tenant location to state {new_location_config:?}");
|
||||
|
||||
@@ -911,9 +891,10 @@ impl TenantManager {
|
||||
// A transition from Attached to Attached in the same generation, we may
|
||||
// take our fast path and just provide the updated configuration
|
||||
// to the tenant.
|
||||
tenant.set_new_location_config(AttachedTenantConf::try_from(
|
||||
new_location_config.clone(),
|
||||
)?);
|
||||
tenant.set_new_location_config(
|
||||
AttachedTenantConf::try_from(new_location_config.clone())
|
||||
.map_err(UpsertLocationError::BadRequest)?,
|
||||
);
|
||||
|
||||
Some(FastPathModified::Attached(tenant.clone()))
|
||||
} else {
|
||||
@@ -940,8 +921,7 @@ impl TenantManager {
|
||||
match fast_path_taken {
|
||||
Some(FastPathModified::Attached(tenant)) => {
|
||||
Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
|
||||
.await
|
||||
.map_err(SetNewTenantConfigError::Persist)?;
|
||||
.await?;
|
||||
|
||||
// Transition to AttachedStale means we may well hold a valid generation
|
||||
// still, and have been requested to go stale as part of a migration. If
|
||||
@@ -954,9 +934,9 @@ impl TenantManager {
|
||||
if let Some(flush_timeout) = flush {
|
||||
match tokio::time::timeout(flush_timeout, tenant.flush_remote()).await {
|
||||
Ok(Err(e)) => {
|
||||
return Err(e);
|
||||
return Err(UpsertLocationError::Flush(e));
|
||||
}
|
||||
Ok(Ok(_)) => return Ok(()),
|
||||
Ok(Ok(_)) => return Ok(Some(tenant)),
|
||||
Err(_) => {
|
||||
tracing::warn!(
|
||||
timeout_ms = flush_timeout.as_millis(),
|
||||
@@ -967,14 +947,13 @@ impl TenantManager {
|
||||
}
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
return Ok(Some(tenant));
|
||||
}
|
||||
Some(FastPathModified::Secondary(_secondary_tenant)) => {
|
||||
Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
|
||||
.await
|
||||
.map_err(SetNewTenantConfigError::Persist)?;
|
||||
.await?;
|
||||
|
||||
return Ok(());
|
||||
return Ok(None);
|
||||
}
|
||||
None => {
|
||||
// Proceed with the general case procedure, where we will shutdown & remove any existing
|
||||
@@ -987,7 +966,14 @@ impl TenantManager {
|
||||
// the tenant is inaccessible to the outside world while we are doing this, but that is sensible:
|
||||
// the state is ill-defined while we're in transition. Transitions are async, but fast: we do
|
||||
// not do significant I/O, and shutdowns should be prompt via cancellation tokens.
|
||||
let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
|
||||
let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)
|
||||
.map_err(|e| match e {
|
||||
TenantSlotError::AlreadyExists(_, _) | TenantSlotError::NotFound(_) => {
|
||||
unreachable!("Called with mode Any")
|
||||
}
|
||||
TenantSlotError::InProgress => UpsertLocationError::InProgress,
|
||||
TenantSlotError::MapState(s) => UpsertLocationError::Unavailable(s),
|
||||
})?;
|
||||
|
||||
match slot_guard.get_old_value() {
|
||||
Some(TenantSlot::Attached(tenant)) => {
|
||||
@@ -1025,7 +1011,9 @@ impl TenantManager {
|
||||
Some(TenantSlot::InProgress(_)) => {
|
||||
// This should never happen: acquire_slot should error out
|
||||
// if the contents of a slot were InProgress.
|
||||
anyhow::bail!("Acquired an InProgress slot, this is a bug.")
|
||||
return Err(UpsertLocationError::Other(anyhow::anyhow!(
|
||||
"Acquired an InProgress slot, this is a bug."
|
||||
)));
|
||||
}
|
||||
None => {
|
||||
// Slot was vacant, nothing needs shutting down.
|
||||
@@ -1047,9 +1035,7 @@ impl TenantManager {
|
||||
// Before activating either secondary or attached mode, persist the
|
||||
// configuration, so that on restart we will re-attach (or re-start
|
||||
// secondary) on the tenant.
|
||||
Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
|
||||
.await
|
||||
.map_err(SetNewTenantConfigError::Persist)?;
|
||||
Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config).await?;
|
||||
|
||||
let new_slot = match &new_location_config.mode {
|
||||
LocationMode::Secondary(secondary_config) => {
|
||||
@@ -1066,7 +1052,7 @@ impl TenantManager {
|
||||
shard_identity,
|
||||
None,
|
||||
self.tenants,
|
||||
SpawnMode::Normal,
|
||||
spawn_mode,
|
||||
ctx,
|
||||
)?;
|
||||
|
||||
@@ -1074,9 +1060,20 @@ impl TenantManager {
|
||||
}
|
||||
};
|
||||
|
||||
slot_guard.upsert(new_slot)?;
|
||||
let attached_tenant = if let TenantSlot::Attached(tenant) = &new_slot {
|
||||
Some(tenant.clone())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Ok(())
|
||||
slot_guard.upsert(new_slot).map_err(|e| match e {
|
||||
TenantSlotUpsertError::InternalError(e) => {
|
||||
UpsertLocationError::Other(anyhow::anyhow!(e))
|
||||
}
|
||||
TenantSlotUpsertError::MapState(e) => UpsertLocationError::Unavailable(e),
|
||||
})?;
|
||||
|
||||
Ok(attached_tenant)
|
||||
}
|
||||
|
||||
/// Resetting a tenant is equivalent to detaching it, then attaching it again with the same
|
||||
@@ -1648,55 +1645,6 @@ pub(crate) async fn list_tenants() -> Result<Vec<(TenantShardId, TenantState)>,
|
||||
.collect())
|
||||
}
|
||||
|
||||
/// Execute Attach mgmt API command.
|
||||
///
|
||||
/// Downloading all the tenant data is performed in the background, this merely
|
||||
/// spawns the background task and returns quickly.
|
||||
pub(crate) async fn attach_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
generation: Generation,
|
||||
tenant_conf: TenantConfOpt,
|
||||
resources: TenantSharedResources,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), TenantMapInsertError> {
|
||||
// This is a legacy API (replaced by `/location_conf`). It does not support sharding
|
||||
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
|
||||
|
||||
let slot_guard =
|
||||
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;
|
||||
let location_conf = LocationConf::attached_single(tenant_conf, generation);
|
||||
let tenant_dir = create_tenant_files(conf, &location_conf, &tenant_shard_id).await?;
|
||||
// TODO: tenant directory remains on disk if we bail out from here on.
|
||||
// See https://github.com/neondatabase/neon/issues/4233
|
||||
|
||||
let shard_identity = location_conf.shard;
|
||||
let attached_tenant = tenant_spawn(
|
||||
conf,
|
||||
tenant_shard_id,
|
||||
&tenant_dir,
|
||||
resources,
|
||||
AttachedTenantConf::try_from(location_conf)?,
|
||||
shard_identity,
|
||||
None,
|
||||
&TENANTS,
|
||||
SpawnMode::Normal,
|
||||
ctx,
|
||||
)?;
|
||||
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
|
||||
// See https://github.com/neondatabase/neon/issues/4233
|
||||
|
||||
let attached_tenant_id = attached_tenant.tenant_id();
|
||||
if tenant_id != attached_tenant_id {
|
||||
return Err(TenantMapInsertError::Other(anyhow::anyhow!(
|
||||
"loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {attached_tenant_id})",
|
||||
)));
|
||||
}
|
||||
|
||||
slot_guard.upsert(TenantSlot::Attached(attached_tenant))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum TenantMapInsertError {
|
||||
#[error(transparent)]
|
||||
@@ -1710,7 +1658,7 @@ pub(crate) enum TenantMapInsertError {
|
||||
/// Superset of TenantMapError: issues that can occur when acquiring a slot
|
||||
/// for a particular tenant ID.
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum TenantSlotError {
|
||||
pub(crate) enum TenantSlotError {
|
||||
/// When acquiring a slot with the expectation that the tenant already exists.
|
||||
#[error("Tenant {0} not found")]
|
||||
NotFound(TenantShardId),
|
||||
@@ -1719,9 +1667,6 @@ pub enum TenantSlotError {
|
||||
#[error("tenant {0} already exists, state: {1:?}")]
|
||||
AlreadyExists(TenantShardId, TenantState),
|
||||
|
||||
#[error("tenant {0} already exists in but is not attached")]
|
||||
Conflict(TenantShardId),
|
||||
|
||||
// Tried to read a slot that is currently being mutated by another administrative
|
||||
// operation.
|
||||
#[error("tenant has a state change in progress, try again later")]
|
||||
|
||||
@@ -1917,18 +1917,24 @@ class NeonPageserver(PgProtocol):
|
||||
return None
|
||||
|
||||
def tenant_attach(
|
||||
self, tenant_id: TenantId, config: None | Dict[str, Any] = None, config_null: bool = False
|
||||
self,
|
||||
tenant_id: TenantId,
|
||||
config: None | Dict[str, Any] = None,
|
||||
config_null: bool = False,
|
||||
generation: Optional[int] = None,
|
||||
):
|
||||
"""
|
||||
Tenant attachment passes through here to acquire a generation number before proceeding
|
||||
to call into the pageserver HTTP client.
|
||||
"""
|
||||
client = self.http_client()
|
||||
if generation is None:
|
||||
generation = self.env.attachment_service.attach_hook_issue(tenant_id, self.id)
|
||||
return client.tenant_attach(
|
||||
tenant_id,
|
||||
config,
|
||||
config_null,
|
||||
generation=self.env.attachment_service.attach_hook_issue(tenant_id, self.id),
|
||||
generation=generation,
|
||||
)
|
||||
|
||||
def tenant_detach(self, tenant_id: TenantId):
|
||||
|
||||
@@ -144,8 +144,11 @@ def test_remote_storage_backup_and_restore(
|
||||
# Introduce failpoint in list remote timelines code path to make tenant_attach fail.
|
||||
# This is before the failures injected by test_remote_failures, so it's a permanent error.
|
||||
pageserver_http.configure_failpoints(("storage-sync-list-remote-timelines", "return"))
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*attach failed.*: storage-sync-list-remote-timelines",
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
".*attach failed.*: storage-sync-list-remote-timelines",
|
||||
".*Tenant state is Broken: storage-sync-list-remote-timelines.*",
|
||||
]
|
||||
)
|
||||
# Attach it. This HTTP request will succeed and launch a
|
||||
# background task to load the tenant. In that background task,
|
||||
@@ -159,9 +162,13 @@ def test_remote_storage_backup_and_restore(
|
||||
"data": {"reason": "storage-sync-list-remote-timelines"},
|
||||
}
|
||||
|
||||
# Ensure that even though the tenant is broken, we can't attach it again.
|
||||
with pytest.raises(Exception, match=f"tenant {tenant_id} already exists, state: Broken"):
|
||||
env.pageserver.tenant_attach(tenant_id)
|
||||
# Ensure that even though the tenant is broken, retrying the attachment fails
|
||||
with pytest.raises(Exception, match="Tenant state is Broken"):
|
||||
# Use same generation as in previous attempt
|
||||
gen_state = env.attachment_service.inspect(tenant_id)
|
||||
assert gen_state is not None
|
||||
generation = gen_state[0]
|
||||
env.pageserver.tenant_attach(tenant_id, generation=generation)
|
||||
|
||||
# Restart again, this implicitly clears the failpoint.
|
||||
# test_remote_failures=1 remains active, though, as it's in the pageserver config.
|
||||
@@ -176,10 +183,8 @@ def test_remote_storage_backup_and_restore(
|
||||
), "we shouldn't have tried any layer downloads yet since list remote timelines has a failpoint"
|
||||
env.pageserver.start()
|
||||
|
||||
# Ensure that the pageserver remembers that the tenant was attaching, by
|
||||
# trying to attach it again. It should fail.
|
||||
with pytest.raises(Exception, match=f"tenant {tenant_id} already exists, state:"):
|
||||
env.pageserver.tenant_attach(tenant_id)
|
||||
# The attach should have got far enough that it recovers on restart (i.e. tenant's
|
||||
# config was written to local storage).
|
||||
log.info("waiting for tenant to become active. this should be quick with on-demand download")
|
||||
|
||||
wait_until_tenant_active(
|
||||
|
||||
@@ -627,7 +627,7 @@ def test_ignored_tenant_download_missing_layers(neon_env_builder: NeonEnvBuilder
|
||||
|
||||
# Tests that attach is never working on a tenant, ignored or not, as long as it's not absent locally
|
||||
# Similarly, tests that it's not possible to schedule a `load` for tenat that's not ignored.
|
||||
def test_load_attach_negatives(neon_env_builder: NeonEnvBuilder):
|
||||
def test_load_negatives(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
env = neon_env_builder.init_start()
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
@@ -644,25 +644,16 @@ def test_load_attach_negatives(neon_env_builder: NeonEnvBuilder):
|
||||
):
|
||||
env.pageserver.tenant_load(tenant_id)
|
||||
|
||||
with pytest.raises(
|
||||
expected_exception=PageserverApiException,
|
||||
match=f"tenant {tenant_id} already exists, state: Active",
|
||||
):
|
||||
env.pageserver.tenant_attach(tenant_id)
|
||||
|
||||
pageserver_http.tenant_ignore(tenant_id)
|
||||
|
||||
env.pageserver.allowed_errors.append(".*tenant directory already exists.*")
|
||||
with pytest.raises(
|
||||
expected_exception=PageserverApiException,
|
||||
match="tenant directory already exists",
|
||||
):
|
||||
env.pageserver.tenant_attach(tenant_id)
|
||||
|
||||
|
||||
def test_ignore_while_attaching(
|
||||
def test_detach_while_activating(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
"""
|
||||
Test cancellation behavior for tenants that are stuck somewhere between
|
||||
being attached and reaching Active state.
|
||||
"""
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
@@ -684,39 +675,28 @@ def test_ignore_while_attaching(
|
||||
data_secret = "very secret secret"
|
||||
insert_test_data(pageserver_http, tenant_id, timeline_id, data_id, data_secret, endpoint)
|
||||
|
||||
tenants_before_ignore = [tenant["id"] for tenant in pageserver_http.tenant_list()]
|
||||
tenants_before_detach = [tenant["id"] for tenant in pageserver_http.tenant_list()]
|
||||
|
||||
# Detach it
|
||||
pageserver_http.tenant_detach(tenant_id)
|
||||
|
||||
# And re-attach, but stop attach task_mgr task from completing
|
||||
pageserver_http.configure_failpoints([("attach-before-activate", "return(5000)")])
|
||||
pageserver_http.configure_failpoints([("attach-before-activate", "return(600000)")])
|
||||
env.pageserver.tenant_attach(tenant_id)
|
||||
# Run ignore on the task, thereby cancelling the attach.
|
||||
# XXX This should take priority over attach, i.e., it should cancel the attach task.
|
||||
# But neither the failpoint, nor the proper remote_timeline_client download functions,
|
||||
# are sensitive to task_mgr::shutdown.
|
||||
# This problem is tracked in https://github.com/neondatabase/neon/issues/2996 .
|
||||
# So, for now, effectively, this ignore here will block until attach task completes.
|
||||
pageserver_http.tenant_ignore(tenant_id)
|
||||
|
||||
# Cannot attach it due to some local files existing
|
||||
env.pageserver.allowed_errors.append(".*tenant directory already exists.*")
|
||||
with pytest.raises(
|
||||
expected_exception=PageserverApiException,
|
||||
match="tenant directory already exists",
|
||||
):
|
||||
env.pageserver.tenant_attach(tenant_id)
|
||||
# The tenant is in the Activating state. This should not block us from
|
||||
# shutting it down and detaching it.
|
||||
pageserver_http.tenant_detach(tenant_id)
|
||||
|
||||
tenants_after_ignore = [tenant["id"] for tenant in pageserver_http.tenant_list()]
|
||||
assert tenant_id not in tenants_after_ignore, "Ignored tenant should be missing"
|
||||
assert len(tenants_after_ignore) + 1 == len(
|
||||
tenants_before_ignore
|
||||
tenants_after_detach = [tenant["id"] for tenant in pageserver_http.tenant_list()]
|
||||
assert tenant_id not in tenants_after_detach, "Detached tenant should be missing"
|
||||
assert len(tenants_after_detach) + 1 == len(
|
||||
tenants_before_detach
|
||||
), "Only ignored tenant should be missing"
|
||||
|
||||
# Calling load will bring the tenant back online
|
||||
# Subsequently attaching it again should still work
|
||||
pageserver_http.configure_failpoints([("attach-before-activate", "off")])
|
||||
env.pageserver.tenant_load(tenant_id)
|
||||
|
||||
env.pageserver.tenant_attach(tenant_id)
|
||||
wait_until_tenant_state(pageserver_http, tenant_id, "Active", 5)
|
||||
|
||||
endpoint.stop()
|
||||
|
||||
@@ -29,18 +29,13 @@ def test_tenant_creation_fails(neon_simple_env: NeonEnv):
|
||||
initial_tenants = sorted(
|
||||
map(lambda t: t.split()[0], neon_simple_env.neon_cli.list_tenants().stdout.splitlines())
|
||||
)
|
||||
initial_tenant_dirs = [d for d in tenants_dir.iterdir()]
|
||||
[d for d in tenants_dir.iterdir()]
|
||||
|
||||
neon_simple_env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
".*Failed to create directory structure for tenant .*, cleaning tmp data.*",
|
||||
".*Failed to fsync removed temporary tenant directory .*",
|
||||
]
|
||||
)
|
||||
neon_simple_env.pageserver.allowed_errors.append(".*tenant-config-before-write.*")
|
||||
|
||||
pageserver_http = neon_simple_env.pageserver.http_client()
|
||||
pageserver_http.configure_failpoints(("tenant-creation-before-tmp-rename", "return"))
|
||||
with pytest.raises(Exception, match="tenant-creation-before-tmp-rename"):
|
||||
pageserver_http.configure_failpoints(("tenant-config-before-write", "return"))
|
||||
with pytest.raises(Exception, match="tenant-config-before-write"):
|
||||
_ = neon_simple_env.neon_cli.create_tenant()
|
||||
|
||||
new_tenants = sorted(
|
||||
@@ -48,10 +43,10 @@ def test_tenant_creation_fails(neon_simple_env: NeonEnv):
|
||||
)
|
||||
assert initial_tenants == new_tenants, "should not create new tenants"
|
||||
|
||||
new_tenant_dirs = [d for d in tenants_dir.iterdir()]
|
||||
assert (
|
||||
new_tenant_dirs == initial_tenant_dirs
|
||||
), "pageserver should clean its temp tenant dirs on tenant creation failure"
|
||||
# Any files left behind on disk during failed creation do not prevent
|
||||
# a retry from succeeding.
|
||||
pageserver_http.configure_failpoints(("tenant-config-before-write", "off"))
|
||||
neon_simple_env.neon_cli.create_tenant()
|
||||
|
||||
|
||||
def test_tenants_normal_work(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
Reference in New Issue
Block a user