Delete tenant's data from s3 (#4855)
## Summary of changes

For context, see https://github.com/neondatabase/neon/blob/main/docs/rfcs/022-pageserver-delete-from-s3.md

Adds a flow that deletes a tenant's data from the pageserver and from S3. The approach closely mirrors the previously implemented timeline deletion, done mostly in https://github.com/neondatabase/neon/pull/4384 and followed up in https://github.com/neondatabase/neon/pull/4552

For the remaining deletion-related issues, see the deletion project: https://github.com/orgs/neondatabase/projects/33

Resolves #4250
Resolves https://github.com/neondatabase/neon/issues/3889

---------

Co-authored-by: Joonas Koivunen <joonas@neon.tech>
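To illustrate the client-side contract introduced below (DELETE /v1/tenant/{tenant_id} returns 202 Accepted and is then polled until the tenant status endpoint returns 404), here is a minimal sketch in Python, roughly mirroring the new `tenant_delete_wait_completed` test helper; the base URL, iteration count, and use of `requests` are assumptions made for the example, not part of this change:

```python
import time

import requests  # assumption: any HTTP client works; only the status codes are defined by the API


def delete_tenant_and_wait(base_url: str, tenant_id: str, iterations: int = 20) -> None:
    # Kick off deletion. 202 means the background deletion flow was scheduled;
    # 409 means a previous deletion attempt is already running -- keep polling either way.
    resp = requests.delete(f"{base_url}/v1/tenant/{tenant_id}")
    if resp.status_code not in (202, 409):
        resp.raise_for_status()

    # Poll tenant status. Per the OpenAPI description added in this change,
    # 409 and 500 are retryable, and 404 means the deletion fully finished.
    for _ in range(iterations):
        if requests.get(f"{base_url}/v1/tenant/{tenant_id}").status_code == 404:
            return
        time.sleep(0.25)
    raise RuntimeError(f"tenant {tenant_id} was not deleted in time")
```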
@@ -24,6 +24,20 @@ pub async fn is_directory_empty(path: impl AsRef<Path>) -> anyhow::Result<bool>
    Ok(dir.next_entry().await?.is_none())
}

pub async fn list_dir(path: impl AsRef<Path>) -> anyhow::Result<Vec<String>> {
    let mut dir = tokio::fs::read_dir(&path)
        .await
        .context(format!("read_dir({})", path.as_ref().display()))?;

    let mut content = vec![];
    while let Some(next) = dir.next_entry().await? {
        let file_name = next.file_name();
        content.push(file_name.to_string_lossy().to_string());
    }

    Ok(content)
}

pub fn ignore_not_found(e: io::Error) -> io::Result<()> {
    if e.kind() == io::ErrorKind::NotFound {
        Ok(())
@@ -43,7 +57,7 @@ where
mod test {
    use std::path::PathBuf;

    use crate::fs_ext::is_directory_empty;
    use crate::fs_ext::{is_directory_empty, list_dir};

    use super::ignore_absent_files;

@@ -109,4 +123,25 @@ mod test {

        assert!(!file_path.exists());
    }

    #[tokio::test]
    async fn list_dir_works() {
        let dir = tempfile::tempdir().unwrap();
        let dir_path = dir.path();

        assert!(list_dir(dir_path).await.unwrap().is_empty());

        let file_path: PathBuf = dir_path.join("testfile");
        let _ = std::fs::File::create(&file_path).unwrap();

        assert_eq!(&list_dir(dir_path).await.unwrap(), &["testfile"]);

        let another_dir_path: PathBuf = dir_path.join("testdir");
        std::fs::create_dir(another_dir_path).unwrap();

        let expected = &["testdir", "testfile"];
        let mut actual = list_dir(dir_path).await.unwrap();
        actual.sort();
        assert_eq!(actual, expected);
    }
}

@@ -373,7 +373,7 @@ fn start_pageserver(
|
||||
let order = pageserver::InitializationOrder {
|
||||
initial_tenant_load: Some(init_done_tx),
|
||||
initial_logical_size_can_start: init_done_rx.clone(),
|
||||
initial_logical_size_attempt: init_logical_size_done_tx,
|
||||
initial_logical_size_attempt: Some(init_logical_size_done_tx),
|
||||
background_jobs_can_start: background_jobs_barrier.clone(),
|
||||
};
|
||||
|
||||
|
||||
@@ -31,7 +31,9 @@ use utils::{
|
||||
use crate::disk_usage_eviction_task::DiskUsageEvictionTaskConfig;
|
||||
use crate::tenant::config::TenantConf;
|
||||
use crate::tenant::config::TenantConfOpt;
|
||||
use crate::tenant::{TENANT_ATTACHING_MARKER_FILENAME, TIMELINES_SEGMENT_NAME};
|
||||
use crate::tenant::{
|
||||
TENANT_ATTACHING_MARKER_FILENAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME,
|
||||
};
|
||||
use crate::{
|
||||
IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX,
|
||||
TIMELINE_UNINIT_MARK_SUFFIX,
|
||||
@@ -613,6 +615,11 @@ impl PageServerConf {
|
||||
)
|
||||
}
|
||||
|
||||
pub fn tenant_deleted_mark_file_path(&self, tenant_id: &TenantId) -> PathBuf {
|
||||
self.tenant_path(tenant_id)
|
||||
.join(TENANT_DELETED_MARKER_FILE_NAME)
|
||||
}
|
||||
|
||||
pub fn traces_path(&self) -> PathBuf {
|
||||
self.workdir.join("traces")
|
||||
}
|
||||
|
||||
@@ -93,6 +93,47 @@ paths:
            application/json:
              schema:
                $ref: "#/components/schemas/Error"
    delete:
      description: |
        Attempts to delete the specified tenant. 500 and 409 errors should be retried until 404 is retrieved;
        404 means that the deletion finished successfully.
      responses:
        "400":
          description: Error when no tenant id found in path
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/Error"
        "401":
          description: Unauthorized Error
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/UnauthorizedError"
        "403":
          description: Forbidden Error
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/ForbiddenError"
        "404":
          description: Tenant not found
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/NotFoundError"
        "409":
          description: Deletion is already in progress, continue polling
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/ConflictError"
        "500":
          description: Generic operation error
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/Error"

  /v1/tenant/{tenant_id}/timeline:
    parameters:
@@ -820,6 +861,7 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
|
||||
/v1/tenant/config:
|
||||
put:
|
||||
description: |
|
||||
|
||||
@@ -187,7 +187,7 @@ impl From<crate::tenant::DeleteTimelineError> for ApiError {
|
||||
format!("Cannot delete timeline which has child timelines: {children:?}")
|
||||
.into_boxed_str(),
|
||||
),
|
||||
a @ AlreadyInProgress => ApiError::Conflict(a.to_string()),
|
||||
a @ AlreadyInProgress(_) => ApiError::Conflict(a.to_string()),
|
||||
Other(e) => ApiError::InternalServerError(e),
|
||||
}
|
||||
}
|
||||
@@ -208,6 +208,19 @@ impl From<crate::tenant::mgr::DeleteTimelineError> for ApiError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<crate::tenant::delete::DeleteTenantError> for ApiError {
|
||||
fn from(value: crate::tenant::delete::DeleteTenantError) -> Self {
|
||||
use crate::tenant::delete::DeleteTenantError::*;
|
||||
match value {
|
||||
Get(g) => ApiError::from(g),
|
||||
e @ AlreadyInProgress => ApiError::Conflict(e.to_string()),
|
||||
Timeline(t) => ApiError::from(t),
|
||||
Other(o) => ApiError::InternalServerError(o),
|
||||
e @ InvalidState(_) => ApiError::PreconditionFailed(e.to_string().into_boxed_str()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Helper function to construct a TimelineInfo struct for a timeline
|
||||
async fn build_timeline_info(
|
||||
timeline: &Arc<Timeline>,
|
||||
@@ -617,6 +630,23 @@ async fn tenant_status(
|
||||
json_response(StatusCode::OK, tenant_info)
|
||||
}
|
||||
|
||||
async fn tenant_delete_handler(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
// TODO openapi spec
|
||||
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
let state = get_state(&request);
|
||||
|
||||
mgr::delete_tenant(state.conf, state.remote_storage.clone(), tenant_id)
|
||||
.instrument(info_span!("tenant_delete_handler", %tenant_id))
|
||||
.await?;
|
||||
|
||||
json_response(StatusCode::ACCEPTED, ())
|
||||
}
|
||||
|
||||
/// HTTP endpoint to query the current tenant_size of a tenant.
|
||||
///
|
||||
/// This is not used by consumption metrics under [`crate::consumption_metrics`], but can be used
|
||||
@@ -1345,6 +1375,9 @@ pub fn make_router(
|
||||
.get("/v1/tenant", |r| api_handler(r, tenant_list_handler))
|
||||
.post("/v1/tenant", |r| api_handler(r, tenant_create_handler))
|
||||
.get("/v1/tenant/:tenant_id", |r| api_handler(r, tenant_status))
|
||||
.delete("/v1/tenant/:tenant_id", |r| {
|
||||
api_handler(r, tenant_delete_handler)
|
||||
})
|
||||
.get("/v1/tenant/:tenant_id/synthetic_size", |r| {
|
||||
api_handler(r, tenant_size_handler)
|
||||
})
|
||||
|
||||
@@ -190,7 +190,7 @@ pub struct InitializationOrder {
|
||||
|
||||
/// Each timeline owns a clone of this to be consumed on the initial logical size calculation
|
||||
/// attempt. It is important to drop this once the attempt has completed.
|
||||
pub initial_logical_size_attempt: utils::completion::Completion,
|
||||
pub initial_logical_size_attempt: Option<utils::completion::Completion>,
|
||||
|
||||
/// Barrier for when we can start any background jobs.
|
||||
///
|
||||
|
||||
@@ -28,6 +28,7 @@ use std::cmp::min;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::BTreeSet;
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::Debug;
|
||||
use std::fs;
|
||||
use std::fs::File;
|
||||
use std::fs::OpenOptions;
|
||||
@@ -46,8 +47,10 @@ use std::sync::{Mutex, RwLock};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use self::config::TenantConf;
|
||||
use self::delete::DeleteTenantFlow;
|
||||
use self::metadata::LoadMetadataError;
|
||||
use self::metadata::TimelineMetadata;
|
||||
use self::mgr::TenantsMap;
|
||||
use self::remote_timeline_client::RemoteTimelineClient;
|
||||
use self::timeline::uninit::TimelineUninitMark;
|
||||
use self::timeline::uninit::UninitializedTimeline;
|
||||
@@ -106,6 +109,7 @@ macro_rules! pausable_failpoint {
|
||||
|
||||
pub mod blob_io;
|
||||
pub mod block_io;
|
||||
|
||||
pub mod disk_btree;
|
||||
pub(crate) mod ephemeral_file;
|
||||
pub mod layer_map;
|
||||
@@ -118,6 +122,7 @@ mod remote_timeline_client;
|
||||
pub mod storage_layer;
|
||||
|
||||
pub mod config;
|
||||
pub mod delete;
|
||||
pub mod mgr;
|
||||
pub mod tasks;
|
||||
pub mod upload_queue;
|
||||
@@ -145,6 +150,8 @@ pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
|
||||
|
||||
pub const TENANT_ATTACHING_MARKER_FILENAME: &str = "attaching";
|
||||
|
||||
pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted";
|
||||
|
||||
///
|
||||
/// Tenant consists of multiple timelines. Keep them in a hash table.
|
||||
///
|
||||
@@ -183,6 +190,8 @@ pub struct Tenant {
|
||||
cached_synthetic_tenant_size: Arc<AtomicU64>,
|
||||
|
||||
eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,
|
||||
|
||||
pub(crate) delete_progress: Arc<tokio::sync::Mutex<DeleteTenantFlow>>,
|
||||
}
|
||||
|
||||
// We should not blindly overwrite local metadata with remote one.
|
||||
@@ -274,7 +283,7 @@ pub enum LoadLocalTimelineError {
|
||||
ResumeDeletion(#[source] anyhow::Error),
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[derive(thiserror::Error)]
|
||||
pub enum DeleteTimelineError {
|
||||
#[error("NotFound")]
|
||||
NotFound,
|
||||
@@ -283,17 +292,37 @@ pub enum DeleteTimelineError {
|
||||
HasChildren(Vec<TimelineId>),
|
||||
|
||||
#[error("Timeline deletion is already in progress")]
|
||||
AlreadyInProgress,
|
||||
AlreadyInProgress(Arc<tokio::sync::Mutex<DeleteTimelineFlow>>),
|
||||
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
impl Debug for DeleteTimelineError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::NotFound => write!(f, "NotFound"),
|
||||
Self::HasChildren(c) => f.debug_tuple("HasChildren").field(c).finish(),
|
||||
Self::AlreadyInProgress(_) => f.debug_tuple("AlreadyInProgress").finish(),
|
||||
Self::Other(e) => f.debug_tuple("Other").field(e).finish(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub enum SetStoppingError {
|
||||
AlreadyStopping(completion::Barrier),
|
||||
Broken,
|
||||
}
|
||||
|
||||
impl Debug for SetStoppingError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::AlreadyStopping(_) => f.debug_tuple("AlreadyStopping").finish(),
|
||||
Self::Broken => write!(f, "Broken"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct RemoteStartupData {
|
||||
index_part: IndexPart,
|
||||
remote_metadata: TimelineMetadata,
|
||||
@@ -616,7 +645,7 @@ impl Tenant {
|
||||
// For every timeline, download the metadata file, scan the local directory,
|
||||
// and build a layer map that contains an entry for each remote and local
|
||||
// layer file.
|
||||
let sorted_timelines = tree_sort_timelines(timeline_ancestors)?;
|
||||
let sorted_timelines = tree_sort_timelines(timeline_ancestors, |m| m.ancestor_timeline())?;
|
||||
for (timeline_id, remote_metadata) in sorted_timelines {
|
||||
let (index_part, remote_client) = remote_index_and_client
|
||||
.remove(&timeline_id)
|
||||
@@ -740,12 +769,13 @@ impl Tenant {
|
||||
/// If the loading fails for some reason, the Tenant will go into Broken
|
||||
/// state.
|
||||
#[instrument(skip_all, fields(tenant_id=%tenant_id))]
|
||||
pub fn spawn_load(
|
||||
pub(crate) fn spawn_load(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
init_order: Option<InitializationOrder>,
|
||||
tenants: &'static tokio::sync::RwLock<TenantsMap>,
|
||||
ctx: &RequestContext,
|
||||
) -> Arc<Tenant> {
|
||||
span::debug_assert_current_span_has_tenant_id();
|
||||
@@ -765,7 +795,7 @@ impl Tenant {
|
||||
tenant_conf,
|
||||
wal_redo_manager,
|
||||
tenant_id,
|
||||
remote_storage,
|
||||
remote_storage.clone(),
|
||||
);
|
||||
let tenant = Arc::new(tenant);
|
||||
|
||||
@@ -781,27 +811,83 @@ impl Tenant {
|
||||
"initial tenant load",
|
||||
false,
|
||||
async move {
|
||||
let make_broken = |t: &Tenant, err: anyhow::Error| {
|
||||
error!("load failed, setting tenant state to Broken: {err:?}");
|
||||
t.state.send_modify(|state| {
|
||||
assert!(
|
||||
matches!(*state, TenantState::Loading | TenantState::Stopping { .. }),
|
||||
"the loading task owns the tenant state until activation is complete"
|
||||
);
|
||||
*state = TenantState::broken_from_reason(err.to_string());
|
||||
});
|
||||
};
|
||||
|
||||
let mut init_order = init_order;
|
||||
|
||||
// take the completion because initial tenant loading will complete when all of
|
||||
// these tasks complete.
|
||||
let _completion = init_order.as_mut().and_then(|x| x.initial_tenant_load.take());
|
||||
let _completion = init_order
|
||||
.as_mut()
|
||||
.and_then(|x| x.initial_tenant_load.take());
|
||||
|
||||
// Dont block pageserver startup on figuring out deletion status
|
||||
let pending_deletion = {
|
||||
match DeleteTenantFlow::should_resume_deletion(
|
||||
conf,
|
||||
remote_storage.as_ref(),
|
||||
&tenant_clone,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(should_resume_deletion) => should_resume_deletion,
|
||||
Err(err) => {
|
||||
make_broken(&tenant_clone, anyhow::anyhow!(err));
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
info!("pending deletion {}", pending_deletion.is_some());
|
||||
|
||||
if let Some(deletion) = pending_deletion {
|
||||
// as we are no longer loading, signal completion by dropping
|
||||
// the completion while we resume deletion
|
||||
drop(_completion);
|
||||
// do not hold to initial_logical_size_attempt as it will prevent loading from proceeding without timeout
|
||||
let _ = init_order
|
||||
.as_mut()
|
||||
.and_then(|x| x.initial_logical_size_attempt.take());
|
||||
|
||||
match DeleteTenantFlow::resume(
|
||||
deletion,
|
||||
&tenant_clone,
|
||||
init_order.as_ref(),
|
||||
tenants,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Err(err) => {
|
||||
make_broken(&tenant_clone, anyhow::anyhow!(err));
|
||||
return Ok(());
|
||||
}
|
||||
Ok(()) => return Ok(()),
|
||||
}
|
||||
}
|
||||
|
||||
let background_jobs_can_start =
|
||||
init_order.as_ref().map(|x| &x.background_jobs_can_start);
|
||||
|
||||
match tenant_clone.load(init_order.as_ref(), &ctx).await {
|
||||
Ok(()) => {
|
||||
debug!("load finished, activating");
|
||||
let background_jobs_can_start = init_order.as_ref().map(|x| &x.background_jobs_can_start);
|
||||
debug!("load finished",);
|
||||
|
||||
tenant_clone.activate(broker_client, background_jobs_can_start, &ctx);
|
||||
}
|
||||
Err(err) => {
|
||||
error!("load failed, setting tenant state to Broken: {err:?}");
|
||||
tenant_clone.state.send_modify(|state| {
|
||||
assert_eq!(*state, TenantState::Loading, "the loading task owns the tenant state until activation is complete");
|
||||
*state = TenantState::broken_from_reason(err.to_string());
|
||||
});
|
||||
}
|
||||
Err(err) => make_broken(&tenant_clone, err),
|
||||
}
|
||||
Ok(())
|
||||
|
||||
Ok(())
|
||||
}
|
||||
.instrument({
|
||||
let span = tracing::info_span!(parent: None, "load", tenant_id=%tenant_id);
|
||||
@@ -877,6 +963,8 @@ impl Tenant {
|
||||
)
|
||||
})?;
|
||||
|
||||
info!("Found deletion mark for timeline {}", timeline_id);
|
||||
|
||||
match load_metadata(self.conf, &self.tenant_id, &timeline_id) {
|
||||
Ok(metadata) => {
|
||||
timelines_to_resume_deletion.push((timeline_id, Some(metadata)))
|
||||
@@ -966,9 +1054,11 @@ impl Tenant {
|
||||
|
||||
// Sort the array of timeline IDs into tree-order, so that parent comes before
|
||||
// all its children.
|
||||
tree_sort_timelines(timelines_to_load).map(|sorted_timelines| TenantDirectoryScan {
|
||||
sorted_timelines_to_load: sorted_timelines,
|
||||
timelines_to_resume_deletion,
|
||||
tree_sort_timelines(timelines_to_load, |m| m.ancestor_timeline()).map(|sorted_timelines| {
|
||||
TenantDirectoryScan {
|
||||
sorted_timelines_to_load: sorted_timelines,
|
||||
timelines_to_resume_deletion,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1682,7 +1772,7 @@ impl Tenant {
|
||||
// It's mesed up.
|
||||
// we just ignore the failure to stop
|
||||
|
||||
match self.set_stopping(shutdown_progress).await {
|
||||
match self.set_stopping(shutdown_progress, false).await {
|
||||
Ok(()) => {}
|
||||
Err(SetStoppingError::Broken) => {
|
||||
// assume that this is acceptable
|
||||
@@ -1722,18 +1812,25 @@ impl Tenant {
|
||||
/// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
|
||||
///
|
||||
/// This function is not cancel-safe!
|
||||
async fn set_stopping(&self, progress: completion::Barrier) -> Result<(), SetStoppingError> {
|
||||
///
|
||||
/// `allow_transition_from_loading` is needed for the special case of loading task deleting the tenant.
|
||||
async fn set_stopping(
|
||||
&self,
|
||||
progress: completion::Barrier,
|
||||
allow_transition_from_loading: bool,
|
||||
) -> Result<(), SetStoppingError> {
|
||||
let mut rx = self.state.subscribe();
|
||||
|
||||
// cannot stop before we're done activating, so wait out until we're done activating
|
||||
rx.wait_for(|state| match state {
|
||||
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
|
||||
TenantState::Activating(_) | TenantState::Attaching => {
|
||||
info!(
|
||||
"waiting for {} to turn Active|Broken|Stopping",
|
||||
<&'static str>::from(state)
|
||||
);
|
||||
false
|
||||
}
|
||||
TenantState::Loading => allow_transition_from_loading,
|
||||
TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
|
||||
})
|
||||
.await
|
||||
@@ -1742,9 +1839,16 @@ impl Tenant {
|
||||
// we now know we're done activating, let's see whether this task is the winner to transition into Stopping
|
||||
let mut err = None;
|
||||
let stopping = self.state.send_if_modified(|current_state| match current_state {
|
||||
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
|
||||
TenantState::Activating(_) | TenantState::Attaching => {
|
||||
unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
|
||||
}
|
||||
TenantState::Loading => {
|
||||
if !allow_transition_from_loading {
|
||||
unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
|
||||
};
|
||||
*current_state = TenantState::Stopping { progress };
|
||||
true
|
||||
}
|
||||
TenantState::Active => {
|
||||
// FIXME: due to time-of-check vs time-of-use issues, it can happen that new timelines
|
||||
// are created after the transition to Stopping. That's harmless, as the Timelines
|
||||
@@ -1813,6 +1917,10 @@ impl Tenant {
|
||||
.expect("cannot drop self.state while on a &self method");
|
||||
|
||||
// we now know we're done activating, let's see whether this task is the winner to transition into Broken
|
||||
self.set_broken_no_wait(reason)
|
||||
}
|
||||
|
||||
pub(crate) fn set_broken_no_wait(&self, reason: String) {
|
||||
self.state.send_modify(|current_state| {
|
||||
match *current_state {
|
||||
TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
|
||||
@@ -1878,22 +1986,28 @@ impl Tenant {
/// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id),
/// perform a topological sort, so that the parent of each timeline comes
/// before the children.
fn tree_sort_timelines(
    timelines: HashMap<TimelineId, TimelineMetadata>,
) -> anyhow::Result<Vec<(TimelineId, TimelineMetadata)>> {
/// E extracts the ancestor from T
/// This allows for T to be different. It can be TimelineMetadata, can be Timeline itself, etc.
fn tree_sort_timelines<T, E>(
    timelines: HashMap<TimelineId, T>,
    extractor: E,
) -> anyhow::Result<Vec<(TimelineId, T)>>
where
    E: Fn(&T) -> Option<TimelineId>,
{
    let mut result = Vec::with_capacity(timelines.len());

    let mut now = Vec::with_capacity(timelines.len());
    // (ancestor, children)
    let mut later: HashMap<TimelineId, Vec<(TimelineId, TimelineMetadata)>> =
    let mut later: HashMap<TimelineId, Vec<(TimelineId, T)>> =
        HashMap::with_capacity(timelines.len());

    for (timeline_id, metadata) in timelines {
        if let Some(ancestor_id) = metadata.ancestor_timeline() {
    for (timeline_id, value) in timelines {
        if let Some(ancestor_id) = extractor(&value) {
            let children = later.entry(ancestor_id).or_default();
            children.push((timeline_id, metadata));
            children.push((timeline_id, value));
        } else {
            now.push((timeline_id, metadata));
            now.push((timeline_id, value));
        }
    }

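To make the ordering contract of the generalized `tree_sort_timelines` concrete, here is a small illustrative sketch in Python (the function name, data shape, and example ids are invented for illustration, and the real Rust function's error handling may differ): every ancestor is emitted before its descendants, which is what lets tenant deletion process timelines from leaves to root by simply reversing the result.

```python
def tree_sort(items: dict) -> list:
    """Order ids so that every ancestor comes before its children.

    `items` maps an id to its ancestor id (or None for roots), playing the
    role of the `extractor` closure in tree_sort_timelines.
    """
    result = []
    now = [tid for tid, ancestor in items.items() if ancestor is None]
    # ancestor id -> children that are waiting for it to be emitted
    later = {}
    for tid, ancestor in items.items():
        if ancestor is not None:
            later.setdefault(ancestor, []).append(tid)
    while now:
        tid = now.pop()
        result.append(tid)
        now.extend(later.pop(tid, []))
    if later:
        raise ValueError(f"entries reference ancestors that are not in the input: {later}")
    return result


# main <- branch_a <- branch_b, and main <- branch_c
print(tree_sort({"main": None, "branch_a": "main", "branch_b": "branch_a", "branch_c": "main"}))
# e.g. ['main', 'branch_c', 'branch_a', 'branch_b']
```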
@@ -2062,7 +2176,7 @@ impl Tenant {
|
||||
remote_client,
|
||||
pg_version,
|
||||
initial_logical_size_can_start.cloned(),
|
||||
initial_logical_size_attempt.cloned(),
|
||||
initial_logical_size_attempt.cloned().flatten(),
|
||||
state,
|
||||
);
|
||||
|
||||
@@ -2146,6 +2260,7 @@ impl Tenant {
|
||||
cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
|
||||
cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
|
||||
eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
|
||||
delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2162,6 +2277,7 @@ impl Tenant {
|
||||
// FIXME If the config file is not found, assume that we're attaching
|
||||
// a detached tenant and config is passed via attach command.
|
||||
// https://github.com/neondatabase/neon/issues/1555
|
||||
// OR: we're loading after incomplete deletion that managed to remove config.
|
||||
if !target_config_path.exists() {
|
||||
info!("tenant config not found in {target_config_display}");
|
||||
return Ok(TenantConfOpt::default());
|
||||
|
||||
pageserver/src/tenant/delete.rs (new file, 546 lines)
@@ -0,0 +1,546 @@
|
||||
use std::{
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
use pageserver_api::models::TenantState;
|
||||
use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};
|
||||
use tokio::sync::OwnedMutexGuard;
|
||||
use tracing::{error, info, instrument, warn, Instrument, Span};
|
||||
|
||||
use utils::{
|
||||
completion, crashsafe, fs_ext,
|
||||
id::{TenantId, TimelineId},
|
||||
};
|
||||
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
context::RequestContext,
|
||||
task_mgr::{self, TaskKind},
|
||||
InitializationOrder,
|
||||
};
|
||||
|
||||
use super::{
|
||||
mgr::{GetTenantError, TenantsMap},
|
||||
span,
|
||||
timeline::delete::DeleteTimelineFlow,
|
||||
tree_sort_timelines, DeleteTimelineError, Tenant,
|
||||
};
|
||||
|
||||
const SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS: u8 = 3;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum DeleteTenantError {
|
||||
#[error("GetTenant {0}")]
|
||||
Get(#[from] GetTenantError),
|
||||
|
||||
#[error("Invalid state {0}. Expected Active or Broken")]
|
||||
InvalidState(TenantState),
|
||||
|
||||
#[error("Tenant deletion is already in progress")]
|
||||
AlreadyInProgress,
|
||||
|
||||
#[error("Timeline {0}")]
|
||||
Timeline(#[from] DeleteTimelineError),
|
||||
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
type DeletionGuard = tokio::sync::OwnedMutexGuard<DeleteTenantFlow>;
|
||||
|
||||
fn remote_tenant_delete_mark_path(
|
||||
conf: &PageServerConf,
|
||||
tenant_id: &TenantId,
|
||||
) -> anyhow::Result<RemotePath> {
|
||||
let tenant_remote_path = conf
|
||||
.tenant_path(tenant_id)
|
||||
.strip_prefix(&conf.workdir)
|
||||
.context("Failed to strip workdir prefix")
|
||||
.and_then(RemotePath::new)
|
||||
.context("tenant path")?;
|
||||
Ok(tenant_remote_path.join(Path::new("deleted")))
|
||||
}
|
||||
|
||||
async fn create_remote_delete_mark(
|
||||
conf: &PageServerConf,
|
||||
remote_storage: &GenericRemoteStorage,
|
||||
tenant_id: &TenantId,
|
||||
) -> Result<(), DeleteTenantError> {
|
||||
let remote_mark_path = remote_tenant_delete_mark_path(conf, tenant_id)?;
|
||||
|
||||
let data: &[u8] = &[];
|
||||
remote_storage
|
||||
.upload(data, 0, &remote_mark_path, None)
|
||||
.await
|
||||
.context("mark upload")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn create_local_delete_mark(
|
||||
conf: &PageServerConf,
|
||||
tenant_id: &TenantId,
|
||||
) -> Result<(), DeleteTenantError> {
|
||||
let marker_path = conf.tenant_deleted_mark_file_path(tenant_id);
|
||||
|
||||
// Note: we're ok to replace existing file.
|
||||
let _ = std::fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.create(true)
|
||||
.open(&marker_path)
|
||||
.with_context(|| format!("could not create delete marker file {marker_path:?}"))?;
|
||||
|
||||
crashsafe::fsync_file_and_parent(&marker_path).context("sync_mark")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn schedule_ordered_timeline_deletions(
|
||||
tenant: &Arc<Tenant>,
|
||||
) -> Result<Vec<(Arc<tokio::sync::Mutex<DeleteTimelineFlow>>, TimelineId)>, DeleteTenantError> {
|
||||
// Tenant is stopping at this point. We know it will be deleted.
|
||||
// No new timelines should be created.
|
||||
// Tree sort timelines to delete from leafs to the root.
|
||||
// NOTE: by calling clone we release the mutex which creates a possibility for a race: pending deletion
|
||||
// can complete and remove timeline from the map in between our call to clone
|
||||
// and `DeleteTimelineFlow::run`, so `run` wont find timeline in `timelines` map.
|
||||
// timelines.lock is currently synchronous so we cant hold it across await point.
|
||||
// So just ignore NotFound error if we get it from `run`.
|
||||
// Beware: in case it becomes async and we try to hold it here, `run` also locks it, which can create a deadlock.
|
||||
let timelines = tenant.timelines.lock().unwrap().clone();
|
||||
let sorted =
|
||||
tree_sort_timelines(timelines, |t| t.get_ancestor_timeline_id()).context("tree sort")?;
|
||||
|
||||
let mut already_running_deletions = vec![];
|
||||
|
||||
for (timeline_id, _) in sorted.into_iter().rev() {
|
||||
if let Err(e) = DeleteTimelineFlow::run(tenant, timeline_id, true).await {
|
||||
match e {
|
||||
DeleteTimelineError::NotFound => {
|
||||
// Timeline deletion finished after call to clone above but before call
|
||||
// to `DeleteTimelineFlow::run` and removed timeline from the map.
|
||||
continue;
|
||||
}
|
||||
DeleteTimelineError::AlreadyInProgress(guard) => {
|
||||
already_running_deletions.push((guard, timeline_id));
|
||||
continue;
|
||||
}
|
||||
e => return Err(DeleteTenantError::Timeline(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(already_running_deletions)
|
||||
}
|
||||
|
||||
async fn ensure_timelines_dir_empty(timelines_path: &Path) -> Result<(), DeleteTenantError> {
|
||||
// Assert timelines dir is empty.
|
||||
if !fs_ext::is_directory_empty(timelines_path).await? {
|
||||
// Display first 10 items in directory
|
||||
let list = &fs_ext::list_dir(timelines_path).await.context("list_dir")?[..10];
|
||||
return Err(DeleteTenantError::Other(anyhow::anyhow!(
|
||||
"Timelines directory is not empty after all timelines deletion: {list:?}"
|
||||
)));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn remove_tenant_remote_delete_mark(
|
||||
conf: &PageServerConf,
|
||||
remote_storage: Option<&GenericRemoteStorage>,
|
||||
tenant_id: &TenantId,
|
||||
) -> Result<(), DeleteTenantError> {
|
||||
if let Some(remote_storage) = remote_storage {
|
||||
remote_storage
|
||||
.delete(&remote_tenant_delete_mark_path(conf, tenant_id)?)
|
||||
.await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Cleanup fs traces: tenant config, timelines dir local delete mark, tenant dir
|
||||
async fn cleanup_remaining_fs_traces(
|
||||
conf: &PageServerConf,
|
||||
tenant_id: &TenantId,
|
||||
) -> Result<(), DeleteTenantError> {
|
||||
let rm = |p: PathBuf, is_dir: bool| async move {
|
||||
if is_dir {
|
||||
tokio::fs::remove_dir(&p).await
|
||||
} else {
|
||||
tokio::fs::remove_file(&p).await
|
||||
}
|
||||
.or_else(fs_ext::ignore_not_found)
|
||||
.with_context(|| {
|
||||
let to_display = p.display();
|
||||
format!("failed to delete {to_display}")
|
||||
})
|
||||
};
|
||||
|
||||
rm(conf.tenant_config_path(tenant_id), false).await?;
|
||||
|
||||
fail::fail_point!("tenant-delete-before-remove-timelines-dir", |_| {
|
||||
Err(anyhow::anyhow!(
|
||||
"failpoint: tenant-delete-before-remove-timelines-dir"
|
||||
))?
|
||||
});
|
||||
|
||||
rm(conf.timelines_path(tenant_id), true).await?;
|
||||
|
||||
fail::fail_point!("tenant-delete-before-remove-deleted-mark", |_| {
|
||||
Err(anyhow::anyhow!(
|
||||
"failpoint: tenant-delete-before-remove-deleted-mark"
|
||||
))?
|
||||
});
|
||||
|
||||
rm(conf.tenant_deleted_mark_file_path(tenant_id), false).await?;
|
||||
|
||||
fail::fail_point!("tenant-delete-before-remove-tenant-dir", |_| {
|
||||
Err(anyhow::anyhow!(
|
||||
"failpoint: tenant-delete-before-remove-tenant-dir"
|
||||
))?
|
||||
});
|
||||
|
||||
rm(conf.tenant_path(tenant_id), true).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Orchestrates tenant shut down of all tasks, removes its in-memory structures,
|
||||
/// and deletes its data from both disk and s3.
|
||||
/// The sequence of steps:
|
||||
/// 1. Upload remote deletion mark.
|
||||
/// 2. Create local mark file.
|
||||
/// 3. Shutdown tasks
|
||||
/// 4. Run ordered timeline deletions
|
||||
/// 5. Wait for timeline deletion operations that were scheduled before tenant deletion was requested
|
||||
/// 6. Remove remote mark
|
||||
/// 7. Cleanup remaining fs traces, tenant dir, config, timelines dir, local delete mark
|
||||
/// It is resumable from any step in case a crash/restart occurs.
|
||||
/// There are three entrypoints to the process:
|
||||
/// 1. [`DeleteTenantFlow::run`] this is the main one called by a management api handler.
|
||||
/// 2. [`DeleteTenantFlow::resume`] is called during restarts when local or remote deletion marks are still there.
|
||||
/// Note the only other place that messes around timeline delete mark is the `Tenant::spawn_load` function.
|
||||
#[derive(Default)]
|
||||
pub enum DeleteTenantFlow {
|
||||
#[default]
|
||||
NotStarted,
|
||||
InProgress,
|
||||
Finished,
|
||||
}
|
||||
|
||||
impl DeleteTenantFlow {
|
||||
// These steps are run in the context of management api request handler.
|
||||
// Long running steps are continued to run in the background.
|
||||
// NB: If this fails half-way through, and is retried, the retry will go through
|
||||
// all the same steps again. Make sure the code here is idempotent, and don't
|
||||
// error out if some of the shutdown tasks have already been completed!
|
||||
// NOTE: static needed for background part.
|
||||
// We assume that calling code sets up the span with tenant_id.
|
||||
#[instrument(skip_all)]
|
||||
pub(crate) async fn run(
|
||||
conf: &'static PageServerConf,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
tenants: &'static tokio::sync::RwLock<TenantsMap>,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<(), DeleteTenantError> {
|
||||
span::debug_assert_current_span_has_tenant_id();
|
||||
|
||||
let (tenant, mut guard) = Self::prepare(tenants, tenant_id).await?;
|
||||
|
||||
if let Err(e) = Self::run_inner(&mut guard, conf, remote_storage.as_ref(), &tenant).await {
|
||||
tenant.set_broken(format!("{e:#}")).await;
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
Self::schedule_background(guard, conf, remote_storage, tenants, tenant);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Helper function needed to be able to match once on returned error and transition tenant into broken state.
|
||||
// This is needed because tenant.shutwodn is not idempotent. If tenant state is set to stopping another call to tenant.shutdown
|
||||
// will result in an error, but here we need to be able to retry shutdown when tenant deletion is retried.
|
||||
// So the solution is to set tenant state to broken.
|
||||
async fn run_inner(
|
||||
guard: &mut OwnedMutexGuard<Self>,
|
||||
conf: &'static PageServerConf,
|
||||
remote_storage: Option<&GenericRemoteStorage>,
|
||||
tenant: &Tenant,
|
||||
) -> Result<(), DeleteTenantError> {
|
||||
guard.mark_in_progress()?;
|
||||
|
||||
fail::fail_point!("tenant-delete-before-create-remote-mark", |_| {
|
||||
Err(anyhow::anyhow!(
|
||||
"failpoint: tenant-delete-before-create-remote-mark"
|
||||
))?
|
||||
});
|
||||
|
||||
// IDEA: implement detach as delete without remote storage. Then they would use the same lock (deletion_progress) so wont contend.
|
||||
// Though sounds scary, different mark name?
|
||||
// Detach currently uses remove_dir_all so in case of a crash we can end up in a weird state.
|
||||
if let Some(remote_storage) = &remote_storage {
|
||||
create_remote_delete_mark(conf, remote_storage, &tenant.tenant_id)
|
||||
.await
|
||||
.context("remote_mark")?
|
||||
}
|
||||
|
||||
fail::fail_point!("tenant-delete-before-create-local-mark", |_| {
|
||||
Err(anyhow::anyhow!(
|
||||
"failpoint: tenant-delete-before-create-local-mark"
|
||||
))?
|
||||
});
|
||||
|
||||
create_local_delete_mark(conf, &tenant.tenant_id)
|
||||
.await
|
||||
.context("local delete mark")?;
|
||||
|
||||
fail::fail_point!("tenant-delete-before-background", |_| {
|
||||
Err(anyhow::anyhow!(
|
||||
"failpoint: tenant-delete-before-background"
|
||||
))?
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn mark_in_progress(&mut self) -> anyhow::Result<()> {
|
||||
match self {
|
||||
Self::Finished => anyhow::bail!("Bug. Is in finished state"),
|
||||
Self::InProgress { .. } => { /* We're in a retry */ }
|
||||
Self::NotStarted => { /* Fresh start */ }
|
||||
}
|
||||
|
||||
*self = Self::InProgress;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn should_resume_deletion(
|
||||
conf: &'static PageServerConf,
|
||||
remote_storage: Option<&GenericRemoteStorage>,
|
||||
tenant: &Tenant,
|
||||
) -> Result<Option<DeletionGuard>, DeleteTenantError> {
|
||||
let acquire = |t: &Tenant| {
|
||||
Some(
|
||||
Arc::clone(&t.delete_progress)
|
||||
.try_lock_owned()
|
||||
.expect("we're the only owner during init"),
|
||||
)
|
||||
};
|
||||
|
||||
let tenant_id = tenant.tenant_id;
|
||||
// Check local mark first, if its there there is no need to go to s3 to check whether remote one exists.
|
||||
if conf.tenant_deleted_mark_file_path(&tenant_id).exists() {
|
||||
return Ok(acquire(tenant));
|
||||
}
|
||||
|
||||
// If remote storage is there we rely on it
|
||||
if let Some(remote_storage) = remote_storage {
|
||||
let remote_mark_path = remote_tenant_delete_mark_path(conf, &tenant_id)?;
|
||||
|
||||
let attempt = 1;
|
||||
loop {
|
||||
match remote_storage.download(&remote_mark_path).await {
|
||||
Ok(_) => return Ok(acquire(tenant)),
|
||||
Err(e) => {
|
||||
if matches!(e, DownloadError::NotFound) {
|
||||
return Ok(None);
|
||||
}
|
||||
if attempt > SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS {
|
||||
return Err(anyhow::anyhow!(e))?;
|
||||
}
|
||||
|
||||
warn!(
|
||||
"failed to fetch tenant deletion mark at {} attempt {}",
|
||||
&remote_mark_path, attempt
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
pub(crate) async fn resume(
|
||||
guard: DeletionGuard,
|
||||
tenant: &Arc<Tenant>,
|
||||
init_order: Option<&InitializationOrder>,
|
||||
tenants: &'static tokio::sync::RwLock<TenantsMap>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), DeleteTenantError> {
|
||||
let (_, progress) = completion::channel();
|
||||
|
||||
tenant
|
||||
.set_stopping(progress, true)
|
||||
.await
|
||||
.expect("cant be stopping or broken");
|
||||
|
||||
// Do not consume valuable resources during the load phase, continue deletion once init phase is complete.
|
||||
let background_jobs_can_start = init_order.as_ref().map(|x| &x.background_jobs_can_start);
|
||||
if let Some(background) = background_jobs_can_start {
|
||||
info!("waiting for backgound jobs barrier");
|
||||
background.clone().wait().await;
|
||||
info!("ready for backgound jobs barrier");
|
||||
}
|
||||
|
||||
// Tenant may not be loadable if we fail late in cleanup_remaining_fs_traces (e g remove timelines dir)
|
||||
let timelines_path = tenant.conf.timelines_path(&tenant.tenant_id);
|
||||
if timelines_path.exists() {
|
||||
tenant.load(init_order, ctx).await.context("load")?;
|
||||
}
|
||||
|
||||
Self::background(
|
||||
guard,
|
||||
tenant.conf,
|
||||
tenant.remote_storage.clone(),
|
||||
tenants,
|
||||
tenant,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn prepare(
|
||||
tenants: &tokio::sync::RwLock<TenantsMap>,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<(Arc<Tenant>, tokio::sync::OwnedMutexGuard<Self>), DeleteTenantError> {
|
||||
let m = tenants.read().await;
|
||||
|
||||
let tenant = m
|
||||
.get(&tenant_id)
|
||||
.ok_or(GetTenantError::NotFound(tenant_id))?;
|
||||
|
||||
// FIXME: unsure about active only. Our init jobs may not be cancellable properly,
|
||||
// so at least for now allow deletions only for active tenants. TODO recheck
|
||||
// Broken and Stopping is needed for retries.
|
||||
if !matches!(
|
||||
tenant.current_state(),
|
||||
TenantState::Active | TenantState::Broken { .. }
|
||||
) {
|
||||
return Err(DeleteTenantError::InvalidState(tenant.current_state()));
|
||||
}
|
||||
|
||||
let guard = Arc::clone(&tenant.delete_progress)
|
||||
.try_lock_owned()
|
||||
.map_err(|_| DeleteTenantError::AlreadyInProgress)?;
|
||||
|
||||
fail::fail_point!("tenant-delete-before-shutdown", |_| {
|
||||
Err(anyhow::anyhow!("failpoint: tenant-delete-before-shutdown"))?
|
||||
});
|
||||
|
||||
// make pageserver shutdown not to wait for our completion
|
||||
let (_, progress) = completion::channel();
|
||||
|
||||
// It would be good to only set stopping here and continue shutdown in the background, but shutdown is not idempotent.
|
||||
// i e it is an error to do:
|
||||
// tenant.set_stopping
|
||||
// tenant.shutdown
|
||||
// Its also bad that we're holding tenants.read here.
|
||||
// TODO relax set_stopping to be idempotent?
|
||||
if tenant.shutdown(progress, false).await.is_err() {
|
||||
return Err(DeleteTenantError::Other(anyhow::anyhow!(
|
||||
"tenant shutdown is already in progress"
|
||||
)));
|
||||
}
|
||||
|
||||
Ok((Arc::clone(tenant), guard))
|
||||
}
|
||||
|
||||
fn schedule_background(
|
||||
guard: OwnedMutexGuard<Self>,
|
||||
conf: &'static PageServerConf,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
tenants: &'static tokio::sync::RwLock<TenantsMap>,
|
||||
tenant: Arc<Tenant>,
|
||||
) {
|
||||
let tenant_id = tenant.tenant_id;
|
||||
|
||||
task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::TimelineDeletionWorker,
|
||||
Some(tenant_id),
|
||||
None,
|
||||
"tenant_delete",
|
||||
false,
|
||||
async move {
|
||||
if let Err(err) =
|
||||
Self::background(guard, conf, remote_storage, tenants, &tenant).await
|
||||
{
|
||||
error!("Error: {err:#}");
|
||||
tenant.set_broken(format!("{err:#}")).await;
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
.instrument({
|
||||
let span = tracing::info_span!(parent: None, "delete_tenant", tenant_id=%tenant_id);
|
||||
span.follows_from(Span::current());
|
||||
span
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
async fn background(
|
||||
mut guard: OwnedMutexGuard<Self>,
|
||||
conf: &PageServerConf,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
tenants: &'static tokio::sync::RwLock<TenantsMap>,
|
||||
tenant: &Arc<Tenant>,
|
||||
) -> Result<(), DeleteTenantError> {
|
||||
// Tree sort timelines, schedule delete for them. Mention retries from the console side.
|
||||
// Note that if deletion fails we dont mark timelines as broken,
|
||||
// the whole tenant will become broken as by `Self::schedule_background` logic
|
||||
let already_running_timeline_deletions = schedule_ordered_timeline_deletions(tenant)
|
||||
.await
|
||||
.context("schedule_ordered_timeline_deletions")?;
|
||||
|
||||
fail::fail_point!("tenant-delete-before-polling-ongoing-deletions", |_| {
|
||||
Err(anyhow::anyhow!(
|
||||
"failpoint: tenant-delete-before-polling-ongoing-deletions"
|
||||
))?
|
||||
});
|
||||
|
||||
// Wait for deletions that were already running at the moment when tenant deletion was requested.
|
||||
// When we can lock deletion guard it means that corresponding timeline deletion finished.
|
||||
for (guard, timeline_id) in already_running_timeline_deletions {
|
||||
let flow = guard.lock().await;
|
||||
if !flow.is_finished() {
|
||||
return Err(DeleteTenantError::Other(anyhow::anyhow!(
|
||||
"already running timeline deletion failed: {timeline_id}"
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
let timelines_path = conf.timelines_path(&tenant.tenant_id);
|
||||
// May not exist if we fail in cleanup_remaining_fs_traces after removing it
|
||||
if timelines_path.exists() {
|
||||
// sanity check to guard against layout changes
|
||||
ensure_timelines_dir_empty(&timelines_path)
|
||||
.await
|
||||
.context("timelines dir not empty")?;
|
||||
}
|
||||
|
||||
remove_tenant_remote_delete_mark(conf, remote_storage.as_ref(), &tenant.tenant_id).await?;
|
||||
|
||||
fail::fail_point!("tenant-delete-before-cleanup-remaining-fs-traces", |_| {
|
||||
Err(anyhow::anyhow!(
|
||||
"failpoint: tenant-delete-before-cleanup-remaining-fs-traces"
|
||||
))?
|
||||
});
|
||||
|
||||
cleanup_remaining_fs_traces(conf, &tenant.tenant_id)
|
||||
.await
|
||||
.context("cleanup_remaining_fs_traces")?;
|
||||
|
||||
let mut locked = tenants.write().await;
|
||||
if locked.remove(&tenant.tenant_id).is_none() {
|
||||
warn!("Tenant got removed from tenants map during deletion");
|
||||
};
|
||||
|
||||
*guard = Self::Finished;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -20,17 +20,19 @@ use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::task_mgr::{self, TaskKind};
|
||||
use crate::tenant::config::TenantConfOpt;
|
||||
use crate::tenant::delete::DeleteTenantFlow;
|
||||
use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantState};
|
||||
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME};
|
||||
|
||||
use utils::fs_ext::PathExt;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use super::delete::DeleteTenantError;
|
||||
use super::timeline::delete::DeleteTimelineFlow;
|
||||
|
||||
/// The tenants known to the pageserver.
|
||||
/// The enum variants are used to distinguish the different states that the pageserver can be in.
|
||||
enum TenantsMap {
|
||||
pub(crate) enum TenantsMap {
|
||||
/// [`init_tenant_mgr`] is not done yet.
|
||||
Initializing,
|
||||
/// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
|
||||
@@ -42,13 +44,13 @@ enum TenantsMap {
|
||||
}
|
||||
|
||||
impl TenantsMap {
|
||||
fn get(&self, tenant_id: &TenantId) -> Option<&Arc<Tenant>> {
|
||||
pub(crate) fn get(&self, tenant_id: &TenantId) -> Option<&Arc<Tenant>> {
|
||||
match self {
|
||||
TenantsMap::Initializing => None,
|
||||
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.get(tenant_id),
|
||||
}
|
||||
}
|
||||
fn remove(&mut self, tenant_id: &TenantId) -> Option<Arc<Tenant>> {
|
||||
pub(crate) fn remove(&mut self, tenant_id: &TenantId) -> Option<Arc<Tenant>> {
|
||||
match self {
|
||||
TenantsMap::Initializing => None,
|
||||
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.remove(tenant_id),
|
||||
@@ -97,7 +99,9 @@ pub async fn init_tenant_mgr(
|
||||
);
|
||||
}
|
||||
} else {
|
||||
// This case happens if we crash during attach before creating the attach marker file
|
||||
// This case happens if we:
|
||||
// * crash during attach before creating the attach marker file
|
||||
// * crash during tenant delete before removing tenant directory
|
||||
let is_empty = tenant_dir_path.is_empty_dir().with_context(|| {
|
||||
format!("Failed to check whether {tenant_dir_path:?} is an empty dir")
|
||||
})?;
|
||||
@@ -124,6 +128,7 @@ pub async fn init_tenant_mgr(
|
||||
broker_client.clone(),
|
||||
remote_storage.clone(),
|
||||
Some(init_order.clone()),
|
||||
&TENANTS,
|
||||
&ctx,
|
||||
) {
|
||||
Ok(tenant) => {
|
||||
@@ -154,12 +159,13 @@ pub async fn init_tenant_mgr(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn schedule_local_tenant_processing(
|
||||
pub(crate) fn schedule_local_tenant_processing(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_path: &Path,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
init_order: Option<InitializationOrder>,
|
||||
tenants: &'static tokio::sync::RwLock<TenantsMap>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Tenant>> {
|
||||
anyhow::ensure!(
|
||||
@@ -219,6 +225,7 @@ pub fn schedule_local_tenant_processing(
|
||||
broker_client,
|
||||
remote_storage,
|
||||
init_order,
|
||||
tenants,
|
||||
ctx,
|
||||
)
|
||||
};
|
||||
@@ -356,7 +363,7 @@ pub async fn create_tenant(
|
||||
// See https://github.com/neondatabase/neon/issues/4233
|
||||
|
||||
let created_tenant =
|
||||
schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, None, ctx)?;
|
||||
schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, None, &TENANTS, ctx)?;
|
||||
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
|
||||
// See https://github.com/neondatabase/neon/issues/4233
|
||||
|
||||
@@ -417,6 +424,14 @@ pub async fn get_tenant(
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn delete_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<(), DeleteTenantError> {
|
||||
DeleteTenantFlow::run(conf, remote_storage, &TENANTS, tenant_id).await
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum DeleteTimelineError {
|
||||
#[error("Tenant {0}")]
|
||||
@@ -432,7 +447,7 @@ pub async fn delete_timeline(
|
||||
_ctx: &RequestContext,
|
||||
) -> Result<(), DeleteTimelineError> {
|
||||
let tenant = get_tenant(tenant_id, true).await?;
|
||||
DeleteTimelineFlow::run(&tenant, timeline_id).await?;
|
||||
DeleteTimelineFlow::run(&tenant, timeline_id, false).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -507,7 +522,7 @@ pub async fn load_tenant(
|
||||
.with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?;
|
||||
}
|
||||
|
||||
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, None, ctx)
|
||||
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, None, &TENANTS, ctx)
|
||||
.with_context(|| {
|
||||
format!("Failed to schedule tenant processing in path {tenant_path:?}")
|
||||
})?;
|
||||
@@ -588,7 +603,7 @@ pub async fn attach_tenant(
|
||||
.context("check for attach marker file existence")?;
|
||||
anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");
|
||||
|
||||
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), None, ctx)?;
|
||||
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), None, &TENANTS, ctx)?;
|
||||
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
|
||||
// See https://github.com/neondatabase/neon/issues/4233
|
||||
|
||||
|
||||
@@ -919,7 +919,7 @@ impl Timeline {
|
||||
pub fn set_state(&self, new_state: TimelineState) {
|
||||
match (self.current_state(), new_state) {
|
||||
(equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
|
||||
warn!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
|
||||
info!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
|
||||
}
|
||||
(st, TimelineState::Loading) => {
|
||||
error!("ignoring transition from {st:?} into Loading state");
|
||||
|
||||
@@ -219,27 +219,13 @@ async fn delete_local_layer_files(
|
||||
}
|
||||
};
|
||||
|
||||
let r = if metadata.is_dir() {
|
||||
// There shouldnt be any directories inside timeline dir as of current layout.
|
||||
if metadata.is_dir() {
|
||||
warn!(path=%entry.path().display(), "unexpected directory under timeline dir");
|
||||
tokio::fs::remove_dir(entry.path()).await
|
||||
} else {
|
||||
tokio::fs::remove_file(entry.path()).await
|
||||
};
|
||||
|
||||
if let Err(e) = r {
|
||||
if e.kind() == std::io::ErrorKind::NotFound {
|
||||
warn!(
|
||||
timeline_dir=?local_timeline_directory,
|
||||
path=?entry.path().display(),
|
||||
"got not found err while removing timeline dir, proceeding anyway"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
anyhow::bail!(anyhow::anyhow!(
|
||||
"Failed to remove: {}. Error: {e}",
|
||||
entry.path().display()
|
||||
));
|
||||
}
|
||||
.with_context(|| format!("Failed to remove: {}", entry.path().display()))?;
|
||||
}
|
||||
|
||||
info!("finished deleting layer files, releasing layer_removal_cs.lock()");
|
||||
@@ -359,10 +345,11 @@ impl DeleteTimelineFlow {
|
||||
// NB: If this fails half-way through, and is retried, the retry will go through
|
||||
// all the same steps again. Make sure the code here is idempotent, and don't
|
||||
// error out if some of the shutdown tasks have already been completed!
|
||||
#[instrument(skip_all, fields(tenant_id=%tenant.tenant_id, %timeline_id))]
|
||||
#[instrument(skip(tenant), fields(tenant_id=%tenant.tenant_id))]
|
||||
pub async fn run(
|
||||
tenant: &Arc<Tenant>,
|
||||
timeline_id: TimelineId,
|
||||
inplace: bool,
|
||||
) -> Result<(), DeleteTimelineError> {
|
||||
let (timeline, mut guard) = Self::prepare(tenant, timeline_id)?;
|
||||
|
||||
@@ -380,7 +367,11 @@ impl DeleteTimelineFlow {
|
||||
))?
|
||||
});
|
||||
|
||||
Self::schedule_background(guard, tenant.conf, Arc::clone(tenant), timeline);
|
||||
if inplace {
|
||||
Self::background(guard, tenant.conf, tenant, &timeline).await?
|
||||
} else {
|
||||
Self::schedule_background(guard, tenant.conf, Arc::clone(tenant), timeline);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -398,6 +389,8 @@ impl DeleteTimelineFlow {
|
||||
}
|
||||
|
||||
/// Shortcut to create Timeline in stopping state and spawn deletion task.
|
||||
/// See corresponding parts of [`crate::tenant::delete::DeleteTenantFlow`]
|
||||
#[instrument(skip_all, fields(%timeline_id))]
|
||||
pub async fn resume_deletion(
|
||||
tenant: Arc<Tenant>,
|
||||
timeline_id: TimelineId,
|
||||
@@ -444,11 +437,15 @@ impl DeleteTimelineFlow {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(%timeline_id))]
|
||||
pub async fn cleanup_remaining_timeline_fs_traces(
|
||||
tenant: &Tenant,
|
||||
timeline_id: TimelineId,
|
||||
) -> anyhow::Result<()> {
|
||||
cleanup_remaining_timeline_fs_traces(tenant.conf, tenant.tenant_id, timeline_id).await
|
||||
let r =
|
||||
cleanup_remaining_timeline_fs_traces(tenant.conf, tenant.tenant_id, timeline_id).await;
|
||||
info!("Done");
|
||||
r
|
||||
}
|
||||
|
||||
fn prepare(
|
||||
@@ -494,11 +491,17 @@ impl DeleteTimelineFlow {
|
||||
// At the end of the operation we're holding the guard and need to lock timelines map
|
||||
// to remove the timeline from it.
|
||||
// Always if you have two locks that are taken in different order this can result in a deadlock.
|
||||
let delete_lock_guard = DeletionGuard(
|
||||
Arc::clone(&timeline.delete_progress)
|
||||
.try_lock_owned()
|
||||
.map_err(|_| DeleteTimelineError::AlreadyInProgress)?,
|
||||
);
|
||||
|
||||
let delete_progress = Arc::clone(&timeline.delete_progress);
|
||||
let delete_lock_guard = match delete_progress.try_lock_owned() {
|
||||
Ok(guard) => DeletionGuard(guard),
|
||||
Err(_) => {
|
||||
// Unfortunately if lock fails arc is consumed.
|
||||
return Err(DeleteTimelineError::AlreadyInProgress(Arc::clone(
|
||||
&timeline.delete_progress,
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
timeline.set_state(TimelineState::Stopping);
|
||||
|
||||
@@ -553,10 +556,14 @@ impl DeleteTimelineFlow {
|
||||
|
||||
remove_timeline_from_tenant(tenant, timeline.timeline_id, &guard).await?;
|
||||
|
||||
*guard.0 = Self::Finished;
|
||||
*guard = Self::Finished;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn is_finished(&self) -> bool {
|
||||
matches!(self, Self::Finished)
|
||||
}
|
||||
}
|
||||
|
||||
struct DeletionGuard(OwnedMutexGuard<DeleteTimelineFlow>);
|
||||
|
||||
@@ -32,6 +32,7 @@ import requests
|
||||
from _pytest.config import Config
|
||||
from _pytest.config.argparsing import Parser
|
||||
from _pytest.fixtures import FixtureRequest
|
||||
from mypy_boto3_s3 import S3Client
|
||||
|
||||
# Type-related stuff
|
||||
from psycopg2.extensions import connection as PgConnection
|
||||
@@ -432,7 +433,7 @@ class NeonEnvBuilder:
|
||||
self.port_distributor = port_distributor
|
||||
self.remote_storage = remote_storage
|
||||
self.ext_remote_storage: Optional[S3Storage] = None
|
||||
self.remote_storage_client: Optional[Any] = None
|
||||
self.remote_storage_client: Optional[S3Client] = None
|
||||
self.remote_storage_users = remote_storage_users
|
||||
self.broker = broker
|
||||
self.run_id = run_id
|
||||
@@ -875,7 +876,14 @@ class NeonEnv:
|
||||
|
||||
def timeline_dir(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path:
|
||||
"""Get a timeline directory's path based on the repo directory of the test environment"""
|
||||
return self.repo_dir / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
|
||||
return self.tenant_dir(tenant_id) / "timelines" / str(timeline_id)
|
||||
|
||||
def tenant_dir(
|
||||
self,
|
||||
tenant_id: TenantId,
|
||||
) -> Path:
|
||||
"""Get a tenant directory's path based on the repo directory of the test environment"""
|
||||
return self.repo_dir / "tenants" / str(tenant_id)
|
||||
|
||||
def get_pageserver_version(self) -> str:
|
||||
bin_pageserver = str(self.neon_binpath / "pageserver")
|
||||
|
||||
@@ -210,6 +210,10 @@ class PageserverHttpClient(requests.Session):
|
||||
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/detach", params=params)
|
||||
self.verbose_error(res)
|
||||
|
||||
def tenant_delete(self, tenant_id: TenantId):
|
||||
res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
|
||||
self.verbose_error(res)
|
||||
|
||||
def tenant_load(self, tenant_id: TenantId):
|
||||
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/load")
|
||||
self.verbose_error(res)
|
||||
|
||||
@@ -1,9 +1,11 @@
import time
from typing import Any, Dict, Optional
from typing import TYPE_CHECKING, Any, Dict, Optional

from fixtures.log_helper import log
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
from fixtures.remote_storage import RemoteStorageKind, S3Storage
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import wait_until


def assert_tenant_state(

@@ -199,20 +201,19 @@ def wait_timeline_detail_404(
    timeline_id: TimelineId,
    iterations: int,
):
    last_exc = None
    for _ in range(iterations):
        time.sleep(0.250)
    def timeline_is_missing():
        data = {}
        try:
            data = pageserver_http.timeline_detail(tenant_id, timeline_id)
            log.info(f"detail {data}")
            log.info(f"timeline detail {data}")
        except PageserverApiException as e:
            log.debug(e)
            if e.status_code == 404:
                return

            last_exc = e
        raise RuntimeError(f"Timeline exists, state: {data.get('state')}")

    raise last_exc or RuntimeError(f"Timeline wasn't deleted in time, state: {data['state']}")
    wait_until(iterations, interval=0.250, func=timeline_is_missing)
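
The rework above replaces a hand-rolled retry loop with the shared wait_until helper from fixtures.utils. A minimal sketch of a helper with that call shape, shown only to illustrate the semantics (this is an assumption, not the repository's actual implementation):

import time
from typing import Any, Callable


def wait_until(number_of_iterations: int, interval: float, func: Callable[[], Any]) -> Any:
    """Call func until it stops raising, at most number_of_iterations times, sleeping interval seconds between attempts."""
    last_exception = None
    for _ in range(number_of_iterations):
        try:
            return func()
        except Exception as e:
            last_exception = e
            time.sleep(interval)
    raise Exception(f"timed out while waiting for {func}") from last_exception
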

def timeline_delete_wait_completed(

@@ -224,3 +225,71 @@ def timeline_delete_wait_completed(
):
    pageserver_http.timeline_delete(tenant_id=tenant_id, timeline_id=timeline_id, **delete_args)
    wait_timeline_detail_404(pageserver_http, tenant_id, timeline_id, iterations)


if TYPE_CHECKING:
    # TODO: avoid this by combining remote-storage-related state into a single type
    # and passing in that type instead of the whole builder.
    from fixtures.neon_fixtures import NeonEnvBuilder


def assert_prefix_empty(neon_env_builder: "NeonEnvBuilder", prefix: Optional[str] = None):
    # local_fs would require properly handling empty directories, which we currently don't,
    # so for simplicity stick to the S3 API.
    assert neon_env_builder.remote_storage_kind in (
        RemoteStorageKind.MOCK_S3,
        RemoteStorageKind.REAL_S3,
    )
    # For mypy
    assert isinstance(neon_env_builder.remote_storage, S3Storage)
    assert neon_env_builder.remote_storage_client is not None

    # Note that this doesn't use pagination, so the listing is not guaranteed to be exhaustive.
    response = neon_env_builder.remote_storage_client.list_objects_v2(
        Bucket=neon_env_builder.remote_storage.bucket_name,
        Prefix=prefix or neon_env_builder.remote_storage.prefix_in_bucket or "",
    )
    objects = response.get("Contents")
    assert (
        response["KeyCount"] == 0
    ), f"remote dir with prefix {prefix} is not empty after deletion: {objects}"

def wait_tenant_status_404(
    pageserver_http: PageserverHttpClient,
    tenant_id: TenantId,
    iterations: int,
):
    def tenant_is_missing():
        data = {}
        try:
            data = pageserver_http.tenant_status(tenant_id)
            log.info(f"tenant status {data}")
        except PageserverApiException as e:
            log.debug(e)
            if e.status_code == 404:
                return

        raise RuntimeError(f"Tenant exists, state: {data.get('state')}")

    wait_until(iterations, interval=0.250, func=tenant_is_missing)


def tenant_delete_wait_completed(
    pageserver_http: PageserverHttpClient,
    tenant_id: TenantId,
    iterations: int,
):
    pageserver_http.tenant_delete(tenant_id=tenant_id)
    wait_tenant_status_404(pageserver_http, tenant_id=tenant_id, iterations=iterations)


MANY_SMALL_LAYERS_TENANT_CONFIG = {
    "gc_period": "0s",
    "compaction_period": "0s",
    "checkpoint_distance": f"{1024**2}",
    "image_creation_threshold": "100",
}


def poll_for_remote_storage_iterations(remote_storage_kind: RemoteStorageKind) -> int:
    return 20 if remote_storage_kind is RemoteStorageKind.REAL_S3 else 6
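
Taken together, the helpers above give tenant-deletion tests a compact shape. A rough usage sketch, mirroring the smoke test added below (ps_http, tenant_id and neon_env_builder come from the test's fixtures):

iterations = poll_for_remote_storage_iterations(remote_storage_kind)
tenant_delete_wait_completed(ps_http, tenant_id, iterations)
# For S3-backed runs, also verify that nothing is left under the tenant's remote prefix.
assert_prefix_empty(neon_env_builder, prefix=f"tenants/{tenant_id}")
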

@@ -6,13 +6,16 @@ import subprocess
import tarfile
import time
from pathlib import Path
from typing import Any, Callable, Dict, List, Tuple, TypeVar
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Tuple, TypeVar
from urllib.parse import urlencode

import allure
from psycopg2.extensions import cursor

from fixtures.log_helper import log

if TYPE_CHECKING:
    from fixtures.neon_fixtures import PgBin
    from fixtures.types import TimelineId

Fn = TypeVar("Fn", bound=Callable[..., Any])

@@ -314,3 +317,15 @@ def wait_while(number_of_iterations: int, interval: float, func):
        except Exception:
            return
    raise Exception("timed out while waiting for %s" % func)


def run_pg_bench_small(pg_bin: "PgBin", connstr: str):
    """
    Fast way to populate data.
    For more layers consider combining with these tenant settings:
    {
        "checkpoint_distance": 1024 ** 2,
        "image_creation_threshold": 100,
    }
    """
    pg_bin.run(["pgbench", "-i", "-I dtGvp", "-s1", connstr])
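
As the docstring suggests, pairing this initializer with a small checkpoint_distance and a high image_creation_threshold produces many small layers quickly. A rough sketch of that combination, mirroring how the tenant-deletion tests below use MANY_SMALL_LAYERS_TENANT_CONFIG (neon_env_builder and pg_bin come from fixtures):

env = neon_env_builder.init_start(initial_tenant_conf=MANY_SMALL_LAYERS_TENANT_CONFIG)
with env.endpoints.create_start("main") as endpoint:
    run_pg_bench_small(pg_bin, endpoint.connstr())
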

@@ -97,6 +97,11 @@ def test_remote_storage_backup_and_restore(
    tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
    timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])

    # That's because of UnreliableWrapper's injected failures
    env.pageserver.allowed_errors.append(
        f".*failed to fetch tenant deletion mark at tenants/({tenant_id}|{env.initial_tenant})/deleted attempt 1.*"
    )

    checkpoint_numbers = range(1, 3)

    for checkpoint_number in checkpoint_numbers:

test_runner/regress/test_tenant_delete.py (new file, 250 lines)
@@ -0,0 +1,250 @@
import enum
import os

import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
    NeonEnvBuilder,
    PgBin,
    last_flush_lsn_upload,
    wait_for_last_flush_lsn,
)
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import (
    MANY_SMALL_LAYERS_TENANT_CONFIG,
    assert_prefix_empty,
    poll_for_remote_storage_iterations,
    tenant_delete_wait_completed,
    wait_tenant_status_404,
    wait_until_tenant_active,
    wait_until_tenant_state,
)
from fixtures.remote_storage import RemoteStorageKind, available_remote_storages
from fixtures.types import TenantId
from fixtures.utils import run_pg_bench_small


@pytest.mark.parametrize(
    "remote_storage_kind", [RemoteStorageKind.NOOP, *available_remote_storages()]
)
def test_tenant_delete_smoke(
    neon_env_builder: NeonEnvBuilder,
    remote_storage_kind: RemoteStorageKind,
    pg_bin: PgBin,
):
    neon_env_builder.enable_remote_storage(
        remote_storage_kind=remote_storage_kind,
        test_name="test_tenant_delete_smoke",
    )

    env = neon_env_builder.init_start()

    ps_http = env.pageserver.http_client()

    # First, try to delete a non-existent tenant
    tenant_id = TenantId.generate()
    env.pageserver.allowed_errors.append(f".*NotFound: tenant {tenant_id}.*")
    with pytest.raises(PageserverApiException, match=f"NotFound: tenant {tenant_id}"):
        ps_http.tenant_delete(tenant_id=tenant_id)

    env.neon_cli.create_tenant(
        tenant_id=tenant_id,
        conf=MANY_SMALL_LAYERS_TENANT_CONFIG,
    )

    # Create two timelines, one being the parent of the other
    parent = None
    for timeline in ["first", "second"]:
        timeline_id = env.neon_cli.create_branch(
            timeline, tenant_id=tenant_id, ancestor_branch_name=parent
        )
        with env.endpoints.create_start(timeline, tenant_id=tenant_id) as endpoint:
            run_pg_bench_small(pg_bin, endpoint.connstr())
            wait_for_last_flush_lsn(env, endpoint, tenant=tenant_id, timeline=timeline_id)

            parent = timeline

    iterations = poll_for_remote_storage_iterations(remote_storage_kind)

    tenant_delete_wait_completed(ps_http, tenant_id, iterations)

    tenant_path = env.tenant_dir(tenant_id=tenant_id)
    assert not tenant_path.exists()

    if remote_storage_kind in [RemoteStorageKind.MOCK_S3, RemoteStorageKind.REAL_S3]:
        assert_prefix_empty(
            neon_env_builder,
            prefix="/".join(
                (
                    "tenants",
                    str(tenant_id),
                )
            ),
        )

class Check(enum.Enum):
    RETRY_WITHOUT_RESTART = enum.auto()
    RETRY_WITH_RESTART = enum.auto()


FAILPOINTS = [
    "tenant-delete-before-shutdown",
    "tenant-delete-before-create-remote-mark",
    "tenant-delete-before-create-local-mark",
    "tenant-delete-before-background",
    "tenant-delete-before-polling-ongoing-deletions",
    "tenant-delete-before-cleanup-remaining-fs-traces",
    "tenant-delete-before-remove-timelines-dir",
    "tenant-delete-before-remove-deleted-mark",
    "tenant-delete-before-remove-tenant-dir",
    # Some failpoints from timeline deletion
    "timeline-delete-before-index-deleted-at",
    "timeline-delete-before-rm",
    "timeline-delete-before-index-delete",
    "timeline-delete-after-rm-dir",
]

FAILPOINTS_BEFORE_BACKGROUND = [
    "timeline-delete-before-schedule",
    "tenant-delete-before-shutdown",
    "tenant-delete-before-create-remote-mark",
    "tenant-delete-before-create-local-mark",
    "tenant-delete-before-background",
]


def combinations():
    result = []

    remotes = [RemoteStorageKind.NOOP, RemoteStorageKind.MOCK_S3]
    if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE"):
        remotes.append(RemoteStorageKind.REAL_S3)

    for remote_storage_kind in remotes:
        for delete_failpoint in FAILPOINTS:
            if remote_storage_kind == RemoteStorageKind.NOOP and delete_failpoint in (
                "timeline-delete-before-index-delete",
            ):
                # These failpoints are not relevant for a config without remote storage
                continue

            result.append((remote_storage_kind, delete_failpoint))
    return result


@pytest.mark.parametrize("remote_storage_kind, failpoint", combinations())
@pytest.mark.parametrize("check", list(Check))
def test_delete_tenant_exercise_crash_safety_failpoints(
    neon_env_builder: NeonEnvBuilder,
    remote_storage_kind: RemoteStorageKind,
    failpoint: str,
    check: Check,
    pg_bin: PgBin,
):
    neon_env_builder.enable_remote_storage(
        remote_storage_kind, "test_delete_tenant_exercise_crash_safety_failpoints"
    )

    env = neon_env_builder.init_start(initial_tenant_conf=MANY_SMALL_LAYERS_TENANT_CONFIG)

    tenant_id = env.initial_tenant

    env.pageserver.allowed_errors.extend(
        [
            # From deletion polling
            f".*NotFound: tenant {env.initial_tenant}.*",
            # Allow errors caused by failpoints
            f".*failpoint: {failpoint}",
            # Appears when the flush loop was stopped during a deletion attempt and the pageserver is then stopped
            ".*freeze_and_flush_on_shutdown.*failed to freeze and flush: cannot flush frozen layers when flush_loop is not running, state is Exited",
            # We may leave some upload tasks in the queue; they're likely deletes.
            # For uploads we explicitly wait with `last_flush_lsn_upload` below.
            # By ignoring these instead of waiting for an empty upload queue
            # we exercise more distinct code paths.
            '.*stopping left-over name="remote upload".*',
        ]
    )

    ps_http = env.pageserver.http_client()

    timeline_id = env.neon_cli.create_timeline("delete", tenant_id=tenant_id)
    with env.endpoints.create_start("delete", tenant_id=tenant_id) as endpoint:
        # Generate enough layers
        run_pg_bench_small(pg_bin, endpoint.connstr())
        if remote_storage_kind is RemoteStorageKind.NOOP:
            wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
        else:
            last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)

    ps_http.configure_failpoints((failpoint, "return"))

    iterations = poll_for_remote_storage_iterations(remote_storage_kind)

    # These failpoints fire before the background task is spawned,
    # so they result in an API request failure.
    if failpoint in FAILPOINTS_BEFORE_BACKGROUND:
        with pytest.raises(PageserverApiException, match=failpoint):
            ps_http.tenant_delete(tenant_id)

    else:
        ps_http.tenant_delete(tenant_id)
        tenant_info = wait_until_tenant_state(
            pageserver_http=ps_http,
            tenant_id=tenant_id,
            expected_state="Broken",
            iterations=iterations,
        )

        reason = tenant_info["state"]["data"]["reason"]
        log.info(f"tenant broken: {reason}")

        # The failpoint may not be the only error in the stack
        assert reason.endswith(f"failpoint: {failpoint}"), reason

    if check is Check.RETRY_WITH_RESTART:
        env.pageserver.stop()
        env.pageserver.start()

        if (
            remote_storage_kind is RemoteStorageKind.NOOP
            and failpoint == "tenant-delete-before-create-local-mark"
        ):
            tenant_delete_wait_completed(ps_http, tenant_id, iterations=iterations)
        elif failpoint in (
            "tenant-delete-before-shutdown",
            "tenant-delete-before-create-remote-mark",
        ):
            wait_until_tenant_active(
                ps_http, tenant_id=tenant_id, iterations=iterations, period=0.25
            )
            tenant_delete_wait_completed(ps_http, tenant_id, iterations=iterations)
        else:
            # Pageserver should've resumed deletion after restart.
            wait_tenant_status_404(ps_http, tenant_id, iterations=iterations + 10)
    elif check is Check.RETRY_WITHOUT_RESTART:
        # This should succeed,
        # and also checks that delete can be retried even when the tenant is in a Broken state
        ps_http.configure_failpoints((failpoint, "off"))

        tenant_delete_wait_completed(ps_http, tenant_id, iterations=iterations)

    # Check remote is empty
    if remote_storage_kind is RemoteStorageKind.MOCK_S3:
        assert_prefix_empty(
            neon_env_builder,
            prefix="/".join(
                (
                    "tenants",
                    str(tenant_id),
                )
            ),
        )

    tenant_dir = env.tenant_dir(tenant_id)
    # Check local is empty
    assert not tenant_dir.exists()


# TODO: test concurrent deletions with the "hang" failpoint
# TODO: test that tenant deletion continues after attach

@@ -66,6 +66,10 @@ def test_tenant_reattach(
    env.pageserver.allowed_errors.append(
        f".*Tenant {tenant_id} will not become active\\. Current state: Stopping.*"
    )
    # That's because of UnreliableWrapper's injected failures
    env.pageserver.allowed_errors.append(
        f".*failed to fetch tenant deletion mark at tenants/({tenant_id}|{env.initial_tenant})/deleted attempt 1.*"
    )

    with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
        with endpoint.cursor() as cur:

@@ -146,6 +146,11 @@ def test_tenants_attached_after_download(
    tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
    timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])

    # That's because of UnreliableWrapper's injected failures
    env.pageserver.allowed_errors.append(
        f".*failed to fetch tenant deletion mark at tenants/({tenant_id}|{env.initial_tenant})/deleted attempt 1.*"
    )

    for checkpoint_number in range(1, 3):
        with endpoint.cursor() as cur:
            cur.execute(
@@ -4,7 +4,6 @@ import queue
import shutil
import threading
from pathlib import Path
from typing import Optional

import pytest
import requests

@@ -18,6 +17,8 @@ from fixtures.neon_fixtures import (
)
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import (
    assert_prefix_empty,
    poll_for_remote_storage_iterations,
    timeline_delete_wait_completed,
    wait_for_last_record_lsn,
    wait_for_upload,

@@ -27,7 +28,6 @@ from fixtures.pageserver.utils import (
)
from fixtures.remote_storage import (
    RemoteStorageKind,
    S3Storage,
    available_remote_storages,
)
from fixtures.types import Lsn, TenantId, TimelineId

@@ -187,10 +187,9 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
    8. Retry or restart without the failpoint and check the result.
    """

    if remote_storage_kind is not None:
        neon_env_builder.enable_remote_storage(
            remote_storage_kind, "test_delete_timeline_exercise_crash_safety_failpoints"
        )
    neon_env_builder.enable_remote_storage(
        remote_storage_kind, "test_delete_timeline_exercise_crash_safety_failpoints"
    )

    env = neon_env_builder.init_start(
        initial_tenant_conf={

@@ -231,7 +230,7 @@ def test_delete_timeline_exercise_crash_safety_failpoints(

    ps_http.configure_failpoints((failpoint, "return"))

    iterations = 20 if remote_storage_kind is RemoteStorageKind.REAL_S3 else 4
    iterations = poll_for_remote_storage_iterations(remote_storage_kind)

    # These failpoints fire before the background task is spawned,
    # so they result in an API request failure.

@@ -280,14 +279,14 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
                    "remote_storage_s3_request_seconds_count",
                    filter={"request_type": "get_object", "result": "err"},
                ).value
                == 1
                == 2  # One is the missing tenant deletion mark, the second is the missing index part
            )
            assert (
                m.query_one(
                    "remote_storage_s3_request_seconds_count",
                    filter={"request_type": "get_object", "result": "ok"},
                ).value
                == 1
                == 1  # index part for the initial timeline
            )
    elif check is Check.RETRY_WITHOUT_RESTART:
        # this should succeed

@@ -413,27 +412,6 @@ def test_timeline_resurrection_on_attach(
    assert all([tl["state"] == "Active" for tl in timelines])


def assert_prefix_empty(neon_env_builder: NeonEnvBuilder, prefix: Optional[str] = None):
    # local_fs would require properly handling empty directories, which we currently don't,
    # so for simplicity stick to the S3 API.
    assert neon_env_builder.remote_storage_kind in (
        RemoteStorageKind.MOCK_S3,
        RemoteStorageKind.REAL_S3,
    )
    # For mypy
    assert isinstance(neon_env_builder.remote_storage, S3Storage)

    # Note that this doesn't use pagination, so the listing is not guaranteed to be exhaustive.
    assert neon_env_builder.remote_storage_client is not None
    response = neon_env_builder.remote_storage_client.list_objects_v2(
        Bucket=neon_env_builder.remote_storage.bucket_name,
        Prefix=prefix or neon_env_builder.remote_storage.prefix_in_bucket or "",
    )
    objects = response.get("Contents")
    assert (
        response["KeyCount"] == 0
    ), f"remote dir with prefix {prefix} is not empty after deletion: {objects}"


def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuilder):
    """
    When deleting a timeline, if we succeed in setting the deleted flag remotely